Skip to content

Graph Building

The Pipeline Builder transforms classified intents into executable research graphs.

Overview

flowchart LR
    INTENT[Intent + Params] --> BUILDER[PipelineBuilder]
    BUILDER --> TEMPLATE[Select Template]
    TEMPLATE --> CUSTOMIZE[Customize Graph]
    CUSTOMIZE --> VALIDATE[Validate]
    VALIDATE --> GRAPH[GraphDef]

PipelineBuilder

from cbintel.chat import PipelineBuilder

builder = PipelineBuilder()

# Build from intent
graph = builder.build(intent_result)

# Build from intent name and params
graph = builder.build_from_intent(
    intent="person_research",
    params={
        "subject_name": "John Smith",
        "topic": "healthcare"
    }
)

Intent to Graph Mapping

Each intent maps to a graph template with customizations:

INTENT_PIPELINES = {
    "person_research": {
        "template": "opposition_research",
        "param_mapping": {
            "subject_name": "subject_name",
            "topic": "focus_topic"
        },
        "defaults": {
            "include_archives": True,
            "max_urls": 100
        }
    },
    "company_research": {
        "template": "company_profile",
        "param_mapping": {
            "company_name": "company_name",
            "domain": "domain"
        }
    },
    "compare_sources": {
        "template": "source_comparison",
        "param_mapping": {
            "topic": "topic",
            "sources": "sources"
        }
    }
}

Pipeline Customization

From Intent Parameters

# Intent result with parameters
intent = IntentResult(
    intent="person_research",
    params={
        "subject_name": "Senator Jane Smith",
        "topic": "climate change",
        "include_video": True
    }
)

# Build customized graph
graph = builder.build(intent)

# Graph has modified stages based on params

Manual Customization

# Build base graph
graph = builder.build(intent)

# Add custom stage
graph.add_stage({
    "name": "extra_analysis",
    "sequential": [
        {"op": "sentiment", "input": "texts", "output": "sentiments"}
    ]
})

# Modify parameters
graph.set_param("max_urls", 200)

Building Process

1. Template Selection

def select_template(intent: str) -> str:
    mapping = {
        "person_research": "opposition_research",
        "company_research": "company_profile",
        # ...
    }
    return mapping.get(intent, "basic_research")

2. Parameter Mapping

def map_params(intent_params: dict, mapping: dict) -> dict:
    graph_params = {}
    for intent_key, graph_key in mapping.items():
        if intent_key in intent_params:
            graph_params[graph_key] = intent_params[intent_key]
    return graph_params

3. Stage Customization

def customize_stages(graph: GraphDef, params: dict) -> GraphDef:
    # Add optional stages based on params
    if params.get("include_video"):
        graph.add_stage(VIDEO_STAGE)

    if params.get("include_archives"):
        graph.add_stage(ARCHIVE_STAGE)

    return graph

4. Validation

def validate_graph(graph: GraphDef) -> None:
    # Check required inputs
    for input in graph.inputs:
        if input.required and input.name not in params:
            raise ValidationError(f"Missing required input: {input.name}")

    # Check type compatibility
    validate_types(graph)

Interactive Building

Step-by-Step Construction

from cbintel.chat import InteractivePipelineBuilder

builder = InteractivePipelineBuilder()

# Start with intent
builder.set_intent("person_research")

# Add parameters interactively
builder.add_param("subject_name", "John Smith")
builder.add_param("topic", "healthcare")

# Get suggestions for next steps
suggestions = builder.get_suggestions()
# ["Add video research", "Include archives", "Focus on specific time range"]

# Apply suggestion
builder.apply_suggestion(0)

# Build final graph
graph = builder.build()

Suggestions System

def get_suggestions(current_state: BuilderState) -> list[Suggestion]:
    suggestions = []

    # Based on current params
    if "person" in current_state.entities:
        if not current_state.has_param("include_archives"):
            suggestions.append(Suggestion(
                label="Include historical analysis",
                action=lambda: set_param("include_archives", True)
            ))

    # Based on output types
    if current_state.last_output_type == "Chunk[]":
        suggestions.append(Suggestion(
            label="Filter by relevance",
            action=lambda: add_stage(SEMANTIC_FILTER_STAGE)
        ))

    return suggestions

Graph Modification

Adding Stages

# Add stage at specific position
graph.add_stage(
    {
        "name": "new_stage",
        "sequential": [...]
    },
    after="discover"
)

Removing Stages

# Remove optional stage
graph.remove_stage("screenshots")

Modifying Operations

# Change operation parameters
graph.set_operation_param(
    stage="fetch",
    operation="fetch_batch",
    param="concurrency",
    value=20
)

Validation

Type Checking

from cbintel.graph import validate_graph

errors = validate_graph(graph)
for error in errors:
    print(f"Stage {error.stage}: {error.message}")

Completeness Check

def check_completeness(graph: GraphDef) -> list[str]:
    warnings = []

    # Check if synthesis is present
    if not has_synthesis_stage(graph):
        warnings.append("No synthesis stage - results may be raw data only")

    # Check if entities are used
    if has_entities_stage(graph) and not has_entity_usage(graph):
        warnings.append("Entities extracted but not used")

    return warnings

Built-in Pipelines

person_research Pipeline

stages:
  - discover (search)
  - fetch (fetch_batch)
  - process (to_text, chunk)
  - extract (entities)
  - synthesize (integrate, to_report)

company_research Pipeline

stages:
  - discover (search company + news)
  - fetch (fetch_batch)
  - process (to_text, chunk)
  - extract (entities, executives)
  - synthesize (company_profile_report)

compare_sources Pipeline

stages:
  - fetch (parallel fetch sources)
  - process (to_text)
  - analyze (sentiment, entities)
  - compare (compare operation)
  - synthesize (comparison_report)

Error Handling

from cbintel.chat import BuilderError, ValidationError

try:
    graph = builder.build(intent)
except ValidationError as e:
    print(f"Invalid graph: {e}")
except BuilderError as e:
    print(f"Build failed: {e}")

Best Practices

  1. Start with templates - Modify rather than build from scratch
  2. Validate early - Check before execution
  3. Use suggestions - Follow type-based recommendations
  4. Test incrementally - Validate each modification
  5. Document customizations - Track changes for debugging