Graph Building¶
The Pipeline Builder transforms classified intents into executable research graphs.
Overview¶
flowchart LR
INTENT[Intent + Params] --> BUILDER[PipelineBuilder]
BUILDER --> TEMPLATE[Select Template]
TEMPLATE --> CUSTOMIZE[Customize Graph]
CUSTOMIZE --> VALIDATE[Validate]
VALIDATE --> GRAPH[GraphDef]
PipelineBuilder¶
from cbintel.chat import PipelineBuilder
builder = PipelineBuilder()
# Build from intent
graph = builder.build(intent_result)
# Build from intent name and params
graph = builder.build_from_intent(
intent="person_research",
params={
"subject_name": "John Smith",
"topic": "healthcare"
}
)
Intent to Graph Mapping¶
Each intent maps to a graph template with customizations:
INTENT_PIPELINES = {
"person_research": {
"template": "opposition_research",
"param_mapping": {
"subject_name": "subject_name",
"topic": "focus_topic"
},
"defaults": {
"include_archives": True,
"max_urls": 100
}
},
"company_research": {
"template": "company_profile",
"param_mapping": {
"company_name": "company_name",
"domain": "domain"
}
},
"compare_sources": {
"template": "source_comparison",
"param_mapping": {
"topic": "topic",
"sources": "sources"
}
}
}
Pipeline Customization¶
From Intent Parameters¶
# Intent result with parameters
intent = IntentResult(
intent="person_research",
params={
"subject_name": "Senator Jane Smith",
"topic": "climate change",
"include_video": True
}
)
# Build customized graph
graph = builder.build(intent)
# Graph has modified stages based on params
Manual Customization¶
# Build base graph
graph = builder.build(intent)
# Add custom stage
graph.add_stage({
"name": "extra_analysis",
"sequential": [
{"op": "sentiment", "input": "texts", "output": "sentiments"}
]
})
# Modify parameters
graph.set_param("max_urls", 200)
Building Process¶
1. Template Selection¶
def select_template(intent: str) -> str:
mapping = {
"person_research": "opposition_research",
"company_research": "company_profile",
# ...
}
return mapping.get(intent, "basic_research")
2. Parameter Mapping¶
def map_params(intent_params: dict, mapping: dict) -> dict:
graph_params = {}
for intent_key, graph_key in mapping.items():
if intent_key in intent_params:
graph_params[graph_key] = intent_params[intent_key]
return graph_params
3. Stage Customization¶
def customize_stages(graph: GraphDef, params: dict) -> GraphDef:
# Add optional stages based on params
if params.get("include_video"):
graph.add_stage(VIDEO_STAGE)
if params.get("include_archives"):
graph.add_stage(ARCHIVE_STAGE)
return graph
4. Validation¶
def validate_graph(graph: GraphDef) -> None:
# Check required inputs
for input in graph.inputs:
if input.required and input.name not in params:
raise ValidationError(f"Missing required input: {input.name}")
# Check type compatibility
validate_types(graph)
Interactive Building¶
Step-by-Step Construction¶
from cbintel.chat import InteractivePipelineBuilder
builder = InteractivePipelineBuilder()
# Start with intent
builder.set_intent("person_research")
# Add parameters interactively
builder.add_param("subject_name", "John Smith")
builder.add_param("topic", "healthcare")
# Get suggestions for next steps
suggestions = builder.get_suggestions()
# ["Add video research", "Include archives", "Focus on specific time range"]
# Apply suggestion
builder.apply_suggestion(0)
# Build final graph
graph = builder.build()
Suggestions System¶
def get_suggestions(current_state: BuilderState) -> list[Suggestion]:
suggestions = []
# Based on current params
if "person" in current_state.entities:
if not current_state.has_param("include_archives"):
suggestions.append(Suggestion(
label="Include historical analysis",
action=lambda: set_param("include_archives", True)
))
# Based on output types
if current_state.last_output_type == "Chunk[]":
suggestions.append(Suggestion(
label="Filter by relevance",
action=lambda: add_stage(SEMANTIC_FILTER_STAGE)
))
return suggestions
Graph Modification¶
Adding Stages¶
# Add stage at specific position
graph.add_stage(
{
"name": "new_stage",
"sequential": [...]
},
after="discover"
)
Removing Stages¶
Modifying Operations¶
# Change operation parameters
graph.set_operation_param(
stage="fetch",
operation="fetch_batch",
param="concurrency",
value=20
)
Validation¶
Type Checking¶
from cbintel.graph import validate_graph
errors = validate_graph(graph)
for error in errors:
print(f"Stage {error.stage}: {error.message}")
Completeness Check¶
def check_completeness(graph: GraphDef) -> list[str]:
warnings = []
# Check if synthesis is present
if not has_synthesis_stage(graph):
warnings.append("No synthesis stage - results may be raw data only")
# Check if entities are used
if has_entities_stage(graph) and not has_entity_usage(graph):
warnings.append("Entities extracted but not used")
return warnings
Built-in Pipelines¶
person_research Pipeline¶
stages:
- discover (search)
- fetch (fetch_batch)
- process (to_text, chunk)
- extract (entities)
- synthesize (integrate, to_report)
company_research Pipeline¶
stages:
- discover (search company + news)
- fetch (fetch_batch)
- process (to_text, chunk)
- extract (entities, executives)
- synthesize (company_profile_report)
compare_sources Pipeline¶
stages:
- fetch (parallel fetch sources)
- process (to_text)
- analyze (sentiment, entities)
- compare (compare operation)
- synthesize (comparison_report)
Error Handling¶
from cbintel.chat import BuilderError, ValidationError
try:
graph = builder.build(intent)
except ValidationError as e:
print(f"Invalid graph: {e}")
except BuilderError as e:
print(f"Build failed: {e}")
Best Practices¶
- Start with templates - Modify rather than build from scratch
- Validate early - Check before execution
- Use suggestions - Follow type-based recommendations
- Test incrementally - Validate each modification
- Document customizations - Track changes for debugging