Workers¶
Workers are background processes that execute jobs from the queue.
Architecture¶
graph TB
    subgraph "Queue"
        REDIS[(Redis)]
    end

    subgraph "Worker Pool"
        WP[Worker Process]

        subgraph "Workers"
            W1[CrawlWorker]
            W2[GraphWorker]
            W3[BrowserWorker]
            W4[LazarusWorker]
            W5[VectlWorker]
            W6[TranscriptWorker]
            W7[ScreenshotWorker]
        end
    end

    subgraph "Services"
        AI[AI Clients]
        HTTP[HTTP Client]
        BROWSER[Browser Pool]
        STORAGE[File Storage]
    end

    REDIS -->|Dequeue| WP
    WP -->|Route| W1 & W2 & W3 & W4 & W5 & W6 & W7
    W1 & W2 & W3 & W4 --> AI & HTTP
    W3 --> BROWSER
    W1 & W2 & W4 & W5 & W7 --> STORAGE
BaseWorker¶
All workers extend BaseWorker:
from cbintel.jobs.workers.base import BaseWorker

class CrawlWorker(BaseWorker):
    job_type = "crawl"

    async def execute(self, job: Job) -> dict:
        """Execute the job and return the result."""
        pipeline = CrawlPipeline(job.params)
        result = await pipeline.run()
        return result.to_dict()

    async def validate(self, params: dict) -> None:
        """Validate job parameters."""
        if "query" not in params:
            raise ValidationError("query is required")
BaseWorker Methods¶
| Method | Description |
|---|---|
| `execute(job)` | Main execution logic |
| `validate(params)` | Parameter validation |
| `on_progress(percent, message)` | Report progress |
| `on_start(job)` | Called when job starts |
| `on_complete(job, result)` | Called on success |
| `on_error(job, error)` | Called on failure |
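Besides `execute` and `validate`, the lifecycle hooks can be overridden for setup, auditing, or cleanup. A minimal sketch, assuming the hook signatures in the table above; the class name and logging calls are illustrative, not part of BaseWorker:

import logging

from cbintel.jobs.workers.base import BaseWorker

logger = logging.getLogger(__name__)

class AuditedCrawlWorker(BaseWorker):
    job_type = "crawl"

    async def execute(self, job: Job) -> dict:
        # Placeholder body; a real worker does its work here.
        return {"ok": True}

    async def on_start(self, job: Job) -> None:
        # Called when the job starts.
        logger.info("job %s started", job.job_id)

    async def on_complete(self, job: Job, result: dict) -> None:
        # Called on success with the dict returned by execute().
        logger.info("job %s completed", job.job_id)

    async def on_error(self, job: Job, error: Exception) -> None:
        # Called on failure.
        logger.error("job %s failed: %s", job.job_id, error)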
Progress Reporting¶
async def execute(self, job: Job) -> dict:
    urls = await discover_urls(job.params["query"])

    for i, url in enumerate(urls):
        await self.on_progress(
            percent=int((i + 1) / len(urls) * 100),
            message=f"Processing URL {i + 1} of {len(urls)}",
        )
        await process_url(url)

    return {"total": len(urls)}
Worker Types¶
CrawlWorker¶
AI-powered web crawling.
class CrawlWorker(BaseWorker):
    job_type = "crawl"

    async def execute(self, job: Job) -> dict:
        config = CrawlConfig(
            query=job.params["query"],
            max_urls=job.params.get("max_urls", 50),
            max_depth=job.params.get("max_depth", 3),
            geo=job.params.get("geo"),
        )

        pipeline = CrawlPipeline(config)
        result = await pipeline.run()

        # Upload report
        report_url = await self.upload_result(result.report)

        return {
            "total_urls": result.total_urls,
            "synthesis": result.synthesis,
            "report_url": report_url,
        }
GraphWorker¶
Research graph execution.
class GraphWorker(BaseWorker):
    job_type = "graph"

    async def execute(self, job: Job) -> dict:
        # Load graph from YAML or template
        if "graph" in job.params:
            graph_def = parse_yaml(job.params["graph"])
        else:
            graph_def = load_template(job.params["template"])

        # Execute graph
        executor = GraphExecutor(graph_def)
        result = await executor.run(job.params.get("params", {}))

        return {
            "graph_name": graph_def.name,
            "stages_completed": result.stages_completed,
            "outputs": result.outputs,
        }
BrowserWorker¶
Ferret browser automation.
class BrowserWorker(BaseWorker):
    job_type = "browser"

    async def execute(self, job: Job) -> dict:
        async with SWRMClient() as client:
            await client.navigate(job.params["url"])

            for action in job.params["actions"]:
                await execute_action(client, action)

            return {
                "success": True,
                "final_url": await client.get_current_url(),
            }
LazarusWorker¶
Historical archive retrieval.
class LazarusWorker(BaseWorker):
    job_type = "lazarus"

    async def execute(self, job: Job) -> dict:
        client = ArchiveClient()

        if "domain" in job.params:
            result = await client.process_domain(
                job.params["domain"],
                from_date=job.params.get("from_date"),
                to_date=job.params.get("to_date"),
            )
        else:
            result = await client.get_snapshots(
                job.params["url"],
                from_date=job.params.get("from_date"),
            )

        return {
            "snapshots": len(result.snapshots),
            "output_url": await self.upload_result(result),
        }
VectlWorker¶
Vector embedding and semantic search operations.
class VectlWorker(BaseWorker):
    job_type = "vectl"

    async def execute(self, job: Job) -> dict:
        if job.params["operation"] == "embed":
            service = EmbeddingService()
            vectors = await service.embed_batch(job.params["texts"])

            if job.params.get("store"):
                store = VectorStore(job.params["store"])
                for i, v in enumerate(vectors):
                    await store.add(f"doc_{i}", v)
                await store.save()

            return {"vectors_generated": len(vectors)}

        elif job.params["operation"] == "search":
            search = SemanticSearch(job.params["store"])
            results = await search.search(
                job.params["query"],
                top_k=job.params.get("top_k", 10),
            )
            return {"matches": [r.to_dict() for r in results]}

        raise ValidationError(f"unknown operation: {job.params['operation']}")
TranscriptWorker¶
YouTube transcript extraction.
class TranscriptWorker(BaseWorker):
    job_type = "transcript"

    async def execute(self, job: Job) -> dict:
        transcript = await fetch_transcript(
            job.params["video_id"],
            language=job.params.get("language", "en"),
        )

        return {
            "video_id": job.params["video_id"],
            "transcript": transcript.segments,
            "full_text": transcript.full_text,
        }
ScreenshotWorker¶
Browser screenshots.
class ScreenshotWorker(BaseWorker):
    job_type = "screenshot"

    async def execute(self, job: Job) -> dict:
        config = CaptureConfig(
            viewport_width=job.params.get("viewport_width", 1920),
            viewport_height=job.params.get("viewport_height", 1080),
        )

        async with ScreenshotService(config) as service:
            screenshots = []

            for i, url in enumerate(job.params["urls"]):
                capture = await service.screenshot(
                    url,
                    full_page=job.params.get("full_page", True),
                )
                file_url = await self.upload_file(capture.data, f"screenshot_{i}.png")
                screenshots.append({
                    "url": url,
                    "file_url": file_url,
                })

            return {"screenshots": screenshots}
Worker Configuration¶
Environment Variables¶
# Worker settings
WORKER_CONCURRENCY=4 # Parallel job capacity
WORKER_TIMEOUT=3600 # Max job duration (seconds)
WORKER_HEARTBEAT=30 # Heartbeat interval (seconds)
# Queue settings
REDIS_URL=redis://localhost:6379/0
QUEUE_NAME=cbintel-jobs
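A minimal sketch of reading these variables at startup with `os.environ`; the defaults mirror the values above, and the actual settings loader may differ:

import os

# Worker settings
WORKER_CONCURRENCY = int(os.environ.get("WORKER_CONCURRENCY", "4"))
WORKER_TIMEOUT = int(os.environ.get("WORKER_TIMEOUT", "3600"))
WORKER_HEARTBEAT = int(os.environ.get("WORKER_HEARTBEAT", "30"))

# Queue settings
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
QUEUE_NAME = os.environ.get("QUEUE_NAME", "cbintel-jobs")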
Running Workers¶
# Start worker process
python -m cbintel.jobs.worker
# With specific job types
python -m cbintel.jobs.worker --types crawl,graph
# With concurrency
python -m cbintel.jobs.worker --concurrency 8
Worker Lifecycle¶
sequenceDiagram
    participant Queue as Redis Queue
    participant Worker
    participant Services
    participant Storage

    Worker->>Queue: Connect

    loop Forever
        Worker->>Queue: Dequeue job
        Queue-->>Worker: Job data
        Worker->>Worker: Validate params
        Worker->>Services: Execute operations
        Services-->>Worker: Results
        Worker->>Storage: Upload artifacts
        Storage-->>Worker: File URLs
        Worker->>Queue: Mark complete
    end
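A minimal sketch of the loop in the diagram, assuming the queue is a Redis list holding JSON job payloads (an assumption; the actual wire format is not shown here) and that `handle_job` is a hypothetical dispatcher. The real worker process also handles heartbeats, timeouts, and retries:

import json

import redis.asyncio as redis

async def worker_loop(redis_url: str, queue_name: str) -> None:
    # Connect to the queue ("Connect" in the diagram).
    conn = redis.Redis.from_url(redis_url)
    try:
        while True:
            # Block until a job payload is available ("Dequeue job" / "Job data").
            item = await conn.blpop(queue_name, timeout=30)
            if item is None:
                continue  # nothing queued; keep polling
            _, payload = item
            job_data = json.loads(payload)

            # Route to the registered worker, which validates params,
            # executes, uploads artifacts, and marks the job complete.
            await handle_job(job_data)  # hypothetical dispatcher
    finally:
        await conn.close()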
Error Handling¶
class CrawlWorker(BaseWorker):
    async def execute(self, job: Job) -> dict:
        try:
            result = await self.do_crawl(job)
            return result
        except NetworkError as e:
            # Retryable error
            raise RetryableError(str(e))
        except ValidationError as e:
            # Non-retryable error
            raise FatalError(str(e))
Error Types¶
| Error Type | Behavior |
|---|---|
| `RetryableError` | Job is requeued for retry |
| `FatalError` | Job fails permanently |
| `TimeoutError` | Job fails, may retry |
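A sketch of how a runner might act on these error types, assuming a simple attempt counter; `run_with_retries` and `mark_failed` are illustrative names, not the actual runner API:

async def run_with_retries(worker: BaseWorker, job: Job, max_attempts: int = 3) -> None:
    # Illustrative runner loop: retry on RetryableError, fail fast on FatalError.
    for attempt in range(1, max_attempts + 1):
        try:
            result = await worker.execute(job)
            await worker.on_complete(job, result)
            return
        except RetryableError as exc:
            await worker.on_error(job, exc)
            if attempt == max_attempts:
                await mark_failed(job, exc)  # hypothetical helper
                return
            # otherwise fall through and retry on the next iteration
        except FatalError as exc:
            await worker.on_error(job, exc)
            await mark_failed(job, exc)  # hypothetical helper
            return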
File Storage¶
Workers upload results to file storage:
import json

class BaseWorker:
    async def upload_file(self, data: bytes, filename: str) -> str:
        """Upload file and return URL."""
        url = await self.files_client.upload(
            bucket="cbintel-jobs",
            path=f"{self.job.job_id}/{filename}",
            data=data,
        )
        return url

    async def upload_result(self, result: dict) -> str:
        """Upload JSON result and return URL."""
        data = json.dumps(result).encode()
        return await self.upload_file(data, "result.json")
Monitoring¶
Health Checks¶
# Worker exposes health endpoint
GET /health
{
  "status": "healthy",
  "workers": 4,
  "jobs_processed": 1234,
  "uptime_seconds": 86400
}
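The health payload can be served by a small HTTP server running alongside the worker loop. A minimal sketch using aiohttp; the field values, counter wiring, and port are illustrative, and the actual implementation may expose additional metrics:

import time

from aiohttp import web

START_TIME = time.time()
JOBS_PROCESSED = 0  # incremented by the worker loop (illustrative)

async def health(request: web.Request) -> web.Response:
    # Mirror the /health payload shown above.
    return web.json_response({
        "status": "healthy",
        "workers": 4,
        "jobs_processed": JOBS_PROCESSED,
        "uptime_seconds": int(time.time() - START_TIME),
    })

app = web.Application()
app.router.add_get("/health", health)

# web.run_app(app, port=8080)  # port is an assumption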