Job Lifecycle

Jobs progress through a defined lifecycle from submission to completion.

Status Flow

stateDiagram-v2
    [*] --> PENDING: Submit Job

    PENDING --> QUEUED: Job Enqueued
    PENDING --> FAILED: Validation Error

    QUEUED --> PROCESSING: Worker Claims

    PROCESSING --> COMPLETED: Success
    PROCESSING --> FAILED: Error
    PROCESSING --> CANCELLED: User Cancel

    COMPLETED --> [*]

    FAILED --> QUEUED: Retry (if attempts left)
    FAILED --> [*]: Max Retries Exceeded

    CANCELLED --> [*]

Job Statuses

Status       Description
PENDING      Job submitted, not yet queued
QUEUED       Job in queue, waiting for worker
PROCESSING   Worker is executing job
COMPLETED    Job finished successfully
FAILED       Job encountered an error
CANCELLED    Job was cancelled by user
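
The examples on this page refer to a JobStatus enum. A minimal sketch of what it might look like, assuming the statuses map directly to their string names (the real definition is internal to the service):

from enum import Enum

class JobStatus(str, Enum):
    """Lifecycle states used by the job service."""
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"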

Transitions

PENDING → QUEUED

Job is validated and added to the Redis queue.

# What happens internally
async def submit_job(request):
    # 1. Validate request
    validate_request(request)

    # 2. Create job record
    job = Job(
        job_id=generate_id(),
        job_type=request.type,
        status=JobStatus.PENDING,
        params=request.params,
    )

    # 3. Enqueue
    await queue.enqueue(job)
    job.status = JobStatus.QUEUED

    return job

QUEUED → PROCESSING

Worker claims the job from the queue.

# Worker claims job
async def worker_loop():
    while True:
        job = await queue.dequeue()
        job.status = JobStatus.PROCESSING
        job.started_at = datetime.utcnow()

        try:
            result = await execute(job)
            job.status = JobStatus.COMPLETED
            job.result = result
        except Exception as e:
            job.status = JobStatus.FAILED
            job.error = str(e)

PROCESSING → COMPLETED

Job finishes successfully.

{
    "job_id": "job_abc123",
    "status": "COMPLETED",
    "progress": 100,
    "started_at": "2024-01-15T10:30:00Z",
    "completed_at": "2024-01-15T10:35:00Z",
    "duration_seconds": 300,
    "result": {...}
}

PROCESSING → FAILED

Job encounters an error.

{
    "job_id": "job_abc123",
    "status": "FAILED",
    "progress": 45,
    "started_at": "2024-01-15T10:30:00Z",
    "failed_at": "2024-01-15T10:32:00Z",
    "error": "Connection timeout to upstream server",
    "attempts": 2,
    "max_retries": 3
}

FAILED → QUEUED (Retry)

Job is requeued for retry if attempts remain.

if job.attempts < job.max_retries:
    job.attempts += 1
    job.status = JobStatus.QUEUED
    await queue.enqueue(job, delay=backoff(job.attempts))
else:
    job.status = JobStatus.FAILED  # Final failure

PROCESSING → CANCELLED

User cancels the job.

# API request
DELETE /api/v1/jobs/{job_id}

# Result
{
    "job_id": "job_abc123",
    "status": "CANCELLED",
    "cancelled_at": "2024-01-15T10:33:00Z"
}
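
How the worker observes a cancellation is not spelled out here; one common approach is cooperative cancellation, where the worker checks a cancel flag between units of work. A sketch under that assumption (is_cancelled, process, and job.save are hypothetical helpers, not part of the documented API):

from datetime import datetime

async def execute_with_cancellation(job, units_of_work):
    for unit in units_of_work:
        # Hypothetical: check a cancellation flag set by the DELETE endpoint
        if await is_cancelled(job.job_id):
            job.status = JobStatus.CANCELLED
            job.cancelled_at = datetime.utcnow()
            await job.save()
            return
        await process(unit)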

Progress Tracking

Jobs report progress during execution:

# Worker reports progress
async def execute_crawl(job):
    urls = discover_urls(job.params.query)

    for i, url in enumerate(urls):
        await fetch_and_process(url)

        # Update progress
        progress = int((i + 1) / len(urls) * 100)
        await job.update_progress(progress)
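
How update_progress persists its value is not shown; since the queue is Redis-backed, one plausible implementation writes the progress fields to a per-job hash that the status endpoint reads. A sketch assuming redis-py's asyncio client and a jobs:{job_id} key layout (both are assumptions):

import redis.asyncio as redis

r = redis.Redis()

async def update_progress(job_id: str, progress: int, message: str = "") -> None:
    # Hypothetical storage: the GET /api/v1/jobs/{job_id} handler would read this hash
    await r.hset(f"jobs:{job_id}", mapping={"progress": progress, "message": message})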

Polling for Progress

# Poll job status
curl https://intel.nominate.ai/api/v1/jobs/job_abc123

# Response with progress
{
    "job_id": "job_abc123",
    "status": "PROCESSING",
    "progress": 45,
    "message": "Processing URL 23 of 50"
}

Timeouts

Jobs have configurable timeouts:

Setting          Default   Description
WORKER_TIMEOUT   3600s     Max execution time
QUEUE_TIMEOUT    7200s     Max time in queue

# Job times out
if job.running_time > WORKER_TIMEOUT:
    job.status = JobStatus.FAILED
    job.error = "Job execution timed out"

Retry Logic

Failed jobs are retried with exponential backoff:

def backoff(attempt: int) -> int:
    """Calculate retry delay in seconds."""
    base = 30  # 30 seconds
    max_delay = 3600  # 1 hour
    delay = min(base * (2 ** (attempt - 1)), max_delay)
    return delay

# Retry delays:
# Attempt 1: 30 seconds
# Attempt 2: 60 seconds
# Attempt 3: 120 seconds
# ...

Retry Configuration

{
    "max_retries": 3,                          # Max retry attempts
    "retry_on": ["TIMEOUT", "NETWORK_ERROR"],  # Retryable errors
    "no_retry_on": ["VALIDATION_ERROR"]        # Non-retryable
}
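
A failed job is requeued only when its error type is retryable under this configuration. A sketch of that decision, assuming error codes match the names in retry_on / no_retry_on:

def should_retry(job, error_code: str, config: dict) -> bool:
    # Non-retryable errors always fail permanently
    if error_code in config.get("no_retry_on", []):
        return False
    # Only errors listed as retryable are eligible
    if error_code not in config.get("retry_on", []):
        return False
    # And only while attempts remain
    return job.attempts < config.get("max_retries", 3)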

Job History

Jobs maintain an execution history:

{
    "job_id": "job_abc123",
    "status": "COMPLETED",
    "history": [
        {
            "status": "PENDING",
            "timestamp": "2024-01-15T10:30:00Z"
        },
        {
            "status": "QUEUED",
            "timestamp": "2024-01-15T10:30:01Z"
        },
        {
            "status": "PROCESSING",
            "timestamp": "2024-01-15T10:30:05Z",
            "worker": "worker-1"
        },
        {
            "status": "FAILED",
            "timestamp": "2024-01-15T10:31:00Z",
            "error": "Connection timeout"
        },
        {
            "status": "QUEUED",
            "timestamp": "2024-01-15T10:31:30Z",
            "attempt": 2
        },
        {
            "status": "PROCESSING",
            "timestamp": "2024-01-15T10:32:00Z",
            "worker": "worker-2"
        },
        {
            "status": "COMPLETED",
            "timestamp": "2024-01-15T10:35:00Z"
        }
    ]
}
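
Each status transition appends an entry like the ones above. A minimal sketch of that bookkeeping (record_transition is illustrative, not the service's actual helper):

from datetime import datetime

def record_transition(job, new_status, **extra):
    job.status = new_status
    job.history.append({
        "status": new_status.value if hasattr(new_status, "value") else new_status,
        "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"),
        **extra,  # e.g. worker="worker-1", error="...", attempt=2
    })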

Cleanup

Completed and failed jobs are cleaned up after the retention period:

# Configuration
JOB_RETENTION_DAYS = 7  # Keep jobs for 7 days

# Cleanup runs periodically
async def cleanup_old_jobs():
    cutoff = datetime.utcnow() - timedelta(days=JOB_RETENTION_DAYS)
    await delete_jobs_older_than(cutoff)
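
"Runs periodically" is not specified further; a minimal scheduler sketch (the hourly interval is an assumption, not a documented default):

import asyncio

CLEANUP_INTERVAL_SECONDS = 3600  # assumed interval

async def cleanup_scheduler():
    while True:
        await cleanup_old_jobs()
        await asyncio.sleep(CLEANUP_INTERVAL_SECONDS)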

Client Patterns

Wait for Completion

from cbintel.client import JobsClient

client = JobsClient()

# Submit and wait
job = await client.submit("crawl", params)
result = await client.wait(job.job_id, timeout=3600)

Poll with Progress

import asyncio

job = await client.submit("crawl", params)

while True:
    status = await client.get(job.job_id)

    print(f"Status: {status.status}, Progress: {status.progress}%")

    if status.status in ["COMPLETED", "FAILED", "CANCELLED"]:
        break

    await asyncio.sleep(5)

Handle Failures

result = await client.wait(job.job_id)

if result.status == "COMPLETED":
    print(f"Success: {result.result}")
elif result.status == "FAILED":
    print(f"Failed: {result.error}")
    print(f"Attempts: {result.attempts}/{result.max_retries}")
elif result.status == "CANCELLED":
    print("Job was cancelled")