Execution Modes¶
Stages can execute their operations in different modes, depending on what the workflow needs.
Mode Overview¶
| Mode | Description | Use Case |
|---|---|---|
| `sequential` | One after another | Dependent operations |
| `parallel` | Concurrent execution | Independent operations |
| `parallel_foreach` | Parallel over collection | Batch processing |
| `loop` | Repeat while condition holds | Iterative crawling |
| `conditional` | Execute if condition is true | Optional steps |
Sequential Mode¶
Operations execute in order, each waiting for the previous to complete.
stages:
  - name: process_text
    sequential:
      - op: to_text
        input: html
        output: text
      - op: chunk
        input: text    # Uses previous output
        output: chunks
      - op: embed_batch
        input: chunks  # Uses previous output
        output: vectors
Diagram¶
flowchart LR
A[to_text] --> B[chunk] --> C[embed_batch]
When to Use¶
- Operations depend on each other
- Outputs feed into inputs
- Order matters
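The data flow in the `process_text` stage can be sketched in Python as a loop over steps that read from and write to a shared context. This is only an illustration of the semantics, not the engine's implementation: `run_sequential`, the `OPS` registry, and the toy operation bodies are all assumptions made for the example.

```python
from typing import Any, Callable

# Hypothetical stand-ins for the real operations; only the names come from the YAML.
def to_text(html: str) -> str:
    return html.replace("<p>", "").replace("</p>", " ").strip()

def chunk(text: str) -> list[str]:
    return text.split()

def embed_batch(chunks: list[str]) -> list[int]:
    return [len(c) for c in chunks]  # toy "embedding": the length of each chunk

OPS: dict[str, Callable[[Any], Any]] = {
    "to_text": to_text,
    "chunk": chunk,
    "embed_batch": embed_batch,
}

def run_sequential(steps: list[dict], context: dict) -> dict:
    """Run steps in order; each reads its `input` from and writes its `output` to context."""
    for step in steps:
        value = context[step["input"]]  # fails with KeyError if an earlier step did not run
        context[step["output"]] = OPS[step["op"]](value)
    return context

steps = [
    {"op": "to_text", "input": "html", "output": "text"},
    {"op": "chunk", "input": "text", "output": "chunks"},
    {"op": "embed_batch", "input": "chunks", "output": "vectors"},
]
result = run_sequential(steps, {"html": "<p>hello world</p>"})
```

Because each step looks up the previous step's output by name, reordering the steps would break the chain, which is why this mode suits dependent operations.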
Parallel Mode¶
Operations execute concurrently.
stages:
  - name: fetch_sources
    parallel:
      - op: fetch
        params:
          url: "https://source1.com"
        output: page1
      - op: fetch
        params:
          url: "https://source2.com"
        output: page2
      - op: fetch
        params:
          url: "https://source3.com"
        output: page3
Diagram¶
flowchart LR
subgraph parallel
A[fetch source1]
B[fetch source2]
C[fetch source3]
end
When to Use¶
- Operations are independent
- No data dependencies between them
- Want to maximize throughput
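As a rough illustration of what concurrent execution of independent operations means, the `fetch_sources` stage could be modelled with `asyncio.gather`. The `fetch` coroutine below is a hypothetical placeholder that sleeps instead of performing network I/O, and `run_parallel` is an assumed helper name, not part of the engine.

```python
import asyncio

async def fetch(url: str) -> str:
    """Hypothetical fetch: simulates I/O latency instead of making a real request."""
    await asyncio.sleep(0.01)
    return f"<html>{url}</html>"

async def run_parallel(ops: list[dict]) -> dict:
    """Start every operation at once and store each result under its output name."""
    results = await asyncio.gather(*(fetch(op["url"]) for op in ops))
    return {op["output"]: res for op, res in zip(ops, results)}

ops = [
    {"url": "https://source1.com", "output": "page1"},
    {"url": "https://source2.com", "output": "page2"},
    {"url": "https://source3.com", "output": "page3"},
]
context = asyncio.run(run_parallel(ops))
```

The three fetches overlap in time, so the stage takes about as long as the slowest fetch rather than the sum of all three.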
Parallel ForEach Mode¶
Apply operations to each item in a collection, in parallel.
stages:
  - name: fetch_all
    parallel_foreach:
      input: urls
      item_name: url
      concurrency: 10
      operations:
        - op: fetch
          params:
            url: "{{ url }}"
          output: page
        - op: to_text
          input: page
          output: text
Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
| `input` | string | required | Collection to iterate |
| `item_name` | string | `"item"` | Variable name for each item |
| `concurrency` | int | 10 | Max parallel workers |
| `operations` | list | required | Operations per item |
Diagram¶
flowchart LR
subgraph "ForEach urls"
subgraph "url[0]"
A1[fetch] --> B1[to_text]
end
subgraph "url[1]"
A2[fetch] --> B2[to_text]
end
subgraph "url[2]"
A3[fetch] --> B3[to_text]
end
end
Output Aggregation¶
Per-item results are collected into arrays, one array per operation output:
# If input has 10 URLs
# Output: pages = [page0, page1, ..., page9]
# Output: texts = [text0, text1, ..., text9]
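The combination of a concurrency cap and output aggregation can be sketched with an `asyncio.Semaphore`. This is a minimal model under stated assumptions: `fetch` and `to_text` are placeholders, the `peak` counter exists only to show the cap being respected, and preserving input order in the aggregated arrays is an assumption about the engine rather than something the text above confirms.

```python
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(0.01)  # simulated I/O
    return f"<html>{url}</html>"

def to_text(page: str) -> str:
    return page.removeprefix("<html>").removesuffix("</html>")

async def parallel_foreach(items: list[str], concurrency: int = 10) -> dict:
    """Run fetch -> to_text per item, with at most `concurrency` items in flight."""
    semaphore = asyncio.Semaphore(concurrency)
    active = 0
    peak = 0

    async def process(url: str) -> tuple[str, str]:
        nonlocal active, peak
        async with semaphore:  # blocks while `concurrency` items are already running
            active += 1
            peak = max(peak, active)
            page = await fetch(url)
            text = to_text(page)
            active -= 1
            return page, text

    # gather returns results in input order, regardless of completion order
    pairs = await asyncio.gather(*(process(u) for u in items))
    return {
        "pages": [page for page, _ in pairs],
        "texts": [text for _, text in pairs],
        "peak": peak,
    }

urls = [f"https://example.com/{i}" for i in range(10)]
out = asyncio.run(parallel_foreach(urls, concurrency=3))
```

Within one item the operations still run sequentially; only different items overlap.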
Loop Mode¶
Repeat operations while a condition holds, up to a maximum number of iterations.
stages:
  - name: iterative_crawl
    loop:
      condition: "state.depth < 3 AND NOT is_empty(state.new_urls)"
      max_iterations: 10
      state_init:
        depth: 0
        new_urls: "{{ seed_urls }}"
        seen_urls: []  # Initialized here because filter_urls and state_update read it
      operations:
        - op: fetch_batch
          input: state.new_urls
          output: pages
        - op: extract_links
          input: pages
          output: links
        - op: filter_urls
          input: links
          params:
            exclude: state.seen_urls
          output: new_urls
      state_update:
        depth: "state.depth + 1"
        new_urls: new_urls
        seen_urls: "state.seen_urls + state.new_urls"
Parameters¶
| Parameter | Type | Description |
|---|---|---|
| `condition` | string | Continue while true |
| `max_iterations` | int | Safety limit |
| `state_init` | object | Initial state values |
| `operations` | list | Operations per iteration |
| `state_update` | object | State updates after each iteration |
Diagram¶
flowchart TB
START[Start] --> CHECK{condition?}
CHECK -->|Yes| EXECUTE[Execute operations]
EXECUTE --> UPDATE[Update state]
UPDATE --> CHECK
CHECK -->|No| END[End]
Loop Conditions¶
# Depth limit
condition: "state.depth < 5"
# URL availability
condition: "NOT is_empty(state.pending)"
# Coverage threshold
condition: "state.coverage < 0.8"
# Combined
condition: "state.depth < 5 AND state.urls_found < 100"
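The check-execute-update cycle from the diagram can be modelled as a plain `while` loop. In this sketch, `LINKS` is a toy link graph standing in for real pages, and `run_loop`, `crawl_step`, and `condition` are hypothetical names. One deliberate difference from the YAML: the step excludes the current frontier as well as `seen_urls`, so a URL being fetched in this iteration is not queued again.

```python
# Toy link graph standing in for fetched pages (an assumption for the example).
LINKS = {"a": ["b", "c"], "b": ["c", "d"], "c": ["a"], "d": ["e"], "e": []}

def condition(state: dict) -> bool:
    # Mirrors "state.depth < 3 AND NOT is_empty(state.new_urls)"
    return state["depth"] < 3 and len(state["new_urls"]) > 0

def crawl_step(state: dict) -> dict:
    """One iteration: 'fetch' the frontier, extract links, drop known URLs, update state."""
    known = set(state["seen_urls"]) | set(state["new_urls"])
    found = []
    for url in state["new_urls"]:
        for link in LINKS.get(url, []):
            if link not in known and link not in found:
                found.append(link)
    return {
        "depth": state["depth"] + 1,
        "new_urls": found,
        "seen_urls": state["seen_urls"] + state["new_urls"],
    }

def run_loop(condition, step, state_init: dict, max_iterations: int = 10):
    """Repeat `step` while `condition` holds, never exceeding `max_iterations`."""
    state = dict(state_init)
    iterations = 0
    while iterations < max_iterations and condition(state):
        state = step(state)
        iterations += 1
    return state, iterations

final_state, iterations = run_loop(
    condition, crawl_step, {"depth": 0, "new_urls": ["a"], "seen_urls": []}
)
```

Here the depth limit stops the crawl after three iterations; `max_iterations` only matters when the condition itself never becomes false.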
Conditional Execution¶
A stage with a `condition` executes only if that condition evaluates to true; otherwise it is skipped.
stages:
  - name: optional_screenshots
    condition: "params.include_screenshots == true"
    parallel_foreach:
      input: urls
      operations:
        - op: screenshot
          output: image
  - name: deep_analysis
    condition: "len(entities) > 50"
    sequential:
      - op: entities
        input: combined_text
        params:
          types: [relationship]
        output: relationships
Condition Expressions¶
# Boolean parameter
condition: "params.enabled"
# Comparison
condition: "len(urls) > 10"
# Combined
condition: "params.deep AND len(urls) < 100"
# String check
condition: "params.mode == 'detailed'"
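To make the expression forms above concrete, here is a minimal sketch of how such strings could be evaluated. It is not the engine's parser: it simply rewrites the `AND`, `NOT`, and `true` spellings seen in the examples into Python and calls `eval` with a restricted namespace. The `evaluate` function and the `is_empty` helper body are assumptions, and `eval` used this way is not a security sandbox.

```python
import re
from types import SimpleNamespace

def is_empty(value) -> bool:
    """Assumed meaning of is_empty: true for zero-length collections."""
    return len(value) == 0

# Only the spellings that appear in the examples are mapped.
_KEYWORDS = {"AND": "and", "NOT": "not", "true": "True"}
_PATTERN = re.compile(r"\b(AND|NOT|true)\b")

def evaluate(condition: str, variables: dict) -> bool:
    """Evaluate a condition string against the given variables.

    Limitation: the keyword rewrite also touches string literals,
    so a literal such as 'AND' would be altered.
    """
    python_expr = _PATTERN.sub(lambda m: _KEYWORDS[m.group(1)], condition)
    allowed = {"__builtins__": {}, "len": len, "is_empty": is_empty}
    return bool(eval(python_expr, allowed, dict(variables)))

variables = {
    "params": SimpleNamespace(deep=True, mode="detailed", include_screenshots=True),
    "urls": ["https://example.com/1", "https://example.com/2"],
    "state": SimpleNamespace(pending=[]),
}
run_deep = evaluate("params.deep AND len(urls) < 100", variables)
```

A production evaluator would more likely parse the expression into a small syntax tree and interpret only the permitted node types.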
Combining Modes¶
Each stage in a workflow can use a different mode:
stages:
  # Sequential discovery
  - name: discover
    sequential:
      - op: search
        output: urls
  # Parallel fetch with concurrency
  - name: fetch
    parallel_foreach:
      input: urls
      concurrency: 20
      operations:
        - op: fetch
          output: page
  # Parallel processing
  - name: process
    parallel:
      - op: embed_batch
        input: pages
        output: vectors
      - op: entities
        input: combined_text
        output: entities
  # Conditional deep analysis
  - name: deep
    condition: "params.deep_analysis"
    sequential:
      - op: integrate
        output: synthesis
Error Handling¶
Fail Fast¶
Default: stop on first error.
stages:
  - name: fetch
    parallel_foreach:
      input: urls
      error_handling: fail_fast  # Default
      operations:
        - op: fetch
Continue on Error¶
Skip failed items, continue with others.
stages:
  - name: fetch
    parallel_foreach:
      input: urls
      error_handling: continue
      operations:
        - op: fetch
Collect Errors¶
Collect errors and report them at the end.
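The three strategies differ only in what happens when one item fails, which a short sketch can make explicit. Everything here is illustrative: `run_foreach` and the failing `fetch` are hypothetical, items are processed one at a time for simplicity, and the value `"collect"` is an assumed name for the third strategy, since no configuration value for it is given above.

```python
def fetch(url: str) -> str:
    """Hypothetical fetch that fails for URLs containing 'bad'."""
    if "bad" in url:
        raise ValueError(f"cannot fetch {url}")
    return f"page:{url}"

def run_foreach(items, op, error_handling: str = "fail_fast"):
    """Apply `op` to each item under the chosen error strategy.

    Returns (results, errors). `errors` is only filled in "collect" mode,
    a name assumed for this example.
    """
    results, errors = [], []
    for item in items:
        try:
            results.append(op(item))
        except Exception as exc:
            if error_handling == "fail_fast":
                raise  # stop on the first error
            if error_handling == "collect":
                errors.append((item, exc))  # remember the failure, keep going
            # "continue": skip the failed item without recording it
    return results, errors

urls = ["https://example.com/a", "https://example.com/bad", "https://example.com/c"]
skipped_results, skipped_errors = run_foreach(urls, fetch, "continue")
collected_results, collected_errors = run_foreach(urls, fetch, "collect")
```

Both non-default strategies return the two successful pages; only the collecting variant also reports which URL failed and why.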
Performance Tips¶
- Use parallel when operations are independent
- Limit concurrency to avoid overwhelming resources
- Batch operations: prefer `embed_batch` over multiple `embed` calls
- Avoid deep loops: set a reasonable `max_iterations`
- Use conditions to skip unnecessary work