Runs

A run represents a single graph execution within a workspace.

Run Model

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass
class Run:
    run_id: str                       # Unique identifier
    workspace_id: str                 # Parent workspace
    job_id: str                       # Associated job
    graph_name: str                   # Executed graph name
    status: str                       # pending/running/completed/failed
    created_at: datetime              # Start time
    completed_at: Optional[datetime]  # End time (None until finished)
    duration_seconds: int             # Execution duration
    params: dict                      # Input parameters
    outputs: dict                     # Output references
    artifacts: list                   # Artifact IDs
    error: Optional[str]              # Error message (None unless failed)

Run Manifest

Each run produces a manifest file:

{
  "run_id": "run_xyz789",
  "workspace_id": "ws_abc123",
  "job_id": "job_def456",
  "graph_name": "research_pipeline",
  "status": "completed",
  "created_at": "2024-01-15T10:30:00Z",
  "completed_at": "2024-01-15T10:35:00Z",
  "duration_seconds": 300,
  "params": {
    "query": "AI regulation",
    "max_urls": 50
  },
  "outputs": {
    "urls": 42,
    "synthesis": "AI regulation is evolving...",
    "entities_count": 127
  },
  "artifacts": [
    "art_001",
    "art_002",
    "art_003"
  ],
  "stages": [
    {
      "name": "discover",
      "status": "completed",
      "duration_seconds": 12
    },
    {
      "name": "fetch",
      "status": "completed",
      "duration_seconds": 85
    }
  ]
}
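Because the manifest is plain JSON, it can be loaded back into the Run model directly. A minimal sketch, assuming the manifest sits at a hypothetical local path and that extra keys such as stages are ignored when constructing the dataclass:

import json
from datetime import datetime

def parse_ts(value):
    # Manifests use RFC 3339 timestamps; normalize the trailing "Z"
    # so fromisoformat() also works on Python versions before 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00")) if value else None

with open("run_xyz789.json") as f:  # hypothetical manifest path
    data = json.load(f)

run = Run(
    run_id=data["run_id"],
    workspace_id=data["workspace_id"],
    job_id=data["job_id"],
    graph_name=data["graph_name"],
    status=data["status"],
    created_at=parse_ts(data["created_at"]),
    completed_at=parse_ts(data.get("completed_at")),
    duration_seconds=data["duration_seconds"],
    params=data["params"],
    outputs=data["outputs"],
    artifacts=data["artifacts"],
    error=data.get("error"),
)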

Working with Runs

List Runs

workspace = await manager.get("ws_abc123")
runs = await workspace.list_runs()

for run in runs:
    print(f"Run {run.run_id}")
    print(f"  Graph: {run.graph_name}")
    print(f"  Status: {run.status}")
    print(f"  Duration: {run.duration_seconds}s")

Filter Runs

from datetime import datetime, timedelta, timezone

# By status
completed = await workspace.list_runs(status="completed")

# By date range
recent = await workspace.list_runs(
    created_after=datetime.now(timezone.utc) - timedelta(days=7)
)

# By graph name
research_runs = await workspace.list_runs(
    graph_name="research_pipeline"
)

Get Run Details

run = await workspace.get_run("run_xyz789")

print(f"Graph: {run.graph_name}")
print(f"Params: {run.params}")
print(f"Outputs: {run.outputs}")
print(f"Artifacts: {len(run.artifacts)}")

Get Run Artifacts

run = await workspace.get_run("run_xyz789")
artifacts = await run.list_artifacts()

for artifact in artifacts:
    print(f"{artifact.path}: {artifact.artifact_type}")

Run Lifecycle

stateDiagram-v2
    [*] --> Pending: job submitted
    Pending --> Running: worker claims
    Running --> Completed: success
    Running --> Failed: error
    Completed --> [*]
    Failed --> [*]

Status Transitions

From     To         Trigger
-------  ---------  ----------------------------
-        pending    Job submitted with workspace
pending  running    Worker starts execution
running  completed  Graph finishes successfully
running  failed     Error during execution
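Because a run moves through these states asynchronously, a common pattern is to poll until it reaches a terminal state. A minimal sketch built only on the get_run call shown earlier; the helper name, poll interval, and timeout are illustrative assumptions, not part of the API:

import asyncio

TERMINAL_STATES = {"completed", "failed"}

async def wait_for_run(workspace, run_id, poll_seconds=5.0, timeout=3600.0):
    # Poll get_run() until the run leaves the pending/running states.
    run = await workspace.get_run(run_id)
    elapsed = 0.0
    while run.status not in TERMINAL_STATES:
        if elapsed >= timeout:
            raise TimeoutError(f"run {run_id} still {run.status!r} after {timeout}s")
        await asyncio.sleep(poll_seconds)
        elapsed += poll_seconds
        run = await workspace.get_run(run_id)
    return run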

Run Outputs

Accessing Outputs

run = await workspace.get_run("run_xyz789")

# Named outputs from graph
print(run.outputs["synthesis"])
print(run.outputs["entity_count"])

# Full result
result = await run.get_result()
print(result)

Output Types

Output      Type    Description
----------  ------  ---------------------
urls        list    Discovered URLs
synthesis   string  AI-generated summary
entities    list    Extracted entities
chunks      list    Text chunks
report_url  string  Generated report URL
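Outputs are ordinary dict entries keyed by name. A short sketch under the assumption that the run emitted the urls and synthesis outputs from the table above; not every graph will, and a manifest may record a count rather than the full value:

run = await workspace.get_run("run_xyz789")

# Iterate discovered URLs, when the graph produced a list of them
for url in run.outputs.get("urls", []):
    print(url)

# Save the synthesis text for review (hypothetical filename)
if "synthesis" in run.outputs:
    with open("synthesis.md", "w") as f:
        f.write(run.outputs["synthesis"])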

Comparing Runs

Compare Parameters

run1 = await workspace.get_run("run_001")
run2 = await workspace.get_run("run_002")

# Compare params
diff_params = {
    k: (run1.params.get(k), run2.params.get(k))
    for k in set(run1.params) | set(run2.params)
    if run1.params.get(k) != run2.params.get(k)
}
print(f"Parameter differences: {diff_params}")

Compare Outputs

# Compare entity counts
print(f"Run 1 entities: {run1.outputs['entity_count']}")
print(f"Run 2 entities: {run2.outputs['entity_count']}")

# Compare synthesis
from cbintel.ai import diff
changes = await diff(
    run1.outputs["synthesis"],
    run2.outputs["synthesis"]
)
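If a plain textual diff is enough, the standard library works too. A minimal sketch using difflib instead of the AI-assisted diff:

import difflib

diff_lines = difflib.unified_diff(
    run1.outputs["synthesis"].splitlines(keepends=True),
    run2.outputs["synthesis"].splitlines(keepends=True),
    fromfile="run_001",
    tofile="run_002",
)
print("".join(diff_lines))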

Replay Runs

Re-run a previous execution with the same or modified params:

# Replay with same params
new_job = await workspace.replay_run(
    "run_xyz789"
)

# Replay with modified params
new_job = await workspace.replay_run(
    "run_xyz789",
    param_overrides={"max_urls": 100}
)
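Replay pairs naturally with parameter sweeps. A sketch that fans one prior run out across several max_urls values, assuming only the replay_run call shown above; the specific values are illustrative:

# Submit one replay job per candidate value
for max_urls in (25, 50, 100):
    job = await workspace.replay_run(
        "run_xyz789",
        param_overrides={"max_urls": max_urls},
    )
    print(f"max_urls={max_urls} -> {job}")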

Run History

View History

# Get run history with stats
history = await workspace.get_run_history()

print(f"Total runs: {history.total_runs}")
print(f"Completed: {history.completed}")
print(f"Failed: {history.failed}")
print(f"Avg duration: {history.avg_duration_seconds}s")

Export History

import json

# Export run history to JSON
history_data = await workspace.export_run_history()

with open("run_history.json", "w") as f:
    json.dump(history_data, f, indent=2)

Cleanup

Delete Run

# Delete run and its artifacts
await workspace.delete_run("run_xyz789")

Cleanup Old Runs

from datetime import datetime, timedelta, timezone

# Delete runs older than 30 days
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
deleted = await workspace.cleanup_runs(older_than=cutoff)
print(f"Deleted {deleted} runs")

Keep Only Recent

# Keep only the last 10 runs
await workspace.cleanup_runs(keep_last=10)
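The two policies combine naturally into a periodic retention pass. A sketch, assuming cleanup_runs accepts each keyword independently as shown above; the 30-day and 50-run limits are illustrative:

from datetime import datetime, timedelta, timezone

async def enforce_retention(workspace):
    # Age-based pass: remove runs older than 30 days...
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    deleted = await workspace.cleanup_runs(older_than=cutoff)
    # ...then cap the remaining history at the 50 most recent runs.
    await workspace.cleanup_runs(keep_last=50)
    return deleted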

Error Handling

from cbintel.workspace import RunNotFoundError, RunError

try:
    run = await workspace.get_run("run_xyz789")
except RunNotFoundError:
    print("Run not found")
except RunError as e:
    print(f"Run error: {e}")

Best Practices

  1. Descriptive params - Record enough context in params to reproduce the run later
  2. Check status - Handle failed runs appropriately (see the sketch below)
  3. Clean up - Delete old runs to save storage
  4. Compare runs - Track changes between executions
  5. Use replay - For iterative refinement of parameters
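Practices 2 and 5 often go together: inspect why a run failed, then replay it with adjusted parameters. A minimal sketch; the max_urls override is purely illustrative:

run = await workspace.get_run("run_xyz789")

if run.status == "failed":
    print(f"Run failed: {run.error}")
    # Retry with a smaller workload (illustrative override)
    new_job = await workspace.replay_run(
        run.run_id,
        param_overrides={"max_urls": 25},
    )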