Runs¶
A run represents a single graph execution within a workspace.
Run Model¶
@dataclass
class Run:
    run_id: str            # Unique identifier
    workspace_id: str      # Parent workspace
    job_id: str            # Associated job
    graph_name: str        # Executed graph name
    status: str            # pending/running/completed/failed
    created_at: datetime   # Start time
    completed_at: datetime # End time (if finished)
    duration_seconds: int  # Execution duration
    params: dict           # Input parameters
    outputs: dict          # Output references
    artifacts: list        # Artifact IDs
    error: str             # Error message (if failed)
Run Manifest¶
Each run produces a manifest file:
{
  "run_id": "run_xyz789",
  "workspace_id": "ws_abc123",
  "job_id": "job_def456",
  "graph_name": "research_pipeline",
  "status": "completed",
  "created_at": "2024-01-15T10:30:00Z",
  "completed_at": "2024-01-15T10:35:00Z",
  "duration_seconds": 300,
  "params": {
    "query": "AI regulation",
    "max_urls": 50
  },
  "outputs": {
    "urls": 42,
    "synthesis": "AI regulation is evolving...",
    "entities_count": 127
  },
  "artifacts": [
    "art_001",
    "art_002",
    "art_003"
  ],
  "stages": [
    {
      "name": "discover",
      "status": "completed",
      "duration_seconds": 12
    },
    {
      "name": "fetch",
      "status": "completed",
      "duration_seconds": 85
    }
  ]
}
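Because the manifest is plain JSON, it can be inspected without the SDK. A minimal sketch, using the field names from the example above (`summarize_manifest` is an illustrative helper, not part of the API):

```python
from datetime import datetime

def summarize_manifest(manifest: dict) -> dict:
    """Derive wall-clock and per-stage timings from a run manifest."""
    # fromisoformat() does not accept the "Z" suffix before Python 3.11
    started = datetime.fromisoformat(manifest["created_at"].replace("Z", "+00:00"))
    finished = datetime.fromisoformat(manifest["completed_at"].replace("Z", "+00:00"))
    return {
        "wall_seconds": (finished - started).total_seconds(),
        "stage_seconds": {
            s["name"]: s["duration_seconds"] for s in manifest.get("stages", [])
        },
    }
```

For the example manifest above, this yields 300 wall-clock seconds and per-stage durations of 12s (discover) and 85s (fetch); the gap between stage time and wall time is scheduling and I/O overhead.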
Working with Runs¶
List Runs¶
workspace = await manager.get("ws_abc123")
runs = await workspace.list_runs()
for run in runs:
    print(f"Run {run.run_id}")
    print(f"  Graph: {run.graph_name}")
    print(f"  Status: {run.status}")
    print(f"  Duration: {run.duration_seconds}s")
Filter Runs¶
# By status
completed = await workspace.list_runs(status="completed")

# By date range
recent = await workspace.list_runs(
    created_after=datetime.now(timezone.utc) - timedelta(days=7)
)

# By graph name
research_runs = await workspace.list_runs(
    graph_name="research_pipeline"
)
Get Run Details¶
run = await workspace.get_run("run_xyz789")
print(f"Graph: {run.graph_name}")
print(f"Params: {run.params}")
print(f"Outputs: {run.outputs}")
print(f"Artifacts: {len(run.artifacts)}")
Get Run Artifacts¶
run = await workspace.get_run("run_xyz789")
artifacts = await run.list_artifacts()
for artifact in artifacts:
    print(f"{artifact.path}: {artifact.artifact_type}")
Run Lifecycle¶
stateDiagram-v2
    [*] --> Pending: job submitted
    Pending --> Running: worker claims
    Running --> Completed: success
    Running --> Failed: error
    Completed --> [*]
    Failed --> [*]
Status Transitions¶
| From | To | Trigger |
|---|---|---|
| - | pending | Job submitted with workspace |
| pending | running | Worker starts execution |
| running | completed | Graph finishes successfully |
| running | failed | Error during execution |
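Since a run moves through these states asynchronously, callers often poll until a terminal state is reached. A sketch of a generic poller; it accepts any async status getter rather than assuming extra SDK methods, and the timeout behavior is illustrative:

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"completed", "failed"}

async def wait_for_run(
    get_status: Callable[[], Awaitable[str]],
    poll_interval: float = 2.0,
    timeout: float = 600.0,
) -> str:
    """Poll get_status() until the run is completed/failed or the timeout expires."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        status = await get_status()
        if status in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"run still {status!r} after {timeout}s")
        await asyncio.sleep(poll_interval)
```

In practice you would pass an async closure that fetches `(await workspace.get_run("run_xyz789")).status` on each call.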
Run Outputs¶
Accessing Outputs¶
run = await workspace.get_run("run_xyz789")
# Named outputs from graph
print(run.outputs["synthesis"])
print(run.outputs["entities_count"])
# Full result
result = await run.get_result()
print(result)
Output Types¶
| Output | Type | Description |
|---|---|---|
| `urls` | list | Discovered URLs |
| `synthesis` | string | AI-generated summary |
| `entities` | list | Extracted entities |
| `chunks` | list | Text chunks |
| `report_url` | string | Generated report URL |
Comparing Runs¶
Compare Parameters¶
run1 = await workspace.get_run("run_001")
run2 = await workspace.get_run("run_002")
# Compare params
diff_params = {
    k: (run1.params.get(k), run2.params.get(k))
    for k in set(run1.params) | set(run2.params)
    if run1.params.get(k) != run2.params.get(k)
}
print(f"Parameter differences: {diff_params}")
Compare Outputs¶
# Compare entity counts
print(f"Run 1 entities: {run1.outputs['entities_count']}")
print(f"Run 2 entities: {run2.outputs['entities_count']}")

# Compare synthesis
from cbintel.ai import diff

changes = await diff(
    run1.outputs["synthesis"],
    run2.outputs["synthesis"]
)
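If the AI diff helper is unavailable, the standard library's `difflib` gives a plain textual comparison instead (a sketch; the sample strings are illustrative):

```python
import difflib

old = "AI regulation is evolving in the EU."
new = "AI regulation is evolving in the EU and the US."

# Line-oriented unified diff between two synthesis strings
changes = list(difflib.unified_diff(
    old.splitlines(), new.splitlines(), lineterm="",
    fromfile="run_001", tofile="run_002",
))
print("\n".join(changes))
```

Unlike the AI diff, this is purely lexical: it reports changed lines but not semantic changes in meaning.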
Replay Runs¶
Re-run a previous execution with same or modified params:
# Replay with same params
new_job = await workspace.replay_run("run_xyz789")

# Replay with modified params
new_job = await workspace.replay_run(
    "run_xyz789",
    param_overrides={"max_urls": 100}
)
Run History¶
View History¶
# Get run history with stats
history = await workspace.get_run_history()
print(f"Total runs: {history.total_runs}")
print(f"Completed: {history.completed}")
print(f"Failed: {history.failed}")
print(f"Avg duration: {history.avg_duration_seconds}s")
Export History¶
import json

# Export run history to JSON
history_data = await workspace.export_run_history()

with open("run_history.json", "w") as f:
    json.dump(history_data, f, indent=2)
Cleanup¶
Delete Run¶
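This page's snippets do not show a single-run delete call; assuming the workspace exposes a `delete_run()` method (hypothetical here, check your SDK version), a small guard avoids deleting runs that are still executing:

```python
def is_deletable(status: str) -> bool:
    """Only terminal runs are safe to delete; running runs may still write artifacts."""
    return status in {"completed", "failed"}
```

With that guard, deletion would look like `if is_deletable(run.status): await workspace.delete_run(run.run_id)` (again, `delete_run` is an assumed method, not confirmed by this page).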
Cleanup Old Runs¶
# Delete runs older than 30 days
cutoff = datetime.now(timezone.utc) - timedelta(days=30)
deleted = await workspace.cleanup_runs(older_than=cutoff)
print(f"Deleted {deleted} runs")
Keep Only Recent¶
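Another cleanup policy is to keep only the N most recent runs regardless of age. A sketch of the selection logic (pure Python, no extra SDK assumptions; the deletion itself would go through the cleanup APIs above):

```python
def runs_to_delete(runs: list[dict], keep: int = 10) -> list[str]:
    """Return run_ids of everything beyond the `keep` most recent runs."""
    # ISO-8601 timestamps sort lexicographically, so string order is time order
    ordered = sorted(runs, key=lambda r: r["created_at"], reverse=True)
    return [r["run_id"] for r in ordered[keep:]]
```

Feed the returned IDs to whatever delete call your SDK version provides.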
Error Handling¶
from cbintel.workspace import RunNotFoundError, RunError
try:
    run = await workspace.get_run("run_xyz789")
except RunNotFoundError:
    print("Run not found")
except RunError as e:
    print(f"Run error: {e}")
Best Practices¶
- **Descriptive params** - Include enough context in params to identify a run later
- **Check status** - Handle failed runs before reading their outputs
- **Clean up** - Delete old runs to save storage
- **Compare runs** - Track parameter and output changes between executions
- **Use replay** - Replay with parameter overrides for iterative refinement