Workers

Workers are background processes that execute jobs from the queue.

Architecture

graph TB
    subgraph "Queue"
        REDIS[(Redis)]
    end

    subgraph "Worker Pool"
        WP[Worker Process]

        subgraph "Workers"
            W1[CrawlWorker]
            W2[GraphWorker]
            W3[BrowserWorker]
            W4[LazarusWorker]
            W5[VectlWorker]
            W6[TranscriptWorker]
            W7[ScreenshotWorker]
        end
    end

    subgraph "Services"
        AI[AI Clients]
        HTTP[HTTP Client]
        BROWSER[Browser Pool]
        STORAGE[File Storage]
    end

    REDIS -->|Dequeue| WP
    WP -->|Route| W1 & W2 & W3 & W4 & W5 & W6 & W7
    W1 & W2 & W3 & W4 --> AI & HTTP
    W3 --> BROWSER
    W1 & W2 & W4 & W5 & W7 --> STORAGE
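
The worker process routes each dequeued job to the worker class registered for its job_type. A minimal sketch of that dispatch step, assuming hypothetical names (WORKER_REGISTRY, dispatch) and a job_type attribute on Job; this is not the actual cbintel API:

# Illustrative dispatch logic; Job and FatalError are used as elsewhere on this page.
from cbintel.jobs.workers.base import BaseWorker

WORKER_REGISTRY: dict[str, type[BaseWorker]] = {
    cls.job_type: cls for cls in BaseWorker.__subclasses__()
}

async def dispatch(job: Job) -> dict:
    worker_cls = WORKER_REGISTRY.get(job.job_type)
    if worker_cls is None:
        raise FatalError(f"Unknown job type: {job.job_type}")
    worker = worker_cls()
    await worker.validate(job.params)
    return await worker.execute(job)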

BaseWorker

All workers extend BaseWorker:

from cbintel.jobs.workers.base import BaseWorker

class CrawlWorker(BaseWorker):
    job_type = "crawl"

    async def execute(self, job: Job) -> dict:
        """Execute the job and return result."""
        pipeline = CrawlPipeline(job.params)
        result = await pipeline.run()
        return result.to_dict()

    async def validate(self, params: dict) -> None:
        """Validate job parameters."""
        if "query" not in params:
            raise ValidationError("query is required")

BaseWorker Methods

Method                           Description
execute(job)                     Main execution logic
validate(params)                 Parameter validation
on_progress(percent, message)    Report progress
on_start(job)                    Called when job starts
on_complete(job, result)         Called on success
on_error(job, error)             Called on failure
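
For orientation, a hedged skeleton of how these hooks fit together. The real base class also handles queue bookkeeping, timeouts, and storage clients, so treat this as a sketch rather than the actual implementation:

class BaseWorker:
    job_type: str  # set by each subclass; used to route jobs from the queue

    async def validate(self, params: dict) -> None:
        """Override to reject bad parameters before execution starts."""

    async def execute(self, job: Job) -> dict:
        """Override with the worker's main logic; the returned dict is the job result."""
        raise NotImplementedError

    async def on_progress(self, percent: int, message: str) -> None:
        """Report incremental progress while execute() runs."""

    async def on_start(self, job: Job) -> None:
        """Hook called when the job is dequeued, before execute()."""

    async def on_complete(self, job: Job, result: dict) -> None:
        """Hook called after execute() returns successfully."""

    async def on_error(self, job: Job, error: Exception) -> None:
        """Hook called when execute() raises."""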

Progress Reporting

async def execute(self, job: Job) -> dict:
    urls = await discover_urls(job.params["query"])

    for i, url in enumerate(urls):
        await self.on_progress(
            percent=int((i + 1) / len(urls) * 100),
            message=f"Processing URL {i + 1} of {len(urls)}"
        )
        await process_url(url)

    return {"total": len(urls)}

Worker Types

CrawlWorker

AI-powered web crawling.

class CrawlWorker(BaseWorker):
    job_type = "crawl"

    async def execute(self, job: Job) -> dict:
        config = CrawlConfig(
            query=job.params["query"],
            max_urls=job.params.get("max_urls", 50),
            max_depth=job.params.get("max_depth", 3),
            geo=job.params.get("geo"),
        )

        pipeline = CrawlPipeline(config)
        result = await pipeline.run()

        # Upload report
        report_url = await self.upload_result(result.report)

        return {
            "total_urls": result.total_urls,
            "synthesis": result.synthesis,
            "report_url": report_url,
        }
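
An illustrative job payload for this worker, matching the parameters read above; the geo value is an assumption:

job_params = {
    "query": "lithium battery recyclers in Scandinavia",
    "max_urls": 100,   # optional, defaults to 50
    "max_depth": 2,    # optional, defaults to 3
    "geo": "SE",       # optional; exact format depends on CrawlConfig
}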

GraphWorker

Research graph execution.

class GraphWorker(BaseWorker):
    job_type = "graph"

    async def execute(self, job: Job) -> dict:
        # Load graph from YAML or template
        if "graph" in job.params:
            graph_def = parse_yaml(job.params["graph"])
        else:
            graph_def = load_template(job.params["template"])

        # Execute graph
        executor = GraphExecutor(graph_def)
        result = await executor.run(job.params.get("params", {}))

        return {
            "graph_name": graph_def.name,
            "stages_completed": result.stages_completed,
            "outputs": result.outputs,
        }
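
Illustrative payloads for the two ways a graph can be supplied; the template name and YAML fields here are made up:

# Inline YAML graph definition
job_params = {
    "graph": "name: competitor-scan\nstages: [...]",
    "params": {"company": "Acme Corp"},
}

# Or a stored template by name
job_params = {
    "template": "competitor-scan",
    "params": {"company": "Acme Corp"},
}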

BrowserWorker

Ferret browser automation.

class BrowserWorker(BaseWorker):
    job_type = "browser"

    async def execute(self, job: Job) -> dict:
        async with SWRMClient() as client:
            await client.navigate(job.params["url"])

            for action in job.params["actions"]:
                await execute_action(client, action)

            return {
                "success": True,
                "final_url": await client.get_current_url(),
            }

LazarusWorker

Historical archive retrieval.

class LazarusWorker(BaseWorker):
    job_type = "lazarus"

    async def execute(self, job: Job) -> dict:
        client = ArchiveClient()

        if "domain" in job.params:
            result = await client.process_domain(
                job.params["domain"],
                from_date=job.params.get("from_date"),
                to_date=job.params.get("to_date"),
            )
        else:
            result = await client.get_snapshots(
                job.params["url"],
                from_date=job.params.get("from_date"),
            )

        return {
            "snapshots": len(result.snapshots),
            "output_url": await self.upload_result(result),
        }
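
Example payloads following the branching above: a domain triggers full-domain processing, while a url fetches snapshots for a single page. The date format shown is an assumption; check ArchiveClient for the real one:

# Whole-domain processing
job_params = {
    "domain": "example.com",
    "from_date": "2019-01-01",
    "to_date": "2021-12-31",
}

# Snapshots for a single URL
job_params = {
    "url": "https://example.com/about",
    "from_date": "2019-01-01",
}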

VectlWorker

Vector embedding and semantic search.

class VectlWorker(BaseWorker):
    job_type = "vectl"

    async def execute(self, job: Job) -> dict:
        if job.params["operation"] == "embed":
            service = EmbeddingService()
            vectors = await service.embed_batch(job.params["texts"])

            if job.params.get("store"):
                store = VectorStore(job.params["store"])
                for i, v in enumerate(vectors):
                    await store.add(f"doc_{i}", v)
                await store.save()

            return {"vectors_generated": len(vectors)}

        elif job.params["operation"] == "search":
            search = SemanticSearch(job.params["store"])
            results = await search.search(
                job.params["query"],
                top_k=job.params.get("top_k", 10),
            )
            return {"matches": [r.to_dict() for r in results]}

        raise ValidationError(f"unknown operation: {job.params['operation']}")

TranscriptWorker

YouTube transcript extraction.

class TranscriptWorker(BaseWorker):
    job_type = "transcript"

    async def execute(self, job: Job) -> dict:
        transcript = await fetch_transcript(
            job.params["video_id"],
            language=job.params.get("language", "en"),
        )

        return {
            "video_id": job.params["video_id"],
            "transcript": transcript.segments,
            "full_text": transcript.full_text,
        }

ScreenshotWorker

Browser screenshots.

class ScreenshotWorker(BaseWorker):
    job_type = "screenshot"

    async def execute(self, job: Job) -> dict:
        config = CaptureConfig(
            viewport_width=job.params.get("viewport_width", 1920),
            viewport_height=job.params.get("viewport_height", 1080),
        )

        async with ScreenshotService(config) as service:
            screenshots = []
            for i, url in enumerate(job.params["urls"]):
                capture = await service.screenshot(
                    url,
                    full_page=job.params.get("full_page", True),
                )
                file_url = await self.upload_file(capture.data, f"screenshot_{i}.png")
                screenshots.append({
                    "url": url,
                    "file_url": file_url,
                })

        return {"screenshots": screenshots}

Worker Configuration

Environment Variables

# Worker settings
WORKER_CONCURRENCY=4      # Parallel job capacity
WORKER_TIMEOUT=3600       # Max job duration (seconds)
WORKER_HEARTBEAT=30       # Heartbeat interval (seconds)

# Queue settings
REDIS_URL=redis://localhost:6379/0
QUEUE_NAME=cbintel-jobs
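
A minimal sketch of reading these settings at startup; the real project may centralize this in a settings module:

import os

WORKER_CONCURRENCY = int(os.environ.get("WORKER_CONCURRENCY", "4"))
WORKER_TIMEOUT = int(os.environ.get("WORKER_TIMEOUT", "3600"))
WORKER_HEARTBEAT = int(os.environ.get("WORKER_HEARTBEAT", "30"))
REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
QUEUE_NAME = os.environ.get("QUEUE_NAME", "cbintel-jobs")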

Running Workers

# Start worker process
python -m cbintel.jobs.worker

# With specific job types
python -m cbintel.jobs.worker --types crawl,graph

# With concurrency
python -m cbintel.jobs.worker --concurrency 8

Worker Lifecycle

sequenceDiagram
    participant Queue as Redis Queue
    participant Worker
    participant Services
    participant Storage

    Worker->>Queue: Connect
    loop Forever
        Worker->>Queue: Dequeue job
        Queue-->>Worker: Job data

        Worker->>Worker: Validate params
        Worker->>Services: Execute operations
        Services-->>Worker: Results

        Worker->>Storage: Upload artifacts
        Storage-->>Worker: File URLs

        Worker->>Queue: Mark complete
    end
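
The loop in the diagram reduces to roughly the following; dequeue_blocking, requeue, mark_complete, and mark_failed are placeholder names for the queue client, not the actual API:

async def run_worker_loop(queue, registry) -> None:
    while True:
        job = await queue.dequeue_blocking(QUEUE_NAME)
        worker = registry[job.job_type]()
        try:
            await worker.validate(job.params)
            await worker.on_start(job)
            result = await worker.execute(job)
            await worker.on_complete(job, result)
            await queue.mark_complete(job.job_id, result)
        except RetryableError:
            await queue.requeue(job)
        except Exception as error:
            await worker.on_error(job, error)
            await queue.mark_failed(job.job_id, str(error))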

Error Handling

class CrawlWorker(BaseWorker):
    async def execute(self, job: Job) -> dict:
        try:
            result = await self.do_crawl(job)
            return result
        except NetworkError as e:
            # Retryable error
            raise RetryableError(str(e))
        except ValidationError as e:
            # Non-retryable error
            raise FatalError(str(e))

Error Types

Error Type        Behavior
RetryableError    Job is requeued for retry
FatalError        Job fails permanently
TimeoutError      Job fails after exceeding the timeout; may be retried

File Storage

Workers upload results to file storage:

import json

class BaseWorker:
    async def upload_file(self, data: bytes, filename: str) -> str:
        """Upload file and return URL."""
        url = await self.files_client.upload(
            bucket="cbintel-jobs",
            path=f"{self.job.job_id}/{filename}",
            data=data,
        )
        return url

    async def upload_result(self, result: dict) -> str:
        """Upload JSON result and return URL."""
        data = json.dumps(result).encode()
        return await self.upload_file(data, "result.json")

Monitoring

Health Checks

# Worker exposes health endpoint
GET /health

{
    "status": "healthy",
    "workers": 4,
    "jobs_processed": 1234,
    "uptime_seconds": 86400
}
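
A hedged sketch of serving this endpoint with aiohttp; the actual server and counters may differ:

import time
from aiohttp import web

STARTED_AT = time.monotonic()
JOBS_PROCESSED = 0  # incremented by the worker loop

async def health(_request: web.Request) -> web.Response:
    return web.json_response({
        "status": "healthy",
        "workers": 4,
        "jobs_processed": JOBS_PROCESSED,
        "uptime_seconds": int(time.monotonic() - STARTED_AT),
    })

app = web.Application()
app.router.add_get("/health", health)
# web.run_app(app, port=8080) would serve it alongside the worker loop.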

Metrics

# Prometheus metrics
cbintel_jobs_processed_total{type="crawl"} 500
cbintel_jobs_failed_total{type="crawl"} 12
cbintel_job_duration_seconds{type="crawl"} 245.3
cbintel_queue_depth{type="crawl"} 5
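
If these are exported with prometheus_client, the registrations could look roughly like this (a sketch; the duration metric might equally be a Summary or Gauge in the real exporter):

from prometheus_client import Counter, Gauge, Histogram

JOBS_PROCESSED = Counter("cbintel_jobs_processed_total", "Jobs processed", ["type"])
JOBS_FAILED = Counter("cbintel_jobs_failed_total", "Jobs failed", ["type"])
JOB_DURATION = Histogram("cbintel_job_duration_seconds", "Job duration in seconds", ["type"])
QUEUE_DEPTH = Gauge("cbintel_queue_depth", "Jobs waiting in the queue", ["type"])

# Recording a successful crawl job that took `elapsed` seconds:
JOBS_PROCESSED.labels(type="crawl").inc()
JOB_DURATION.labels(type="crawl").observe(elapsed)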