YAML Schema

Graph definitions use a YAML schema that specifies inputs, stages, operations, and outputs.

Full Schema

# Graph metadata
name: string                    # Required: Graph name
description: string             # Optional: Description
version: string                 # Optional: Version (e.g., "1.0.0")

# Input parameters
inputs:
  - name: string               # Parameter name
    type: string               # Type (string, int, bool, etc.)
    required: bool             # Is required?
    default: any               # Default value
    description: string        # Description

# Custom type definitions
types:
  TypeName:
    base: string               # Base type
    constraints:
      - string                 # Constraint expressions

# Execution stages
stages:
  - name: string               # Stage name
    description: string        # Stage description
    condition: string          # Conditional execution

    # Execution mode (one of):
    sequential:                # Sequential operations
      - op: string
    parallel:                  # Parallel operations
      - op: string
    parallel_foreach:          # Parallel over collection
      input: string
      operations:
        - op: string
    loop:                      # Loop execution
      condition: string
      max_iterations: int
      operations:
        - op: string

# Output declarations
outputs:
  - string                     # Output names to expose

Metadata Section

name: research_pipeline
description: |
  Multi-stage research pipeline for intelligence gathering.
  Supports geographic routing and AI synthesis.
version: "1.2.0"

Inputs Section

inputs:
  - name: query
    type: string
    required: true
    description: "Search query"

  - name: max_urls
    type: int
    default: 50
    description: "Maximum URLs to process"

  - name: geo
    type: string
    default: null
    description: "Geographic routing (e.g., us:ca)"

  - name: options
    type: object
    schema:
      include_images:
        type: bool
        default: false
      min_score:
        type: float
        default: 6.0
        range: [0.0, 10.0]

Input Types

Type       Python       Description
string     str          Text value
int        int          Integer
float      float        Decimal
bool       bool         True/false
datetime   datetime     ISO 8601 timestamp
url        str          URL string
url[]      list[str]    URL array
object     dict         Nested object
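
The table above can be sketched as a small validation helper. This is illustrative only — `TYPE_MAP` and `check_input` are hypothetical names, not part of the library:

```python
from datetime import datetime

# Illustrative mapping from schema type names to Python types.
TYPE_MAP = {
    "string": str,
    "int": int,
    "float": float,
    "bool": bool,
    "datetime": datetime,
    "url": str,
    "url[]": list,
    "object": dict,
}

def check_input(value, type_name):
    """Return True if value matches the declared schema type."""
    if type_name == "url[]":
        return isinstance(value, list) and all(isinstance(v, str) for v in value)
    return isinstance(value, TYPE_MAP[type_name])
```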

Stages Section

Sequential Mode

Operations execute one after another:

stages:
  - name: process
    sequential:
      - op: to_text
        input: html
        output: text

      - op: chunk
        input: text
        params:
          size: 500
        output: chunks

      - op: embed_batch
        input: chunks
        output: vectors
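
Sequential semantics can be sketched as a loop that threads each named output into a shared context (a sketch under assumptions: `registry` maps op names to callables; the real executor's internals are not documented here):

```python
def run_sequential(ops, registry, context):
    """Execute operations in order; each op reads its input key and writes its output key."""
    for spec in ops:
        fn = registry[spec["op"]]
        arg = context.get(spec.get("input"))
        context[spec["output"]] = fn(arg, **spec.get("params", {}))
    return context
```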

Parallel Mode

Operations execute concurrently:

stages:
  - name: multi_fetch
    parallel:
      - op: fetch
        params:
          url: "https://example.com"
        output: page1

      - op: fetch
        params:
          url: "https://example.org"
        output: page2

      - op: fetch
        params:
          url: "https://example.net"
        output: page3
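
The parallel mode amounts to submitting independent operations concurrently and collecting each result under its output name. A minimal sketch using a thread pool (the `registry` callable convention is an assumption, as above):

```python
from concurrent.futures import ThreadPoolExecutor

def run_parallel(ops, registry, context):
    """Run independent operations concurrently; each writes its own output key."""
    with ThreadPoolExecutor() as pool:
        futures = {
            spec["output"]: pool.submit(
                registry[spec["op"]],
                context.get(spec.get("input")),
                **spec.get("params", {}),
            )
            for spec in ops
        }
        for name, fut in futures.items():
            context[name] = fut.result()
    return context
```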

Parallel ForEach Mode

Apply operations to each item in a collection:

stages:
  - name: fetch_all
    parallel_foreach:
      input: urls          # Collection to iterate
      item_name: url       # Variable name for each item
      concurrency: 10      # Max parallel (optional)
      operations:
        - op: fetch
          params:
            url: "{{ url }}"
          output: page

Loop Mode

Repeat while the condition holds, up to a maximum number of iterations:

stages:
  - name: iterative_crawl
    loop:
      condition: "state.depth < 3 AND NOT is_empty(state.new_urls)"
      max_iterations: 10
      operations:
        - op: fetch_batch
          input: state.new_urls
          output: pages

        - op: extract_links
          input: pages
          output: new_urls

Conditional Execution

stages:
  - name: optional_step
    condition: "params.include_screenshots == true"
    sequential:
      - op: screenshot
        input: urls
        output: images

Operations Section

Basic Operation

- op: search
  params:
    query: "{{ query }}"
    max_results: 50
  output: urls

With Input Reference

- op: fetch_batch
  input: urls              # Reference previous output
  params:
    geo: "{{ geo }}"
  output: pages

Multiple Inputs

- op: integrate
  input:
    chunks: filtered_chunks
    query: "{{ query }}"
  output: synthesis

Typed Operations

- op: semantic_filter
  input:
    type: chunk[]
    from: chunks
  params:
    query:
      type: string
      required: true
    threshold:
      type: float
      range: [0.0, 1.0]
      default: 0.5
  output:
    name: filtered
    type: chunk[]

Variable Substitution

Parameter References

- op: search
  params:
    query: "{{ query }}"          # From inputs
    max_results: "{{ max_urls }}"

Output References

- op: fetch_batch
  input: "{{ urls }}"             # From previous output

State References

- op: fetch_batch
  input: "{{ state.pending_urls }}"

Expressions

- op: fetch_batch
  params:
    limit: "{{ min(max_urls, 100) }}"
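
Conceptually, substitution replaces each `{{ ... }}` placeholder with the result of evaluating the expression against the available names. A minimal sketch, assuming a whitelist of `min`/`max` as in the example above (the real template engine may differ):

```python
import re

def render(template, variables):
    """Replace {{ expr }} placeholders by evaluating expr against the given variables."""
    def repl(match):
        expr = match.group(1).strip()
        # Restrict evaluation to the supplied variables plus a few safe helpers.
        return str(eval(expr, {"__builtins__": {}, "min": min, "max": max}, variables))
    return re.sub(r"\{\{(.*?)\}\}", repl, template)
```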

Outputs Section

outputs:
  - urls                          # Expose named outputs
  - synthesis
  - entities

Types Section

Define custom type aliases:

types:
  PersonEntity:
    base: entity
    constraints:
      - "type == 'person'"

  RelevantChunk:
    base: chunk
    constraints:
      - "quality_score >= 0.5"
      - "word_count >= 50"

  GovUrl:
    base: url
    constraints:
      - "domain_matches('*.gov')"
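
A custom type accepts an item only when every constraint expression holds against the item's fields. A sketch of that check for simple comparison constraints (function-style constraints like `domain_matches` would need additional helpers in scope; this evaluator is illustrative):

```python
def matches_type(item, type_def):
    """Check an item (a dict of fields) against a custom type definition."""
    for expr in type_def.get("constraints", []):
        # Each constraint is evaluated with the item's fields as names.
        if not eval(expr, {"__builtins__": {}}, dict(item)):
            return False
    return True
```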

Complete Example

name: deep_research
description: Deep research pipeline with entity extraction
version: "2.0.0"

inputs:
  - name: query
    type: string
    required: true
  - name: max_urls
    type: int
    default: 100
  - name: geo
    type: string
    default: null

stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "{{ query }}"
          max_results: "{{ max_urls }}"
        output: urls

  - name: acquire
    parallel_foreach:
      input: urls
      item_name: url
      concurrency: 10
      operations:
        - op: fetch
          params:
            url: "{{ url }}"
            geo: "{{ geo }}"
          output: pages

  - name: transform
    sequential:
      - op: to_text_batch
        input: pages
        output: texts
      - op: chunk
        input: texts
        params:
          size: 500
          overlap: 50
        output: chunks

  - name: process
    parallel:
      - op: embed_batch
        input: chunks
        output: vectors

      - op: entities
        input: texts
        params:
          types: [person, organization, location]
        output: entities

  - name: filter
    sequential:
      - op: semantic_filter
        input: chunks
        params:
          query: "{{ query }}"
          threshold: 0.5
        output: relevant_chunks

  - name: synthesize
    sequential:
      - op: integrate
        input: relevant_chunks
        params:
          query: "{{ query }}"
        output: synthesis

      - op: to_report
        input:
          synthesis: synthesis
          entities: entities
          sources: urls
        params:
          template: research_report
        output: report

outputs:
  - urls
  - entities
  - synthesis
  - report

Validation

Graphs are validated at parse time:

from cbintel.graph import parse_yaml, ValidationError

try:
    graph_def = parse_yaml(yaml_content)
except ValidationError as e:
    print(f"Invalid graph: {e}")

Common Errors

Error                      Cause
Missing required input     Required input not provided in params
Unknown operation          Op name not registered
Invalid output reference   Reference to a non-existent output
Type mismatch              Input type doesn't match the expected type
Cycle detected             Circular dependency between stages