# Graph Execution
Graphs are DAG-based research pipelines that orchestrate operations across stages.
## Overview

```mermaid
flowchart LR
    subgraph "Stage 1: Discover"
        SEARCH["search<br/>query -> url[]"]
    end
    subgraph "Stage 2: Acquire"
        FETCH["fetch_batch<br/>url[] -> html[]"]
    end
    subgraph "Stage 3: Transform"
        TEXT["to_text_batch<br/>html[] -> text[]"]
        CHUNK["chunk<br/>text -> chunk[]"]
    end
    subgraph "Stage 4: Process"
        EMBED["embed_batch<br/>chunk[] -> vector[]"]
        ENTITY["entities<br/>text -> entity[]"]
    end
    subgraph "Stage 5: Synthesize"
        FILTER["semantic_filter<br/>chunk[] -> chunk[]"]
        INTEGRATE["integrate<br/>chunk[] -> text"]
    end

    SEARCH --> FETCH --> TEXT --> CHUNK
    CHUNK --> EMBED
    TEXT --> ENTITY
    CHUNK --> FILTER --> INTEGRATE
```
## Quick Reference

### Graph Concepts

| Concept | Description |
|---------|-------------|
| GraphDef | YAML definition of a graph |
| Stage | Named group of operations |
| Operation | Single atomic action |
| Input | Data flowing into an operation |
| Output | Named result from an operation |
| State | Accumulated values during execution |
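A rough sketch of how these concepts might fit together in code. This is illustrative only: apart from `GraphDef`, the class and field names are assumptions for this sketch, not the library's actual schema.

```python
from dataclasses import dataclass, field

# Illustrative only: names beyond GraphDef are assumptions, not cbintel's schema.
@dataclass
class Operation:
    op: str                                     # Operation name, e.g. "search"
    input: str | None = None                    # State key to read from
    output: str | None = None                   # State key to write results to
    params: dict = field(default_factory=dict)  # Operation parameters

@dataclass
class Stage:
    name: str                                   # Stage name, e.g. "discover"
    operations: list[Operation] = field(default_factory=list)

@dataclass
class GraphDef:
    name: str                                   # Graph name
    stages: list[Stage] = field(default_factory=list)
```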
### Basic Graph Structure

```yaml
name: research_pipeline
description: Basic research workflow

inputs:
  - name: query
    type: string
    required: true

stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
        output: urls

  - name: acquire
    parallel:
      - op: fetch_batch
        input: urls
        output: pages

  - name: process
    sequential:
      - op: to_text_batch
        input: pages
        output: texts
      - op: chunk
        input: texts
        output: chunks

  - name: synthesize
    sequential:
      - op: integrate
        input: chunks
        params:
          query: "{{ query }}"
        output: synthesis

outputs:
  - urls
  - synthesis
```
## Execution Flow

```mermaid
sequenceDiagram
    participant Client
    participant API as Jobs API
    participant Executor as GraphExecutor
    participant Ops as Operations
    participant State as ExecutionState

    Client->>API: Submit graph job
    API->>Executor: Parse & validate graph
    loop For each stage
        Executor->>Executor: Resolve inputs
        Executor->>Ops: Execute operations
        Ops-->>Executor: Results
        Executor->>State: Store outputs
    end
    Executor->>API: GraphResult
    API-->>Client: Job complete
```
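A simplified sketch of the loop the diagram describes. The function and helper names (`run_graph`, `execute`) are illustrative, not the real `GraphExecutor` internals:

```python
from typing import Any

async def execute(op: Any, value: Any) -> Any:
    """Stand-in for the real operation dispatch; assumed for illustration."""
    raise NotImplementedError

# Simplified sketch of the stage loop; not the actual GraphExecutor code.
async def run_graph(graph_def: Any, params: dict) -> dict:
    state = dict(params)                        # ExecutionState starts from graph inputs
    for stage in graph_def.stages:              # For each stage...
        for op in stage.operations:
            # Resolve inputs from accumulated state
            inputs = state.get(op.input) if op.input else None
            result = await execute(op, inputs)  # Execute the operation
            if op.output:
                state[op.output] = result       # Store outputs back into state
    return state                                # Final state feeds GraphResult
```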
## GraphExecutor

Main entry point for graph execution.

```python
from cbintel.graph import GraphExecutor, parse_yaml

# Parse the YAML definition
graph_def = parse_yaml(yaml_content)

# Create an executor
executor = GraphExecutor(graph_def)

# Execute with parameters
result = await executor.run({
    "query": "AI regulation trends"
})

print(f"Stages completed: {result.stages_completed}")
print(f"Outputs: {result.outputs}")
```
## Running Graphs

### Via API

```bash
curl -X POST https://intel.nominate.ai/api/v1/jobs/graph \
  -H "Content-Type: application/json" \
  -d '{
    "graph": "name: simple\nstages:\n  ...",
    "params": {"query": "AI regulation"}
  }'
```

### Via Template

```bash
curl -X POST https://intel.nominate.ai/api/v1/jobs/graph \
  -H "Content-Type: application/json" \
  -d '{
    "template": "basic_research",
    "params": {"query": "AI regulation"}
  }'
```

### Via Python

```python
from cbintel.client import JobsClient

client = JobsClient()

job = await client.submit("graph", {
    "template": "deep_research",
    "params": {"query": "AI regulation", "max_urls": 100}
})

result = await client.wait(job.job_id)
print(result.result["outputs"])
```
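To submit an inline graph rather than a template from Python, the payload presumably mirrors the HTTP API's `graph` key shown in the curl example above; this shape is an assumption, not confirmed by this page:

```python
# Assumed to mirror the HTTP API's "graph" key; payload shape is an assumption.
yaml_graph = "name: simple\nstages:\n  ..."  # Inline YAML, as in the curl example

job = await client.submit("graph", {
    "graph": yaml_graph,
    "params": {"query": "AI regulation"},
})
```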
Stage Execution Modes
| Mode |
Description |
sequential |
Operations run one after another |
parallel |
Operations run concurrently |
parallel_foreach |
Parallel over input collection |
conditional |
Run based on condition |
loop |
Repeat until condition |
```yaml
stages:
  # Sequential
  - name: step_by_step
    sequential:
      - op: search
      - op: fetch_batch   # Waits for search

  # Parallel
  - name: concurrent
    parallel:
      - op: fetch         # url1
      - op: fetch         # url2 (concurrent)

  # ForEach
  - name: batch
    parallel_foreach:
      input: urls
      operations:
        - op: fetch
```
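The table also lists `conditional` and `loop`, which the snippet above does not cover. A hypothetical sketch of their shape follows; the `when` and `until` field names are assumptions, since this section does not define their schema:

```yaml
stages:
  # Conditional (field names assumed for illustration)
  - name: maybe_screenshot
    conditional:
      when: "{{ capture_screenshots }}"
      operations:
        - op: screenshot

  # Loop (field names and the quality_score variable assumed for illustration)
  - name: refine
    loop:
      until: "{{ quality_score > 0.8 }}"
      operations:
        - op: semantic_filter
```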
## Operations Overview

| Category | Operations |
|----------|------------|
| Discover | `search`, `archive_discover`, `youtube_search` |
| Acquire | `fetch`, `fetch_batch`, `fetch_archive`, `tor_fetch`, `screenshot` |
| Transform | `to_markdown`, `to_text`, `chunk`, `merge` |
| Process | `embed`, `embed_batch`, `ocr` |
| Filter | `semantic_filter`, `quality_filter`, `filter_urls` |
| Extract | `entities`, `topics`, `summarize` |
| Store | `store_vector`, `store_entity` |
| Synthesize | `integrate`, `chat`, `compare`, `to_report` |
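Ops from different categories chain the same way as in the earlier example. A sketch that extracts entities and stores them after acquisition (the stage names and state keys here are illustrative):

```yaml
stages:
  - name: extract
    sequential:
      - op: entities        # Extract -> entity[]
        input: texts
        output: entity_list

  - name: persist
    sequential:
      - op: store_entity    # Store the extracted entities
        input: entity_list
```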
## GraphResult

```python
from dataclasses import dataclass

@dataclass
class GraphResult:
    graph_name: str          # Graph name
    stages_completed: int    # Stages finished
    stages_total: int        # Total stages
    duration_seconds: float  # Execution time
    outputs: dict            # Named outputs
    state: dict              # Final state
    artifacts: list          # Generated files
```
## Error Handling

```python
from cbintel.graph import GraphExecutor, GraphError, StageError

try:
    result = await executor.run(params)
except StageError as e:
    print(f"Stage '{e.stage_name}' failed: {e.message}")
    print(f"Partial outputs: {e.partial_outputs}")
except GraphError as e:
    print(f"Graph execution failed: {e}")
```
## Configuration

### Environment Variables

```bash
# Execution
GRAPH_TIMEOUT=3600
GRAPH_MAX_PARALLEL=10

# Operations
FETCH_TIMEOUT=30
EMBED_BATCH_SIZE=32
```
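A sketch of how these variables might be read at runtime; the lookup logic is an assumption, with defaults mirroring the values shown above:

```python
import os

# Defaults mirror the values shown above; the actual lookup logic is assumed.
GRAPH_TIMEOUT = int(os.environ.get("GRAPH_TIMEOUT", "3600"))
GRAPH_MAX_PARALLEL = int(os.environ.get("GRAPH_MAX_PARALLEL", "10"))
FETCH_TIMEOUT = int(os.environ.get("FETCH_TIMEOUT", "30"))
EMBED_BATCH_SIZE = int(os.environ.get("EMBED_BATCH_SIZE", "32"))
```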
## Best Practices

- **Name stages clearly** - Descriptive names make failures easier to debug
- **Use appropriate modes** - Run operations in parallel where data dependencies allow
- **Handle errors** - Catch `StageError` and `GraphError` as shown above
- **Limit parallelism** - Cap concurrency (e.g. via `GRAPH_MAX_PARALLEL`) so you don't overwhelm upstream resources
- **Test incrementally** - Validate stages one at a time; see the sketch below
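One way to test incrementally: run a one-stage graph and inspect its output before wiring up later stages. This sketch reuses the API and YAML structure shown earlier on this page:

```python
from cbintel.graph import GraphExecutor, parse_yaml

# A one-stage graph: validate "discover" alone before adding later stages.
single_stage_yaml = """
name: discover_only
inputs:
  - name: query
    type: string
    required: true
stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
        output: urls
outputs:
  - urls
"""

executor = GraphExecutor(parse_yaml(single_stage_yaml))
result = await executor.run({"query": "AI regulation trends"})
print(result.outputs["urls"])  # Inspect before adding acquire/process/synthesize
```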