Graph Execution

Graphs are DAG-based research pipelines that orchestrate operations across stages, passing each operation's named outputs to later stages as inputs.

Overview

flowchart LR
    subgraph "Stage 1: Discover"
        SEARCH["search<br/>query -> url[]"]
    end

    subgraph "Stage 2: Acquire"
        FETCH["fetch_batch<br/>url[] -> html[]"]
    end

    subgraph "Stage 3: Transform"
        TEXT["to_text_batch<br/>html[] -> text[]"]
        CHUNK["chunk<br/>text -> chunk[]"]
    end

    subgraph "Stage 4: Process"
        EMBED["embed_batch<br/>chunk[] -> vector[]"]
        ENTITY["entities<br/>text -> entity[]"]
    end

    subgraph "Stage 5: Synthesize"
        FILTER["semantic_filter<br/>chunk[] -> chunk[]"]
        INTEGRATE["integrate<br/>chunk[] -> text"]
    end

    SEARCH --> FETCH --> TEXT --> CHUNK
    CHUNK --> EMBED
    TEXT --> ENTITY
    CHUNK --> FILTER --> INTEGRATE

Quick Reference

Document          Description
YAML Schema       GraphDef structure
Operations        All 31 operations
Execution Modes   Sequential, parallel, foreach
Examples          Complete graph examples
Templates         Built-in templates

Graph Concepts

Concept     Description
GraphDef    YAML definition of a graph
Stage       Named group of operations
Operation   Single atomic action
Input       Data flowing into an operation
Output      Named result from an operation
State       Accumulated values during execution

Basic Graph Structure

name: research_pipeline
description: Basic research workflow

inputs:
  - name: query
    type: string
    required: true

stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
        output: urls

  - name: acquire
    parallel:
      - op: fetch_batch
        input: urls
        output: pages

  - name: process
    sequential:
      - op: to_text_batch
        input: pages
        output: texts
      - op: chunk
        input: texts
        output: chunks

  - name: synthesize
    sequential:
      - op: integrate
        input: chunks
        params:
          query: "{{ query }}"
        output: synthesis

outputs:
  - urls
  - synthesis

Execution Flow

sequenceDiagram
    participant Client
    participant API as Jobs API
    participant Executor as GraphExecutor
    participant Ops as Operations
    participant State as ExecutionState

    Client->>API: Submit graph job
    API->>Executor: Parse & validate graph

    loop For each stage
        Executor->>Executor: Resolve inputs
        Executor->>Ops: Execute operations
        Ops-->>Executor: Results
        Executor->>State: Store outputs
    end

    Executor->>API: GraphResult
    API-->>Client: Job complete
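
The loop in the diagram is the heart of execution: each stage reads its inputs from accumulated state, runs its operations, and writes its named outputs back for later stages. Below is a conceptual Python sketch of that loop; the names (state dict, execute helper, attribute access) are illustrative only, not the actual GraphExecutor internals.

state = dict(params)                    # initial state: caller parameters
for stage in graph_def.stages:
    for op in stage.operations:
        op_input = state.get(op.input)  # resolve input from accumulated state
        result = await execute(op, op_input)
        if op.output:
            state[op.output] = result   # store named output for later stages

outputs = {name: state[name] for name in graph_def.outputs}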

GraphExecutor

Main entry point for graph execution.

from cbintel.graph import GraphExecutor, parse_yaml

# Parse YAML
graph_def = parse_yaml(yaml_content)

# Create executor
executor = GraphExecutor(graph_def)

# Execute with parameters
result = await executor.run({
    "query": "AI regulation trends"
})

print(f"Stages completed: {result.stages_completed}")
print(f"Outputs: {result.outputs}")

Running Graphs

Via API

curl -X POST https://intel.nominate.ai/api/v1/jobs/graph \
  -H "Content-Type: application/json" \
  -d '{
    "graph": "name: simple\nstages:\n  ...",
    "params": {"query": "AI regulation"}
  }'

Via Template

curl -X POST https://intel.nominate.ai/api/v1/jobs/graph \
  -H "Content-Type: application/json" \
  -d '{
    "template": "basic_research",
    "params": {"query": "AI regulation"}
  }'

Via Python

from cbintel.client import JobsClient

client = JobsClient()

job = await client.submit("graph", {
    "template": "deep_research",
    "params": {"query": "AI regulation", "max_urls": 100}
})

result = await client.wait(job.job_id)
print(result.result["outputs"])
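
The same request fields work from Python: instead of a template name, an inline YAML definition can be passed under the "graph" key, mirroring the raw-YAML API call above (the definition here is a minimal placeholder):

yaml_graph = """
name: simple
stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
        output: urls
"""

job = await client.submit("graph", {
    "graph": yaml_graph,
    "params": {"query": "AI regulation"},
})
result = await client.wait(job.job_id)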

Stage Execution Modes

Mode               Description
sequential         Operations run one after another
parallel           Operations run concurrently
parallel_foreach   Runs operations in parallel over each item of an input collection
conditional        Runs only when a condition holds (assumed syntax sketched at the end of the example below)
loop               Repeats until a condition is met (see the same sketch)

stages:
  # Sequential
  - name: step_by_step
    sequential:
      - op: search
      - op: fetch_batch  # Waits for search

  # Parallel
  - name: concurrent
    parallel:
      - op: fetch          # url1
      - op: fetch          # url2 (concurrent)

  # ForEach
  - name: batch
    parallel_foreach:
      input: urls
      operations:
        - op: fetch
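
  # Conditional and loop, as listed in the mode table above.
  # NOTE: the key names below (when, until, max_iterations) are assumed
  # for illustration; see the YAML Schema document for the exact syntax.
  - name: maybe_report
    conditional:
      when: "{{ chunks | length > 0 }}"
      operations:
        - op: to_report

  - name: refine
    loop:
      until: "{{ quality > 0.8 }}"
      max_iterations: 3
      operations:
        - op: integrate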

Operations Overview

Category     Operations
Discover     search, archive_discover, youtube_search
Acquire      fetch, fetch_batch, fetch_archive, tor_fetch, screenshot
Transform    to_markdown, to_text, chunk, merge
Process      embed, embed_batch, ocr
Filter       semantic_filter, quality_filter, filter_urls
Extract      entities, topics, summarize
Store        store_vector, store_entity
Synthesize   integrate, chat, compare, to_report

GraphResult

@dataclass
class GraphResult:
    graph_name: str           # name of the executed graph
    stages_completed: int     # stages that finished
    stages_total: int         # total stages in the graph
    duration_seconds: float   # execution time, in seconds
    outputs: dict             # named outputs declared by the graph
    state: dict               # final accumulated state
    artifacts: list           # generated files
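
A typical pattern after a run is to check completion before reading outputs; this sketch uses only the fields defined above:

result = await executor.run(params)

if result.stages_completed < result.stages_total:
    print(f"Stopped after {result.stages_completed}/{result.stages_total} stages")
else:
    print(f"Finished in {result.duration_seconds:.1f}s")
    for name, value in result.outputs.items():
        print(name, type(value).__name__)   # named outputs declared by the graph
    for artifact in result.artifacts:
        print("artifact:", artifact)        # generated files, if any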

Error Handling

from cbintel.graph import GraphExecutor, GraphError, StageError

try:
    result = await executor.run(params)
except StageError as e:
    print(f"Stage '{e.stage_name}' failed: {e.message}")
    print(f"Partial outputs: {e.partial_outputs}")
except GraphError as e:
    print(f"Graph execution failed: {e}")
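
A common wrapper retries the whole graph a bounded number of times. Whether a given StageError is actually safe to retry depends on the failing operation, so treat this as a sketch:

import asyncio

async def run_with_retry(executor, params, attempts=3, backoff=5.0):
    """Retry the full graph on stage failures, with a fixed backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await executor.run(params)
        except StageError as e:
            if attempt == attempts:
                raise
            print(f"Stage '{e.stage_name}' failed (attempt {attempt}), retrying...")
            await asyncio.sleep(backoff)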

Configuration

Environment Variables

# Execution
GRAPH_TIMEOUT=3600       # maximum graph runtime, in seconds
GRAPH_MAX_PARALLEL=10    # maximum concurrent operations

# Operations
FETCH_TIMEOUT=30         # per-fetch timeout, in seconds
EMBED_BATCH_SIZE=32      # items per embedding batch
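
To honor the same limits in your own code, the values can be read with the standard library; the defaults below simply mirror the snippet above (the actual cbintel config loader may differ):

import os

GRAPH_TIMEOUT = int(os.getenv("GRAPH_TIMEOUT", "3600"))
GRAPH_MAX_PARALLEL = int(os.getenv("GRAPH_MAX_PARALLEL", "10"))
FETCH_TIMEOUT = int(os.getenv("FETCH_TIMEOUT", "30"))
EMBED_BATCH_SIZE = int(os.getenv("EMBED_BATCH_SIZE", "32"))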

Best Practices

  1. Name stages clearly - Descriptive names make failures easier to trace
  2. Use appropriate modes - Run operations in parallel when they are independent
  3. Handle errors - Catch StageError and GraphError as shown above
  4. Limit parallelism - Respect GRAPH_MAX_PARALLEL so resources aren't overwhelmed
  5. Test incrementally - Validate stages one at a time (see the sketch below)
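
For practice 5, a one-stage graph validates a single step in isolation before the full pipeline is wired up; this reuses parse_yaml and GraphExecutor from the sections above:

from cbintel.graph import GraphExecutor, parse_yaml

single_stage = """
name: discover_only
inputs:
  - name: query
    type: string
    required: true
stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
        output: urls
outputs:
  - urls
"""

executor = GraphExecutor(parse_yaml(single_stage))
result = await executor.run({"query": "AI regulation"})
print(result.outputs["urls"])   # confirm this stage before adding the next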