
Jobs System

The jobs system provides async job submission, queue management, and worker execution for all cbintel operations.

Overview

graph TB
    subgraph "API Layer"
        API[Jobs API :9003]
    end

    subgraph "Queue Layer"
        REDIS[(Redis Queue)]
    end

    subgraph "Worker Pool"
        W_CRAWL[CrawlWorker]
        W_GRAPH[GraphWorker]
        W_BROWSER[BrowserWorker]
        W_LAZARUS[LazarusWorker]
        W_VECTL[VectlWorker]
        W_TRANSCRIPT[TranscriptWorker]
        W_SCREENSHOT[ScreenshotWorker]
    end

    subgraph "Storage"
        FILES[files.nominate.ai]
    end

    API -->|Submit| REDIS
    REDIS -->|Claim| W_CRAWL & W_GRAPH & W_BROWSER & W_LAZARUS & W_VECTL & W_TRANSCRIPT & W_SCREENSHOT
    W_CRAWL & W_GRAPH & W_BROWSER & W_LAZARUS & W_VECTL & W_TRANSCRIPT & W_SCREENSHOT -->|Results| FILES
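The queue itself is implemented in src/cbintel/jobs/queue.py. Purely as an illustration of the diagram's claim step, the sketch below shows what pulling the next job from a Redis list could look like; the queue key name and payload layout are assumptions, not the real implementation.

# Illustrative sketch only -- the real claim logic lives in
# src/cbintel/jobs/queue.py; the key name and payload layout are assumptions.
import json
import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

def claim_next_job(queue_key: str = "cbintel:jobs:queued", timeout: int = 5):
    """Block until a job payload is available, then return it as a dict."""
    item = r.blpop(queue_key, timeout=timeout)  # (key, value) or None on timeout
    if item is None:
        return None
    _, raw = item
    return json.loads(raw)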

Quick Reference

| Document | Description |
|----------|-------------|
| Job Types | All 7 job types with schemas |
| Job Lifecycle | Status flow and transitions |
| Workers | Worker architecture and implementation |
| Output Shapes | Result structures by job type |

Job Types

| Type | Worker | Purpose |
|------|--------|---------|
| crawl | CrawlWorker | AI-powered web crawling |
| lazarus | LazarusWorker | Historical archive retrieval |
| vectl | VectlWorker | Vector embedding/search |
| screenshot | ScreenshotWorker | Browser capture |
| transcript | TranscriptWorker | YouTube processing |
| browser | BrowserWorker | Ferret automation |
| graph | GraphWorker | Research graph execution |

Job Lifecycle

stateDiagram-v2
    [*] --> PENDING: Submit Job
    PENDING --> QUEUED: Enqueued
    QUEUED --> PROCESSING: Worker Claims
    PROCESSING --> COMPLETED: Success
    PROCESSING --> FAILED: Error
    PROCESSING --> CANCELLED: User Cancel
    COMPLETED --> [*]
    FAILED --> QUEUED: Retry
    FAILED --> [*]: Max Retries
    CANCELLED --> [*]
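COMPLETED, FAILED, and CANCELLED are terminal; PENDING, QUEUED, and PROCESSING mean the job is still in flight. A minimal polling sketch against the status endpoint listed below, assuming nothing beyond the states in the diagram and the documented GET route:

# Minimal polling sketch, assuming only the documented GET endpoint and
# the status values from the state diagram above.
import time
import httpx

TERMINAL = {"COMPLETED", "FAILED", "CANCELLED"}

def poll_job(job_id: str, base_url: str = "https://intel.nominate.ai",
             interval: float = 5.0) -> dict:
    """Poll GET /api/v1/jobs/{job_id} until the job reaches a terminal state."""
    while True:
        resp = httpx.get(f"{base_url}/api/v1/jobs/{job_id}")
        resp.raise_for_status()
        job = resp.json()
        if job["status"] in TERMINAL:
            return job
        time.sleep(interval)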

API Endpoints

| Method | Endpoint | Description |
|--------|----------|-------------|
| POST | /api/v1/jobs/{type} | Submit job |
| GET | /api/v1/jobs/{job_id} | Get job status |
| DELETE | /api/v1/jobs/{job_id} | Cancel job |
| GET | /api/v1/jobs | List jobs |
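The list and cancel endpoints follow the same pattern as the status endpoint. A short sketch using httpx (any HTTP client works; no request or response fields beyond those documented on this page are assumed):

import httpx

BASE = "https://intel.nominate.ai/api/v1"

# List jobs
jobs = httpx.get(f"{BASE}/jobs").json()

# Cancel a specific job
httpx.delete(f"{BASE}/jobs/job_abc123")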

Quick Start

Submit a Job

curl -X POST https://intel.nominate.ai/api/v1/jobs/crawl \
  -H "Content-Type: application/json" \
  -d '{
    "query": "AI regulation trends",
    "geo": "us:ca",
    "max_urls": 50
  }'

Response:

{
  "job_id": "job_abc123",
  "status": "PENDING",
  "job_type": "crawl",
  "created_at": "2024-01-15T10:30:00Z"
}

Check Status

curl https://intel.nominate.ai/api/v1/jobs/job_abc123

Response:

{
  "job_id": "job_abc123",
  "status": "COMPLETED",
  "job_type": "crawl",
  "progress": 100,
  "result": {
    "total_urls": 42,
    "chunks_generated": 156,
    "report_url": "https://files.nominate.ai/cbintel-jobs/..."
  }
}

Cancel Job

curl -X DELETE https://intel.nominate.ai/api/v1/jobs/job_abc123

Python Client

from cbintel.client import JobsClient

client = JobsClient("https://intel.nominate.ai")

# Submit job
job = await client.submit("crawl", {
    "query": "AI regulation",
    "geo": "us:ca"
})

print(f"Job ID: {job.job_id}")

# Wait for completion
result = await client.wait(job.job_id)
print(f"Status: {result.status}")
print(f"Result: {result.result}")

Configuration

Environment Variables

# API
JOBS_API_PORT=9003
JOBS_API_HOST=0.0.0.0

# Queue
REDIS_URL=redis://localhost:6379/0

# Storage
FILES_BASE_URL=https://files.nominate.ai
FILES_API_KEY=...

# Workers
WORKER_CONCURRENCY=4
WORKER_TIMEOUT=3600
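The authoritative settings live in src/cbintel/jobs/config.py. As a sketch of how these variables could map to runtime settings (the field names here simply mirror the variables above and are not the real config module):

# Illustrative settings loader -- the real implementation is config.py;
# field names here simply mirror the environment variables above.
import os
from dataclasses import dataclass

@dataclass
class JobsSettings:
    api_port: int = int(os.getenv("JOBS_API_PORT", "9003"))
    api_host: str = os.getenv("JOBS_API_HOST", "0.0.0.0")
    redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379/0")
    files_base_url: str = os.getenv("FILES_BASE_URL", "https://files.nominate.ai")
    files_api_key: str = os.getenv("FILES_API_KEY", "")
    worker_concurrency: int = int(os.getenv("WORKER_CONCURRENCY", "4"))
    worker_timeout: int = int(os.getenv("WORKER_TIMEOUT", "3600"))

settings = JobsSettings()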

Running the Service

# Via systemd
sudo systemctl start cbjobs

# Or manually
python -m cbintel.jobs.main

# Check status
sudo systemctl status cbjobs

Module Structure

src/cbintel/jobs/
├── __init__.py
├── main.py              # FastAPI application
├── config.py            # Settings
├── routers/
│   ├── jobs.py          # Job endpoints
│   └── schedules.py     # Schedule endpoints
├── services/
│   └── job_service.py   # Job management
├── workers/
│   ├── base.py          # BaseWorker
│   ├── crawl_worker.py
│   ├── graph_worker.py
│   ├── browser_worker.py
│   ├── lazarus_worker.py
│   ├── vectl_worker.py
│   ├── transcript_worker.py
│   └── screenshot_worker.py
├── queue.py             # Redis job queue
└── models.py            # Job models
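workers/base.py defines BaseWorker, which all seven workers subclass. Its exact interface is not reproduced here; the sketch below only illustrates the general shape of a subclass, and the job_type attribute and handle method name are assumptions for illustration:

# Hypothetical worker sketch -- BaseWorker's real interface is defined in
# workers/base.py; the attribute and method names here are assumptions.
from cbintel.jobs.workers.base import BaseWorker


class EchoWorker(BaseWorker):
    """Toy worker that returns its payload unchanged."""

    job_type = "echo"  # real workers register one of the seven documented types

    async def handle(self, payload: dict) -> dict:
        # Real workers do their work here (crawl, screenshot, ...) and
        # upload artifacts to files.nominate.ai before returning a result.
        return {"echoed": payload}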

Error Handling

from cbintel.client import JobsClient, JobError, JobTimeoutError

client = JobsClient("https://intel.nominate.ai")

try:
    result = await client.wait(job_id, timeout=3600)
except JobTimeoutError:
    print("Job took too long")
except JobError as e:
    print(f"Job failed: {e}")

Best Practices

  1. Use appropriate job types - Match the job type to your task
  2. Handle failures - Jobs can fail; implement retry logic (see the retry sketch after this list)
  3. Monitor progress - Poll status for long-running jobs
  4. Clean up - Cancel abandoned jobs
  5. Set timeouts - Don't wait indefinitely
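As a starting point for practice 2, a retry sketch that resubmits a failed job, assuming only the documented JobsClient.submit/wait methods and JobError:

# Simple client-side retry sketch, using only the documented
# JobsClient.submit / JobsClient.wait methods and JobError.
from cbintel.client import JobsClient, JobError

async def submit_with_retries(client: JobsClient, job_type: str, payload: dict,
                              attempts: int = 3, timeout: int = 3600):
    last_error = None
    for attempt in range(1, attempts + 1):
        job = await client.submit(job_type, payload)
        try:
            return await client.wait(job.job_id, timeout=timeout)
        except JobError as e:
            last_error = e
            print(f"Attempt {attempt} failed: {e}")
    raise last_error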