# Jobs System
The jobs system provides async job submission, queue management, and worker execution for all cbintel operations.
## Overview

```mermaid
graph TB
    subgraph "API Layer"
        API[Jobs API :9003]
    end
    subgraph "Queue Layer"
        REDIS[(Redis Queue)]
    end
    subgraph "Worker Pool"
        W_CRAWL[CrawlWorker]
        W_GRAPH[GraphWorker]
        W_BROWSER[BrowserWorker]
        W_LAZARUS[LazarusWorker]
        W_VECTL[VectlWorker]
        W_TRANSCRIPT[TranscriptWorker]
        W_SCREENSHOT[ScreenshotWorker]
    end
    subgraph "Storage"
        FILES[files.nominate.ai]
    end
    API -->|Submit| REDIS
    REDIS -->|Claim| W_CRAWL & W_GRAPH & W_BROWSER & W_LAZARUS & W_VECTL & W_TRANSCRIPT & W_SCREENSHOT
    W_CRAWL & W_GRAPH & W_BROWSER & W_LAZARUS & W_VECTL & W_TRANSCRIPT & W_SCREENSHOT -->|Results| FILES
```
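The flow in the diagram (the API submits to a queue, a worker claims the job, and results land in storage) can be illustrated with an in-process sketch. This is not the cbintel implementation: `asyncio.Queue` stands in for Redis, a plain dict stands in for files.nominate.ai, and the `submit`/`run_worker` functions and job dict shape are hypothetical.

```python
from __future__ import annotations

import asyncio
import uuid
from typing import Any


async def submit(queue: asyncio.Queue, job_type: str, params: dict) -> str:
    """API layer: enqueue a job and return its ID without waiting for it to run."""
    job_id = f"job_{uuid.uuid4().hex[:12]}"
    await queue.put({"job_id": job_id, "job_type": job_type, "params": params})
    return job_id


async def run_worker(name: str, queue: asyncio.Queue, store: dict) -> None:
    """Worker pool: claim jobs one at a time and write each result to storage."""
    while True:
        job = await queue.get()  # claim the next available job
        try:
            store[job["job_id"]] = {"worker": name, "job_type": job["job_type"]}
        finally:
            queue.task_done()


async def demo() -> tuple[list[str], dict[str, Any]]:
    queue: asyncio.Queue = asyncio.Queue()  # stand-in for the Redis queue
    store: dict[str, Any] = {}              # stand-in for files.nominate.ai
    workers = [asyncio.create_task(run_worker(f"w{i}", queue, store)) for i in range(2)]
    ids = [await submit(queue, "crawl", {"query": "AI regulation"}) for _ in range(3)]
    await queue.join()  # returns once every queued job has been marked done
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return ids, store


ids, store = asyncio.run(demo())
print(sorted(ids) == sorted(store))  # True
```

The key property the queue provides is decoupling: `submit` returns as soon as the job is enqueued, and any idle worker can claim it.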
## Quick Reference
| Document | Description |
|---|---|
| Job Types | All 7 job types with schemas |
| Job Lifecycle | Status flow and transitions |
| Workers | Worker architecture and implementation |
| Output Shapes | Result structures by job type |
## Job Types

| Type | Worker | Purpose |
|---|---|---|
| `crawl` | CrawlWorker | AI-powered web crawling |
| `lazarus` | LazarusWorker | Historical archive retrieval |
| `vectl` | VectlWorker | Vector embedding/search |
| `screenshot` | ScreenshotWorker | Browser capture |
| `transcript` | TranscriptWorker | YouTube processing |
| `browser` | BrowserWorker | Ferret automation |
| `graph` | GraphWorker | Research graph execution |
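The table is a one-to-one mapping from job type to worker, so a submission can be validated before it reaches the queue. A minimal sketch; the `WORKERS` dict and `worker_for` helper are hypothetical and not part of the documented cbintel API.

```python
# Hypothetical registry mirroring the Job Types table: job type -> worker class name
WORKERS = {
    "crawl": "CrawlWorker",
    "lazarus": "LazarusWorker",
    "vectl": "VectlWorker",
    "screenshot": "ScreenshotWorker",
    "transcript": "TranscriptWorker",
    "browser": "BrowserWorker",
    "graph": "GraphWorker",
}


def worker_for(job_type: str) -> str:
    """Return the worker that handles job_type, rejecting unknown types early."""
    try:
        return WORKERS[job_type]
    except KeyError:
        valid = ", ".join(sorted(WORKERS))
        raise ValueError(f"unknown job type {job_type!r}; expected one of: {valid}") from None


print(worker_for("transcript"))  # TranscriptWorker
```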
## Job Lifecycle

```mermaid
stateDiagram-v2
    [*] --> PENDING: Submit Job
    PENDING --> QUEUED: Enqueued
    QUEUED --> PROCESSING: Worker Claims
    PROCESSING --> COMPLETED: Success
    PROCESSING --> FAILED: Error
    PROCESSING --> CANCELLED: User Cancel
    COMPLETED --> [*]
    FAILED --> QUEUED: Retry
    FAILED --> [*]: Max Retries
    CANCELLED --> [*]
```
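The state diagram can be encoded as a transition table that rejects illegal status changes. This sketch mirrors exactly the edges shown above; the `JobStatus` enum, `TRANSITIONS` table, and `advance` function are illustrative, and the `max_retries` default of 3 is an assumption because the document does not state the actual limit.

```python
from enum import Enum


class JobStatus(str, Enum):
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Allowed edges, taken from the state diagram above.
TRANSITIONS = {
    JobStatus.PENDING: {JobStatus.QUEUED},
    JobStatus.QUEUED: {JobStatus.PROCESSING},
    JobStatus.PROCESSING: {JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED},
    JobStatus.FAILED: {JobStatus.QUEUED},  # retry edge
    JobStatus.COMPLETED: set(),
    JobStatus.CANCELLED: set(),
}


def advance(current: JobStatus, target: JobStatus,
            retries: int = 0, max_retries: int = 3) -> JobStatus:
    """Validate a status change; a FAILED job may re-queue only while retries remain."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    if current is JobStatus.FAILED and retries >= max_retries:
        raise ValueError("max retries reached; FAILED is terminal")
    return target


print(advance(JobStatus.FAILED, JobStatus.QUEUED, retries=1).value)  # QUEUED
```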
## API Endpoints

| Method | Endpoint | Description |
|---|---|---|
| `POST` | `/api/v1/jobs/{type}` | Submit job |
| `GET` | `/api/v1/jobs/{job_id}` | Get job status |
| `DELETE` | `/api/v1/jobs/{job_id}` | Cancel job |
| `GET` | `/api/v1/jobs` | List jobs |
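The four endpoints can be called from any HTTP client. The sketch below uses only the standard library and just builds the requests, so nothing goes over the network until a request is passed to `urllib.request.urlopen`. The base URL is taken from the Quick Start; the helper function names are hypothetical, and authentication is not covered because this page does not describe it.

```python
import json
import urllib.request
from urllib.parse import quote

BASE_URL = "https://intel.nominate.ai/api/v1/jobs"


def submit_request(job_type: str, params: dict) -> urllib.request.Request:
    """POST /api/v1/jobs/{type} with the job parameters as a JSON body."""
    return urllib.request.Request(
        f"{BASE_URL}/{quote(job_type)}",
        data=json.dumps(params).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_request(job_id: str) -> urllib.request.Request:
    """GET /api/v1/jobs/{job_id}."""
    return urllib.request.Request(f"{BASE_URL}/{quote(job_id)}", method="GET")


def cancel_request(job_id: str) -> urllib.request.Request:
    """DELETE /api/v1/jobs/{job_id}."""
    return urllib.request.Request(f"{BASE_URL}/{quote(job_id)}", method="DELETE")


def list_request() -> urllib.request.Request:
    """GET /api/v1/jobs."""
    return urllib.request.Request(BASE_URL, method="GET")


req = submit_request("crawl", {"query": "AI regulation trends", "geo": "us:ca", "max_urls": 50})
print(req.get_method(), req.full_url)  # POST https://intel.nominate.ai/api/v1/jobs/crawl
```

To send one, use `with urllib.request.urlopen(req) as resp: body = json.load(resp)`.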
## Quick Start

### Submit a Job

```bash
curl -X POST https://intel.nominate.ai/api/v1/jobs/crawl \
  -H "Content-Type: application/json" \
  -d '{
    "query": "AI regulation trends",
    "geo": "us:ca",
    "max_urls": 50
  }'
```
Response:

```json
{
  "job_id": "job_abc123",
  "status": "PENDING",
  "job_type": "crawl",
  "created_at": "2024-01-15T10:30:00Z"
}
```
### Check Status

Send `GET /api/v1/jobs/{job_id}` with the `job_id` returned on submission.
Response:

```json
{
  "job_id": "job_abc123",
  "status": "COMPLETED",
  "job_type": "crawl",
  "progress": 100,
  "result": {
    "total_urls": 42,
    "chunks_generated": 156,
    "report_url": "https://files.nominate.ai/cbintel-jobs/..."
  }
}
```
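Status checks are usually made in a polling loop until the job reaches a terminal state. A sketch, assuming `fetch_status` is any callable that returns the status JSON above as a dict (for example, a wrapper around `GET /api/v1/jobs/{job_id}`); the `poll_until_done` name and the interval and timeout defaults are illustrative.

```python
import time
from typing import Callable

# FAILED may be re-queued for a retry (see Job Lifecycle); this sketch treats it as final.
TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}


def poll_until_done(
    fetch_status: Callable[[], dict],
    interval: float = 5.0,
    timeout: float = 3600.0,
) -> dict:
    """Call fetch_status() until the job is terminal or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch_status()
        if job["status"] in TERMINAL:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{job['job_id']} still {job['status']} after {timeout}s")
        time.sleep(interval)


# Usage with canned responses standing in for real GET /api/v1/jobs/{job_id} calls
responses = iter([
    {"job_id": "job_abc123", "status": "QUEUED", "progress": 0},
    {"job_id": "job_abc123", "status": "PROCESSING", "progress": 40},
    {"job_id": "job_abc123", "status": "COMPLETED", "progress": 100},
])
final = poll_until_done(lambda: next(responses), interval=0)
print(final["status"], final["progress"])  # COMPLETED 100
```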
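### Cancel Job

Send `DELETE /api/v1/jobs/{job_id}` to cancel a job; a cancelled job ends in the `CANCELLED` state.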
## Python Client

```python
import asyncio

from cbintel.client import JobsClient


async def main() -> None:
    client = JobsClient("https://intel.nominate.ai")

    # Submit a crawl job; this returns as soon as the job is accepted
    job = await client.submit("crawl", {
        "query": "AI regulation",
        "geo": "us:ca",
    })
    print(f"Job ID: {job.job_id}")

    # Wait until the job reaches a terminal state
    result = await client.wait(job.job_id)
    print(f"Status: {result.status}")
    print(f"Result: {result.result}")


asyncio.run(main())
```
## Configuration

### Environment Variables

```bash
# API
JOBS_API_PORT=9003
JOBS_API_HOST=0.0.0.0

# Queue
REDIS_URL=redis://localhost:6379/0

# Storage
FILES_BASE_URL=https://files.nominate.ai
FILES_API_KEY=...

# Workers
WORKER_CONCURRENCY=4
WORKER_TIMEOUT=3600
```
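A minimal standard-library sketch of loading these variables into a typed settings object. It treats the values in the listing as defaults, which is an assumption; the real `config.py` may use a different mechanism, and the `JobsSettings` class and `from_env` method are hypothetical. `FILES_API_KEY` has no default because its value is elided above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class JobsSettings:
    api_host: str
    api_port: int
    redis_url: str
    files_base_url: str
    files_api_key: Optional[str]
    worker_concurrency: int
    worker_timeout: int

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "JobsSettings":
        """Read settings from env, falling back to the values shown in the listing."""
        return cls(
            api_host=env.get("JOBS_API_HOST", "0.0.0.0"),
            api_port=int(env.get("JOBS_API_PORT", "9003")),
            redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
            files_base_url=env.get("FILES_BASE_URL", "https://files.nominate.ai"),
            files_api_key=env.get("FILES_API_KEY"),  # secret: no default
            worker_concurrency=int(env.get("WORKER_CONCURRENCY", "4")),
            worker_timeout=int(env.get("WORKER_TIMEOUT", "3600")),
        )


settings = JobsSettings.from_env({"WORKER_CONCURRENCY": "8"})
print(settings.api_port, settings.worker_concurrency)  # 9003 8
```

Passing a plain dict instead of `os.environ` keeps the loader easy to test.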
## Running the Service

```bash
# Via systemd
sudo systemctl start cbjobs

# Or manually
python -m cbintel.jobs.main

# Check status
sudo systemctl status cbjobs
```
## Module Structure

```text
src/cbintel/jobs/
├── __init__.py
├── main.py              # FastAPI application
├── config.py            # Settings
├── routers/
│   ├── jobs.py          # Job endpoints
│   └── schedules.py     # Schedule endpoints
├── services/
│   └── job_service.py   # Job management
├── workers/
│   ├── base.py          # BaseWorker
│   ├── crawl_worker.py
│   ├── graph_worker.py
│   ├── browser_worker.py
│   ├── lazarus_worker.py
│   ├── vectl_worker.py
│   ├── transcript_worker.py
│   └── screenshot_worker.py
├── queue.py             # Redis job queue
└── models.py            # Job models
```
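`workers/base.py` defines the `BaseWorker` that the seven workers build on, but its interface is not documented on this page. The following is a hypothetical sketch of one common shape for such a base class: an abstract `process` method for job-specific logic and a shared `run` wrapper that maps success or error to a final status. `EchoWorker` exists only for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict


class BaseWorker(ABC):
    """Hypothetical base: subclasses implement process(); run() handles bookkeeping."""

    job_type: str = ""

    async def run(self, job: Dict[str, Any]) -> Dict[str, Any]:
        # Wrap job-specific logic so every worker reports its outcome the same way.
        try:
            result = await self.process(job["params"])
        except Exception as exc:  # any worker error becomes a FAILED job
            return {"job_id": job["job_id"], "status": "FAILED", "error": str(exc)}
        return {"job_id": job["job_id"], "status": "COMPLETED", "result": result}

    @abstractmethod
    async def process(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Do the actual work for one job and return its result payload."""


class EchoWorker(BaseWorker):
    """Illustrative subclass; a real worker such as CrawlWorker would do real work here."""

    job_type = "echo"

    async def process(self, params: Dict[str, Any]) -> Dict[str, Any]:
        if "query" not in params:
            raise ValueError("missing 'query'")
        return {"echo": params["query"]}


print(asyncio.run(EchoWorker().run({"job_id": "job_1", "params": {"query": "hi"}})))
```

Cancellation (`asyncio.CancelledError`) is deliberately not caught, so a cancelled task still propagates its cancellation.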
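## Error Handling

```python
import asyncio

from cbintel.client import JobError, JobsClient, JobTimeoutError


async def wait_for_job(job_id: str) -> None:
    client = JobsClient()
    try:
        await client.wait(job_id, timeout=3600)  # never wait indefinitely
    except JobTimeoutError:  # checked before the general JobError
        print("Job took too long")
    except JobError as e:
        print(f"Job failed: {e}")


asyncio.run(wait_for_job("job_abc123"))
```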
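## Best Practices

- Use appropriate job types: match the job type to your task (see Job Types).
- Handle failures: a job can still end in `FAILED` once its retries are exhausted, so check the final status and implement retry logic where it makes sense.
- Monitor progress: poll `GET /api/v1/jobs/{job_id}` and read the `progress` field for long-running jobs.
- Clean up: cancel abandoned jobs with `DELETE /api/v1/jobs/{job_id}`.
- Set timeouts: pass a `timeout` to `client.wait()` rather than waiting indefinitely.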
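Retrying on failure and bounding each wait combine naturally into one small wrapper. This is a generic asyncio sketch that does not depend on the cbintel client: `run_job` is assumed to be any coroutine function that submits a job and waits for its result, and the `with_retries` name and the `attempts`, `timeout`, and `base_delay` defaults are illustrative.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def with_retries(
    run_job: Callable[[], Awaitable[T]],
    attempts: int = 3,
    timeout: float = 3600.0,
    base_delay: float = 1.0,
) -> T:
    """Run run_job with a per-attempt timeout, retrying with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await asyncio.wait_for(run_job(), timeout)
        except Exception:  # includes asyncio.TimeoutError
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable: the loop always returns or re-raises")


calls = 0


async def flaky_job() -> str:
    """Stand-in job that fails twice before succeeding."""
    global calls
    calls += 1
    if calls < 3:
        raise RuntimeError("transient failure")
    return "COMPLETED"


print(asyncio.run(with_retries(flaky_job, attempts=3, base_delay=0)))  # COMPLETED
```

Catching `Exception` retries every error; a stricter version would retry only errors known to be transient.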