Job Lifecycle¶
Jobs progress through a defined lifecycle from submission to completion.
Status Flow¶
stateDiagram-v2
    [*] --> PENDING: Submit Job
    PENDING --> QUEUED: Job Enqueued
    PENDING --> FAILED: Validation Error
    QUEUED --> PROCESSING: Worker Claims
    PROCESSING --> COMPLETED: Success
    PROCESSING --> FAILED: Error
    PROCESSING --> CANCELLED: User Cancel
    COMPLETED --> [*]
    FAILED --> QUEUED: Retry (if attempts left)
    FAILED --> [*]: Max Retries Exceeded
    CANCELLED --> [*]
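The transitions in the diagram can be enforced with a simple lookup table. The following is an illustrative sketch only; the `ALLOWED_TRANSITIONS` map and `can_transition` helper are not part of the documented API.

# Sketch: enforce the state machine shown above (names are illustrative).
ALLOWED_TRANSITIONS = {
    "PENDING": {"QUEUED", "FAILED"},
    "QUEUED": {"PROCESSING"},
    "PROCESSING": {"COMPLETED", "FAILED", "CANCELLED"},
    "FAILED": {"QUEUED"},   # retry, if attempts remain
    "COMPLETED": set(),     # terminal
    "CANCELLED": set(),     # terminal
}

def can_transition(current: str, new: str) -> bool:
    """Return True if the status change is allowed by the diagram."""
    return new in ALLOWED_TRANSITIONS.get(current, set())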
Job Statuses¶
| Status | Description |
|---|---|
| PENDING | Job submitted, not yet queued |
| QUEUED | Job in queue, waiting for worker |
| PROCESSING | Worker is executing job |
| COMPLETED | Job finished successfully |
| FAILED | Job encountered an error |
| CANCELLED | Job was cancelled by user |
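The Python snippets below reference a `JobStatus` enum with these values. A minimal sketch of how such an enum could be declared (the actual definition in the codebase may differ):

from enum import Enum

class JobStatus(str, Enum):
    """Job statuses as listed in the table above (illustrative definition)."""
    PENDING = "PENDING"
    QUEUED = "QUEUED"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"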
Transitions¶
PENDING → QUEUED¶
Job is validated and added to the Redis queue.
# What happens internally
async def submit_job(request):
    # 1. Validate request
    validate_request(request)

    # 2. Create job record
    job = Job(
        job_id=generate_id(),
        job_type=request.type,
        status=JobStatus.PENDING,
        params=request.params,
    )

    # 3. Enqueue
    await queue.enqueue(job)
    job.status = JobStatus.QUEUED
    return job
QUEUED → PROCESSING¶
Worker claims the job from the queue.
# Worker claims job
async def worker_loop():
    while True:
        job = await queue.dequeue()
        job.status = JobStatus.PROCESSING
        job.started_at = datetime.utcnow()

        try:
            result = await execute(job)
            job.status = JobStatus.COMPLETED
            job.result = result
        except Exception as e:
            job.status = JobStatus.FAILED
            job.error = str(e)
PROCESSING → COMPLETED¶
Job finishes successfully.
{
  "job_id": "job_abc123",
  "status": "COMPLETED",
  "progress": 100,
  "started_at": "2024-01-15T10:30:00Z",
  "completed_at": "2024-01-15T10:35:00Z",
  "duration_seconds": 300,
  "result": {...}
}
PROCESSING → FAILED¶
Job encounters an error.
{
  "job_id": "job_abc123",
  "status": "FAILED",
  "progress": 45,
  "started_at": "2024-01-15T10:30:00Z",
  "failed_at": "2024-01-15T10:32:00Z",
  "error": "Connection timeout to upstream server",
  "attempts": 2,
  "max_retries": 3
}
FAILED → QUEUED (Retry)¶
Job is requeued for retry if attempts remain.
if job.attempts < job.max_retries:
    job.attempts += 1
    job.status = JobStatus.QUEUED
    await queue.enqueue(job, delay=backoff(job.attempts))
else:
    job.status = JobStatus.FAILED  # Final failure
PROCESSING → CANCELLED¶
User cancels the job.
# API request
DELETE /api/v1/jobs/{job_id}
# Result
{
  "job_id": "job_abc123",
  "status": "CANCELLED",
  "cancelled_at": "2024-01-15T10:33:00Z"
}
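On the server side, cancellation can be modelled as marking the job and letting the worker stop at its next checkpoint. A rough sketch, assuming hypothetical `get_job` and `save_job` helpers (not documented API):

from datetime import datetime

# Hypothetical cancellation handler (get_job / save_job are illustrative).
async def cancel_job(job_id: str):
    job = await get_job(job_id)
    if job.status in (JobStatus.QUEUED, JobStatus.PROCESSING):
        job.status = JobStatus.CANCELLED
        job.cancelled_at = datetime.utcnow()
        await save_job(job)
    # A worker can check job.status between steps and stop early
    # when it sees CANCELLED (cooperative cancellation).
    return job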
Progress Tracking¶
Jobs report progress during execution:
# Worker reports progress
async def execute_crawl(job):
    urls = discover_urls(job.params.query)
    for i, url in enumerate(urls):
        await fetch_and_process(url)

        # Update progress
        progress = int((i + 1) / len(urls) * 100)
        await job.update_progress(progress)
Polling for Progress¶
# Poll job status
curl https://intel.nominate.ai/api/v1/jobs/job_abc123
# Response with progress
{
  "job_id": "job_abc123",
  "status": "PROCESSING",
  "progress": 45,
  "message": "Processing URL 23 of 50"
}
Timeouts¶
Jobs have configurable timeouts:
| Setting | Default | Description |
|---|---|---|
| WORKER_TIMEOUT | 3600s | Max execution time |
| QUEUE_TIMEOUT | 7200s | Max time in queue |
# Job times out
if job.running_time > WORKER_TIMEOUT:
    job.status = JobStatus.FAILED
    job.error = "Job execution timed out"
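QUEUE_TIMEOUT can be enforced in the same style when a stale job is dequeued. A hedged sketch, assuming the job record carries a `queued_at` timestamp and the error message is illustrative:

# Sketch: expire jobs that waited too long in the queue
# (assumes a queued_at timestamp on the job record).
if (datetime.utcnow() - job.queued_at).total_seconds() > QUEUE_TIMEOUT:
    job.status = JobStatus.FAILED
    job.error = "Job expired in queue"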
Retry Logic¶
Failed jobs are retried with exponential backoff:
def backoff(attempt: int) -> int:
    """Calculate retry delay in seconds."""
    base = 30         # 30 seconds
    max_delay = 3600  # 1 hour
    delay = min(base * (2 ** (attempt - 1)), max_delay)
    return delay

# Retry delays:
# Attempt 1: 30 seconds
# Attempt 2: 60 seconds
# Attempt 3: 120 seconds
# ...
Retry Configuration¶
{
  "max_retries": 3,                          # Max retry attempts
  "retry_on": ["TIMEOUT", "NETWORK_ERROR"],  # Retryable errors
  "no_retry_on": ["VALIDATION_ERROR"]        # Non-retryable
}
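Combining this configuration with the attempt counter, the retry decision could look like the following sketch; the `should_retry` helper and the `error_code` argument are assumptions for illustration, not documented API:

# Illustrative retry decision based on the configuration above.
def should_retry(job, error_code: str) -> bool:
    """Retry only retryable errors while attempts remain."""
    if error_code in job.no_retry_on:
        return False
    if job.retry_on and error_code not in job.retry_on:
        return False
    return job.attempts < job.max_retries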
Job History¶
Jobs maintain execution history:
{
  "job_id": "job_abc123",
  "status": "COMPLETED",
  "history": [
    {
      "status": "PENDING",
      "timestamp": "2024-01-15T10:30:00Z"
    },
    {
      "status": "QUEUED",
      "timestamp": "2024-01-15T10:30:01Z"
    },
    {
      "status": "PROCESSING",
      "timestamp": "2024-01-15T10:30:05Z",
      "worker": "worker-1"
    },
    {
      "status": "FAILED",
      "timestamp": "2024-01-15T10:31:00Z",
      "error": "Connection timeout"
    },
    {
      "status": "QUEUED",
      "timestamp": "2024-01-15T10:31:30Z",
      "attempt": 2
    },
    {
      "status": "PROCESSING",
      "timestamp": "2024-01-15T10:32:00Z",
      "worker": "worker-2"
    },
    {
      "status": "COMPLETED",
      "timestamp": "2024-01-15T10:35:00Z"
    }
  ]
}
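Each status change appends an entry like those above. A minimal sketch, assuming the job record exposes a `history` list and a `record_transition` helper (illustrative names):

from datetime import datetime

# Sketch: append a history entry on every status change
# (assumes job.history is a list stored with the job record).
def record_transition(job, new_status: "JobStatus", **extra):
    job.history.append({
        "status": new_status.value,
        "timestamp": datetime.utcnow().isoformat() + "Z",
        **extra,  # e.g. worker="worker-1", error="...", attempt=2
    })
    job.status = new_status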
Cleanup¶
Completed and failed jobs are cleaned up after the retention period:
# Configuration
JOB_RETENTION_DAYS = 7  # Keep jobs for 7 days

# Cleanup runs periodically
async def cleanup_old_jobs():
    cutoff = datetime.utcnow() - timedelta(days=JOB_RETENTION_DAYS)
    await delete_jobs_older_than(cutoff)
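One way to run the cleanup periodically is a background task; the sketch below assumes a plain asyncio loop and a daily interval (an external scheduler such as cron would work just as well):

import asyncio

# Sketch: run cleanup once a day in a background task.
async def cleanup_scheduler():
    while True:
        await cleanup_old_jobs()
        await asyncio.sleep(24 * 60 * 60)  # once per day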
Client Patterns¶
Wait for Completion¶
from cbintel.client import JobsClient
client = JobsClient()
# Submit and wait
job = await client.submit("crawl", params)
result = await client.wait(job.job_id, timeout=3600)
Poll with Progress¶
import asyncio

job = await client.submit("crawl", params)

while True:
    status = await client.get(job.job_id)
    print(f"Status: {status.status}, Progress: {status.progress}%")
    if status.status in ["COMPLETED", "FAILED", "CANCELLED"]:
        break
    await asyncio.sleep(5)