
Scout + cbmesh Integration Plan

Overview

This document outlines how Scout (real-time team chat) integrates with cbmesh (API mesh) to enable system services to communicate through the chat interface.

Architecture Decision

Bridge Pattern - A bridge service translates between Scout's chat protocol and cbmesh's service protocol.

┌─────────────────────────────────────────────────────────────────┐
│                         USERS                                   │
│                    (Web Browser)                                │
│                         │                                       │
│                         ▼                                       │
│              Scout WebSocket (/ws/scout)                        │
│                         │                                       │
│     ┌───────────────────┼───────────────────┐                   │
│     │                   │                   │                   │
│     ▼                   ▼                   ▼                   │
│  User Chat         System Rooms       Scout Bridge              │
│  (DMs, Groups)     (#logs, #alerts)        │                    │
│                                            │                    │
│                                            ▼                    │
│                                    cbmesh Gateway               │
│                                      │   │   │                  │
│                              ┌───────┘   │   └───────┐          │
│                              ▼           ▼           ▼          │
│                          CBFiles    CBIntel    List Loader      │
│                          Worker     Worker       Worker         │
└─────────────────────────────────────────────────────────────────┘

Why Bridge, Not Replace?

Scout WebSocket          cbmesh WebSocket
----------------------   ----------------------
User-to-user chat        Service-to-service API
Persistent connections   Request-response
Room-based broadcast     Target-based routing
Real-time presence       Heartbeat health

Because the two protocols serve different purposes, we keep both and bridge between them.


Part 1: System Rooms

Concept

Pre-defined rooms for system messages that admins/users can subscribe to:

Room             Purpose                         Access
--------------   ----------------------------   ------
#system-logs     Application logs, debug info    Admin
#system-alerts   Critical alerts, errors         Admin
#system-jobs     Background job progress         All
#announcements   System-wide notices             All

Database Changes

-- Add to scout_room table
ALTER TABLE scout_room ADD COLUMN is_system BOOLEAN DEFAULT FALSE;
ALTER TABLE scout_room ADD COLUMN system_type VARCHAR;
-- Values: 'logs', 'alerts', 'jobs', 'announcements'

CREATE INDEX idx_scout_room_system ON scout_room(is_system, system_type);

Auto-Creation

System rooms are created on-demand when first accessed:

def get_or_create_system_room(system_type: str, user_id: str) -> ScoutRoom:
    """Get or create a system room by type."""
    existing = execute_and_fetch_one(
        "SELECT * FROM scout_room WHERE is_system = TRUE AND system_type = ?",
        (system_type,)
    )
    if existing:
        # Ensure user is subscribed
        ensure_system_subscription(existing["id"], user_id, system_type)
        return existing

    # Create system room
    room_id = generate_id()
    # ... create with is_system=True, system_type=system_type
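Because the index above is not unique, a plain check-then-insert can race: two requests that arrive before the room exists may each create one. A minimal sketch of an idempotent variant, assuming a unique index on system_type and using Python's built-in sqlite3 as a stand-in for the app's database (the plan elsewhere mentions DuckDB). The room display names, the init_schema helper, and the reduced column set are hypothetical, and subscription handling is left out:

```python
import sqlite3
import uuid

# Hypothetical display names; the system_type values come from the plan.
SYSTEM_ROOM_NAMES = {
    "logs": "#system-logs",
    "alerts": "#system-alerts",
    "jobs": "#system-jobs",
    "announcements": "#announcements",
}


def init_schema(conn: sqlite3.Connection) -> None:
    conn.execute(
        """CREATE TABLE IF NOT EXISTS scout_room (
               id TEXT PRIMARY KEY,
               name TEXT NOT NULL,
               is_system BOOLEAN DEFAULT 0,
               system_type TEXT
           )"""
    )
    # One room per system_type. Ordinary rooms keep system_type NULL,
    # and NULLs never collide in a SQLite unique index.
    conn.execute(
        "CREATE UNIQUE INDEX IF NOT EXISTS uq_scout_room_system_type "
        "ON scout_room(system_type)"
    )


def get_or_create_system_room(conn: sqlite3.Connection, system_type: str) -> str:
    """Return the id of the system room for system_type, creating it at most once."""
    if system_type not in SYSTEM_ROOM_NAMES:
        raise ValueError(f"unknown system room type: {system_type!r}")
    # INSERT OR IGNORE makes creation idempotent: a repeated or concurrent
    # call hits the unique index and silently inserts nothing.
    conn.execute(
        "INSERT OR IGNORE INTO scout_room (id, name, is_system, system_type) "
        "VALUES (?, ?, 1, ?)",
        (str(uuid.uuid4()), SYSTEM_ROOM_NAMES[system_type], system_type),
    )
    row = conn.execute(
        "SELECT id FROM scout_room WHERE is_system = 1 AND system_type = ?",
        (system_type,),
    ).fetchone()
    return row[0]
```

The same idea should carry over to other engines that support unique indexes with an insert-or-ignore form, but the target database's conflict syntax should be checked before porting it.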

Part 2: System Message Objects

Object Schema

System messages use Scout's generic object transport:

# Log Entry
{
    "type": "log",
    "id": "log-20240112-153045-abc123",
    "title": "Import completed",
    "data": {
        "level": "info",           # debug, info, warning, error, critical
        "service": "list-loader",
        "message": "Imported 1,247 contacts",
        "details": {
            "file": "voters.csv",
            "records": 1247,
            "skipped": 3,
            "duration_ms": 4521
        },
        "timestamp": "2024-01-12T15:30:45Z",
        "trace_id": "abc123"       # For correlation
    }
}

# Alert
{
    "type": "alert",
    "id": "alert-uuid",
    "title": "Database connection lost",
    "data": {
        "severity": "critical",    # info, warning, critical
        "service": "cbapp-api",
        "message": "Connection to DuckDB failed",
        "action_required": true,
        "action_url": "/admin/health",
        "auto_resolve": false
    }
}

# Job Progress
{
    "type": "job",
    "id": "job-uuid",
    "title": "Generating embeddings",
    "data": {
        "status": "running",       # queued, running, completed, failed, cancelled
        "progress": 0.75,          # 0.0 to 1.0
        "current_step": "Processing batch 3/4",
        "started_at": "2024-01-12T15:00:00Z",
        "eta": "2024-01-12T15:35:00Z",
        "can_cancel": true
    }
}

# Service Event
{
    "type": "event",
    "id": "event-uuid",
    "title": "Voter file updated",
    "data": {
        "event_type": "data.updated",
        "service": "i360-sync",
        "summary": "12,456 records refreshed",
        "action_url": "/audience",
        "metadata": {...}
    }
}
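A sketch of how the object examples above could be validated before publishing, using only the standard library. The allowed values come from the inline comments in the examples; the SystemObject class and validate_system_object function are hypothetical, and the real models planned for models/scout.py may look different (for example, Pydantic models):

```python
from dataclasses import dataclass, field

# Allowed values, taken from the inline comments in the examples above.
LOG_LEVELS = {"debug", "info", "warning", "error", "critical"}
ALERT_SEVERITIES = {"info", "warning", "critical"}
JOB_STATUSES = {"queued", "running", "completed", "failed", "cancelled"}


@dataclass
class SystemObject:
    type: str
    id: str
    title: str
    data: dict = field(default_factory=dict)


def validate_system_object(obj: dict) -> SystemObject:
    """Check the shared envelope and the per-type fields; raise ValueError on problems."""
    missing = [key for key in ("type", "id", "title", "data") if key not in obj]
    if missing:
        raise ValueError(f"missing fields: {', '.join(missing)}")
    kind, data = obj["type"], obj["data"]
    if kind == "log" and data.get("level") not in LOG_LEVELS:
        raise ValueError(f"invalid log level: {data.get('level')!r}")
    if kind == "alert" and data.get("severity") not in ALERT_SEVERITIES:
        raise ValueError(f"invalid alert severity: {data.get('severity')!r}")
    if kind == "job":
        if data.get("status") not in JOB_STATUSES:
            raise ValueError(f"invalid job status: {data.get('status')!r}")
        progress = data.get("progress", 0.0)
        if not isinstance(progress, (int, float)) or not 0.0 <= progress <= 1.0:
            raise ValueError(f"progress must be between 0.0 and 1.0, got {progress!r}")
    # Other types ("event", "file", ...) pass through: the transport is generic.
    return SystemObject(type=kind, id=obj["id"], title=obj["title"], data=data)
```

Unknown types are passed through rather than rejected, since the mesh-to-Scout flow later in this plan also sends a "file" object.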

Part 3: Addressing Scheme

Current (UUID-based)

room:a1b2c3d4-e5f6-...    # Specific room
user:a1b2c3d4-e5f6-...    # Specific user

Extended (Semantic addresses)

# Rooms
room:uuid                  # Direct room ID
room:system:logs          # System log room
room:system:alerts        # System alerts
room:dm:user1:user2       # DM between two users

# Users
user:uuid                 # Direct user ID
user:email:john@...       # By email
service:cbfiles           # Service identity

# Broadcasts
broadcast:admins          # All admin users
broadcast:room:uuid       # All members of room
broadcast:online          # All online users

Address Resolution

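class AddressResolver:
    """Resolve semantic addresses to concrete IDs.

    get_dm_room_id, get_user_id_by_email and get_online_user_ids are
    placeholder lookups (not defined elsewhere in this plan) for the
    address forms listed above.
    """

    def resolve_room(self, address: str) -> str | None:
        parts = address.split(":")
        if parts[0] != "room" or len(parts) < 2:
            return None
        if parts[1] == "system" and len(parts) == 3:
            return get_system_room_id(parts[2])        # room:system:logs
        if parts[1] == "dm" and len(parts) == 4:
            return get_dm_room_id(parts[2], parts[3])  # room:dm:user1:user2
        if len(parts) == 2 and parts[1] and parts[1] not in ("system", "dm"):
            return parts[1]                            # room:uuid
        return None

    def resolve_recipients(self, address: str) -> list[str]:
        # Exact matches, so "broadcast:adminsX" is not treated as "broadcast:admins"
        if address == "broadcast:admins":
            return get_admin_user_ids()
        if address == "broadcast:online":
            return get_online_user_ids()
        if address.startswith("broadcast:room:"):
            return get_room_member_ids(address.split(":", 2)[2])
        if address.startswith("user:email:"):
            user_id = get_user_id_by_email(address.split(":", 2)[2])
            return [user_id] if user_id else []
        if address.startswith("user:"):
            return [address.split(":", 1)[1]]
        return []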
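The plan's phase list calls for address validation with detailed error messages. One way to get there is to parse an address into a structured form before any lookup happens, as in this sketch. Address, AddressError, and parse_address are hypothetical names, and normalising DM participants into sorted order (so room:dm:bob:alice and room:dm:alice:bob name the same room) is an assumption, not something the plan specifies:

```python
from dataclasses import dataclass


class AddressError(ValueError):
    """Raised with a human-readable reason when an address is malformed."""


@dataclass(frozen=True)
class Address:
    scheme: str              # "room", "user", "service" or "broadcast"
    kind: str                # e.g. "id", "system", "dm", "email", "admins"
    args: tuple[str, ...]


SYSTEM_TYPES = {"logs", "alerts", "jobs", "announcements"}


def parse_address(address: str) -> Address:
    scheme, _, rest = address.partition(":")
    if not rest:
        raise AddressError(f"{address!r}: expected '<scheme>:<target>'")
    parts = rest.split(":")
    if scheme == "room":
        if parts[0] == "system":
            if len(parts) != 2 or parts[1] not in SYSTEM_TYPES:
                raise AddressError(
                    f"{address!r}: system room must be one of {sorted(SYSTEM_TYPES)}")
            return Address("room", "system", (parts[1],))
        if parts[0] == "dm":
            if len(parts) != 3 or not all(parts[1:]):
                raise AddressError(f"{address!r}: expected 'room:dm:<user1>:<user2>'")
            # Sorted so both orderings of the participants map to one DM room
            return Address("room", "dm", tuple(sorted(parts[1:])))
        if len(parts) == 1:
            return Address("room", "id", (parts[0],))
    elif scheme == "user":
        if parts[0] == "email" and len(parts) == 2 and "@" in parts[1]:
            return Address("user", "email", (parts[1],))
        if len(parts) == 1:
            return Address("user", "id", (parts[0],))
    elif scheme == "service" and len(parts) == 1:
        return Address("service", "name", (parts[0],))
    elif scheme == "broadcast":
        if parts in (["admins"], ["online"]):
            return Address("broadcast", parts[0], ())
        if parts[0] == "room" and len(parts) == 2 and parts[1]:
            return Address("broadcast", "room", (parts[1],))
    raise AddressError(f"{address!r}: unrecognised address format")
```

A resolver can then branch on (scheme, kind) instead of re-splitting strings, and the /api/scout/bridge/resolve endpoint can return the AddressError message directly.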

Part 4: Scout Bridge Service

Purpose

Bidirectional bridge between Scout WebSocket and cbmesh gateway.

Implementation

# src/api/services/scout_bridge.py

from uuid import uuid4

import websockets

# Project-internal names used below (import paths not shown in this plan):
# AddressResolver, WSMessage, ScoutMessageCreate, scout_service, manager

class ScoutBridge:
    """Bridge between Scout chat and cbmesh service mesh."""

    def __init__(self, gateway_url: str):
        self.gateway_url = gateway_url
        # Generated once, so reconnects re-announce under the same worker identity
        self.worker_id = str(uuid4())
        self.mesh_ws = None   # websockets client connection, set by connect()
        self.scout_ws = None  # Scout connection, set by connect()
        self.address_resolver = AddressResolver()

    async def connect(self):
        """Connect to both Scout and cbmesh."""
        # Connect to cbmesh as a worker
        self.mesh_ws = await websockets.connect(
            f"{self.gateway_url}/ws/worker"
        )

        # Announce ourselves
        await self.announce_to_mesh()

        # Connect to Scout as system user
        self.scout_ws = await self.connect_to_scout_as_system()

    async def announce_to_mesh(self):
        """Announce bridge capabilities to mesh."""
        card = {
            "worker_id": self.worker_id,
            "service_name": "scout-bridge",
            "service_version": "1.0.0",
            "endpoints": [
                {"path": "/publish", "method": "POST"},
                {"path": "/broadcast", "method": "POST"},
            ]
        }
        await self.mesh_ws.send(WSMessage(
            type="announce",
            source="scout-bridge",
            payload={"card": card}
        ).model_dump_json())

    async def handle_mesh_message(self, msg: WSMessage):
        """Route mesh message to Scout."""
        if msg.type == "request":
            request = msg.payload["request"]

            if request["path"] == "/publish":
                # Publish to specific room
                await self.publish_to_scout(
                    room=request["body"]["room"],
                    message=request["body"]["message"]
                )

            elif request["path"] == "/broadcast":
                # Broadcast to multiple recipients
                await self.broadcast_to_scout(
                    address=request["body"]["address"],
                    message=request["body"]["message"]
                )

    async def publish_to_scout(self, room: str, message: dict):
        """Publish a system message to a Scout room."""
        room_id = self.address_resolver.resolve_room(room)
        if not room_id:
            return

        # Create Scout message
        scout_msg = ScoutMessageCreate(
            content=message.get("text", ""),
            content_type="object" if "object" in message else "text",
            object=message.get("object")
        )

        # Send via Scout service
        created = scout_service.create_message(
            room_id=room_id,
            sender_id=None,  # System
            data=scout_msg,
            sender_type="system"
        )

        # Broadcast to room via WebSocket
        await manager.broadcast_to_room(room_id, {
            "type": "message",
            "room_id": room_id,
            "message": created.model_dump(mode="json")
        })
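Phase 4 lists auto-reconnect for the mesh connection, but connect() above connects only once. A minimal sketch of a reconnect loop with capped exponential backoff and full jitter, assuming a hypothetical connect_once coroutine that connects, announces, and reads messages until the socket closes. The delay values and the max_failures cut-off are illustrative, not settings from the plan:

```python
import asyncio
import random
from typing import Awaitable, Callable, Optional


async def run_with_reconnect(
    connect_once: Callable[[], Awaitable[None]],
    *,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    max_failures: Optional[int] = None,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> None:
    """Keep a mesh session alive, backing off exponentially after failures.

    Cancelling the task running this coroutine stops the loop: CancelledError
    is a BaseException (Python 3.8+), so the handler below does not catch it.
    """
    failures = 0
    while True:
        try:
            await connect_once()   # returns when the gateway closes the socket
            failures = 0           # a completed session resets the backoff
            delay = base_delay
        except Exception:
            failures += 1
            if max_failures is not None and failures >= max_failures:
                raise
            # Full jitter: wait a random time up to the capped exponential delay.
            # The exponent is capped too, so the multiplication cannot overflow.
            cap = min(max_delay, base_delay * 2 ** min(failures - 1, 16))
            delay = random.uniform(0, cap)
        await sleep(delay)
```

In the bridge, this might run as asyncio.create_task(run_with_reconnect(bridge.connect_and_listen)), where connect_and_listen is hypothetical; cancelling that task would be a natural way to implement POST /api/scout/bridge/stop. Jitter keeps many workers from reconnecting to a restarted gateway in lockstep.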

Message Flow: Mesh → Scout

1. CBFiles Worker completes upload
2. Sends to mesh: POST /scout-bridge/publish
   {
     "room": "room:system:jobs",
     "message": {
       "text": "File upload complete",
       "object": {
         "type": "file",
         "id": "file-123",
         "title": "report.pdf",
         "data": {"size": 1024000, "url": "..."}
       }
     }
   }
3. Bridge receives via mesh WebSocket
4. Resolves "room:system:jobs" → actual room UUID
5. Creates Scout message with sender_type="system"
6. Broadcasts to room subscribers via Scout WebSocket
7. Users see file notification in #system-jobs

Message Flow: Scout → Mesh

1. User types: /query voters where score > 80
2. Scout detects command, routes to bridge
3. Bridge sends to mesh: target="query-engine"
   {
     "method": "POST",
     "path": "/execute",
     "body": {"query": "voters where score > 80"}
   }
4. Query engine worker processes
5. Response flows back through mesh
6. Bridge posts results to Scout room as object
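Step 2 above leaves command detection unspecified. A sketch of one approach: a table mapping slash commands to mesh requests, where anything that is not a known command is treated as ordinary chat. Only the /query → query-engine /execute route comes from the flow above; the COMMANDS table and MeshRequest class are hypothetical:

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class MeshRequest:
    target: str
    method: str
    path: str
    body: dict = field(default_factory=dict)


# command name -> (mesh target, HTTP method, path, body key for the arguments)
COMMANDS = {
    "query": ("query-engine", "POST", "/execute", "query"),
}


def parse_command(text: str) -> Optional[MeshRequest]:
    """Turn '/<command> <args>' into a mesh request; return None for ordinary chat."""
    if not text.startswith("/"):
        return None
    name, _, args = text[1:].partition(" ")
    route = COMMANDS.get(name)
    if route is None:
        return None  # unknown slash text is posted as a normal message
    target, method, path, body_key = route
    return MeshRequest(target=target, method=method, path=path,
                       body={body_key: args.strip()})
```

Returning None for unknown commands keeps casual messages that happen to start with "/" from being swallowed; an alternative is to reply with a list of supported commands.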

Part 5: cbmesh Integration Points

Scout as Mesh Worker (Optional)

Expose Scout API through mesh for other services:

apimesh worker http://localhost:8000 \
  --gateway ws://mesh-gateway:8080/ws/worker \
  --prefix /api/scout

Other services can then:

# From any mesh-connected service
await mesh.request(
    target="cbapp",
    method="POST",
    path="/api/scout/system/logs/publish",
    body={"content": "Log message", "level": "info"}
)

Scout as Mesh Client

Scout backend can query other services:

# In scout_ai.py - when user asks a question
async def handle_scout_query(query: str, room_id: str):
    # Query the mesh for data
    response = await mesh_client.request(
        target="query-engine",
        method="POST",
        path="/natural-language",
        body={"query": query}
    )

    # Post response to Scout room
    await post_ai_response(room_id, response)

Part 6: Implementation Phases

Phase 1: System Rooms ✅

  • Add is_system, system_type columns to scout_room
  • Create get_or_create_system_room() function
  • Add API endpoint GET /api/scout/system-rooms
  • Add API endpoint POST /api/scout/system-rooms/{type}
  • Migration script: scripts/migrations/add_scout_system_rooms.py
  • Auto-subscribe admins to system rooms on login
  • Test: Create and access system rooms

Phase 2: System Message Types ✅

  • Define log/alert/job object schemas in models/scout.py
  • Add sender_type="system" handling (already exists!)
  • Add publish_system_message() service function
  • Add POST /api/scout/bridge/publish endpoint
  • Add GET /api/scout/bridge/status endpoint
  • Create object preview renderers for UI
  • Test: Post system messages via API

Phase 3: Address Resolver ✅

  • Implement AddressResolver class (services/scout_address.py)
  • Support semantic addresses (room:system:logs, broadcast:admins)
  • Add address validation with detailed error messages
  • Add POST /api/scout/bridge/send - semantic room addressing
  • Add POST /api/scout/bridge/broadcast - multi-recipient broadcasts
  • Add POST /api/scout/bridge/resolve - address resolution endpoint
  • Test: Resolve various address formats

Phase 4: Bridge Service ✅

  • Create src/api/services/scout_bridge.py (WebSocket bridge)
  • Implement mesh connection with auto-reconnect
  • Implement message routing (mesh → scout)
  • Add POST /api/scout/bridge/start - start bridge
  • Add POST /api/scout/bridge/stop - stop bridge
  • Add GET /api/scout/bridge/connection - connection status
  • Test: Send message from external service to Scout

Phase 5: Outbound Integration ✅

  • Define Scout events to publish to mesh (scout_events.py)
  • Implement EventPublisher with local and mesh publishing
  • Implement MeshClient for outbound service queries
  • Add POST /api/scout/mesh/query - query mesh services
  • Add POST /api/scout/mesh/request - raw mesh requests
  • Add POST /api/scout/events/enable-mesh - toggle mesh publishing
  • Add GET /api/scout/events/status - publisher status
  • Test: End-to-end mesh communication

Part 7: API Reference

New Endpoints

# System Rooms
GET  /api/scout/system-rooms              # List available system rooms
POST /api/scout/system-rooms/{type}       # Get/create and subscribe to system room

# Bridge - Message Publishing
POST /api/scout/bridge/publish            # Publish system message (typed)
POST /api/scout/bridge/send               # Send via semantic address
POST /api/scout/bridge/broadcast          # Broadcast to recipients
POST /api/scout/bridge/resolve            # Resolve semantic address

# Bridge - Connection Management
POST /api/scout/bridge/start              # Start mesh WebSocket bridge
POST /api/scout/bridge/stop               # Stop mesh WebSocket bridge
GET  /api/scout/bridge/status             # Bridge info and capabilities
GET  /api/scout/bridge/connection         # Connection status

# Mesh - Outbound Queries
POST /api/scout/mesh/query                # Query mesh service (NL)
POST /api/scout/mesh/request              # Raw mesh request

# Events - Publishing
POST /api/scout/events/enable-mesh        # Toggle mesh event publishing
GET  /api/scout/events/status             # Event publisher status

WebSocket Message Types (Extended)

# Existing
"connected", "message", "presence", "typing", "read_receipt", "error"

# New
"system"        # System notification (high priority)
"job_update"    # Background job progress
"alert"         # Critical alert (may trigger notification)

Part 8: Security

Service Authentication

Services publishing to Scout must authenticate:

# Option A: API Key header
headers = {"X-Service-Key": "scout-bridge-key-..."}

# Option B: Mesh identity (worker announces with signed token)
# Option C: mTLS between services
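A sketch of Option A, assuming each service's key is supplied through a hypothetical SCOUT_SERVICE_KEY_<NAME> environment variable. It uses hmac.compare_digest from the standard library for a constant-time comparison and returns the matching service name, which rate limiting and auditing could then use as the source identity:

```python
import hmac
from typing import Mapping, Optional

KEY_PREFIX = "SCOUT_SERVICE_KEY_"  # hypothetical env var convention


def load_service_keys(env: Mapping[str, str]) -> dict:
    """Map service name -> key from variables like SCOUT_SERVICE_KEY_LIST_LOADER."""
    return {
        name[len(KEY_PREFIX):].lower().replace("_", "-"): value
        for name, value in env.items()
        if name.startswith(KEY_PREFIX) and value
    }


def authenticate_service(headers: Mapping[str, str],
                         keys: Mapping[str, str]) -> Optional[str]:
    """Return the service whose key matches the X-Service-Key header, else None."""
    presented = headers.get("X-Service-Key", "").encode()
    if not presented:
        return None
    matched = None
    for service, key in keys.items():
        # compare_digest avoids leaking, through timing, how much of the key matched.
        if hmac.compare_digest(presented, key.encode()):
            matched = service
    return matched
```

In a request handler this would be called as authenticate_service(request.headers, load_service_keys(os.environ)). Web frameworks typically expose headers as a case-insensitive mapping; with a plain dict, the header name must match exactly.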

Rate Limiting

# Per-service limits
RATE_LIMITS = {
    "cbfiles": {"messages_per_minute": 60},
    "list-loader": {"messages_per_minute": 120},
    "default": {"messages_per_minute": 30}
}
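The plan does not say how the limits are enforced. One option is a sliding-window log per service, sketched below: it keeps the timestamps of recent messages and rejects a message once the last 60 seconds already hold the service's limit. SlidingWindowLimiter is a hypothetical name, unknown services fall back to the "default" entry, and the clock is injectable for testing:

```python
import time
from collections import defaultdict, deque
from typing import Callable

RATE_LIMITS = {
    "cbfiles": {"messages_per_minute": 60},
    "list-loader": {"messages_per_minute": 120},
    "default": {"messages_per_minute": 30},
}


class SlidingWindowLimiter:
    """Allow at most N messages per service in any rolling 60-second window."""

    def __init__(self, limits: dict, clock: Callable[[], float] = time.monotonic):
        self.limits = limits
        self.clock = clock
        self.sent = defaultdict(deque)  # service -> timestamps of accepted messages

    def limit_for(self, service: str) -> int:
        config = self.limits.get(service, self.limits["default"])
        return config["messages_per_minute"]

    def allow(self, service: str) -> bool:
        now = self.clock()
        window = self.sent[service]
        # Drop timestamps that have left the 60-second window.
        while window and now - window[0] >= 60.0:
            window.popleft()
        if len(window) >= self.limit_for(service):
            return False  # rejected messages are not recorded
        window.append(now)
        return True
```

This state lives in one process; if several API instances accept service traffic, the counters would need a shared store. A token bucket is a common alternative that allows short bursts while keeping the same average rate.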

Audit Trail

Every system message is recorded in an audit log with these fields:

{
    "timestamp": "...",
    "source_service": "cbfiles",
    "target_room": "room:system:jobs",
    "message_type": "job",
    "message_id": "..."
}
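A minimal sketch of producing one audit entry with exactly these fields, written as a JSON line. The JSON Lines format, the audit_line name, and the UTC "Z" timestamp style (borrowed from the object examples in Part 2) are assumptions:

```python
import json
from datetime import datetime, timezone
from typing import Optional


def audit_line(source_service: str, target_room: str, message_type: str,
               message_id: str, now: Optional[datetime] = None) -> str:
    """Serialise one audit entry as a single JSON line."""
    now = now or datetime.now(timezone.utc)
    entry = {
        # e.g. "2024-01-12T15:30:45Z", matching the object examples
        "timestamp": now.astimezone(timezone.utc)
                        .isoformat(timespec="seconds")
                        .replace("+00:00", "Z"),
        "source_service": source_service,
        "target_room": target_room,      # the address as given, e.g. room:system:jobs
        "message_type": message_type,
        "message_id": message_id,
    }
    return json.dumps(entry) + "\n"
```

Keeping the semantic address in target_room (rather than the resolved UUID) records what the service asked for, which is usually what an audit reader wants to see.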


Design Decisions

1. Log Verbosity

Configurable via .env, default INFO. Uses standard Python logging levels; a record appears in Scout when its level is at or above SCOUT_LOG_LEVEL:

Level      Numeric   Shown at default (INFO)
--------   -------   -----------------------
DEBUG      10        No (set SCOUT_LOG_LEVEL=DEBUG)
INFO       20        Yes (default threshold)
WARNING    30        Yes
ERROR      40        Yes
CRITICAL   50        Yes

# .env
SCOUT_LOG_LEVEL=INFO  # DEBUG, INFO, WARNING, ERROR, CRITICAL
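A sketch of how SCOUT_LOG_LEVEL could gate what reaches #system-logs, using a standard logging.Handler subclass. The publish callback, the service label, and the id format (modelled on the log entry example in Part 2) are hypothetical:

```python
import logging
import os
from datetime import datetime, timezone
from typing import Callable, Mapping, Optional
from uuid import uuid4

# Python's standard numeric levels, keyed by the names SCOUT_LOG_LEVEL accepts.
LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def scout_log_level(env: Optional[Mapping[str, str]] = None) -> int:
    """Read SCOUT_LOG_LEVEL; missing or unrecognised values fall back to INFO."""
    env = os.environ if env is None else env
    name = env.get("SCOUT_LOG_LEVEL", "INFO").strip().upper()
    return LEVELS.get(name, logging.INFO)


class ScoutLogHandler(logging.Handler):
    """Forward records at or above the handler level as Scout 'log' objects."""

    def __init__(self, publish: Callable[[dict], None], level: int, service: str):
        super().__init__(level=level)
        self.publish = publish  # hypothetical callback that posts to #system-logs
        self.service = service

    def emit(self, record: logging.LogRecord) -> None:
        try:
            message = record.getMessage()
            ts = datetime.fromtimestamp(record.created, tz=timezone.utc)
            self.publish({
                "type": "log",
                "id": f"log-{ts:%Y%m%d-%H%M%S}-{uuid4().hex[:6]}",
                "title": message[:80],
                "data": {
                    "level": record.levelname.lower(),
                    "service": self.service,
                    "message": message,
                    "timestamp": ts.isoformat(timespec="seconds").replace("+00:00", "Z"),
                },
            })
        except Exception:
            # Never let a publishing failure break the application's logging.
            self.handleError(record)
```

Two cautions for a real handler: if the publish path itself logs at or above the threshold, the handler can recurse, so the bridge's own loggers should be excluded; and publishing synchronously from emit() blocks the logging caller, so a logging.handlers.QueueHandler in front of it may be preferable.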

2. Alert Indicators

  • Bell icon (🔔) for alerts in UI
  • Unread count badge on bell
  • Badge shows the exact count up to 9 and "9+" for anything higher
  • No action buttons (Acknowledge/Snooze) for now
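The badge rule as a small function. Hiding the badge when the count is zero is an assumption; the plan only specifies the "9+" cap:

```python
def badge_label(unread: int) -> str:
    """Bell badge text: the exact count up to 9, then "9+"."""
    if unread <= 0:
        return ""  # assumption: no badge when nothing is unread
    return str(unread) if unread <= 9 else "9+"
```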

3. Job Control

Users can cancel or stop long-running jobs from Scout:

  • Jobs publish to the #system-jobs room
  • Job messages include a can_cancel: true flag
  • The Cancel button triggers a mesh request to the originating worker
  • The worker must handle cancellation gracefully
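A sketch of the worker side of graceful cancellation, assuming the cancel request for a job sets a per-job asyncio.Event (how the mesh request reaches that event is not shown). The worker checks the flag between batches and reports progress with the status, progress, current_step, and can_cancel fields from the job object schema; run_job and its parameters are hypothetical:

```python
import asyncio
from typing import Any, Awaitable, Callable, Sequence


async def run_job(
    batches: Sequence[Any],
    process: Callable[[Any], Awaitable[None]],
    cancel: asyncio.Event,
    report: Callable[[dict], None],
) -> str:
    """Process batches in order, stopping cleanly if `cancel` is set between them."""
    total = len(batches)
    for i, batch in enumerate(batches, start=1):
        if cancel.is_set():
            # Report how far the job got, then stop without starting another batch.
            report({"status": "cancelled", "progress": (i - 1) / total, "can_cancel": False})
            return "cancelled"
        await process(batch)
        report({
            "status": "running",
            "progress": i / total,
            "current_step": f"Processing batch {i}/{total}",
            "can_cancel": True,
        })
    report({"status": "completed", "progress": 1.0, "can_cancel": False})
    return "completed"
```

Checking between batches means a cancel takes effect after the current batch finishes. Calling task.cancel() instead would stop sooner, but it raises CancelledError mid-batch, which makes partial writes harder to clean up.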

4. Message Retention

Configurable via .env, default 30 days:

# .env
SCOUT_MESSAGE_RETENTION_DAYS=30

System messages are purged after the retention period by a scheduled cleanup job.
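A sketch of the scheduled cleanup, using sqlite3 as a stand-in database. The scout_message table, its created_at column (assumed to hold ISO-8601 UTC text with second precision, so string comparison matches time order), and the function names are hypothetical; only SCOUT_MESSAGE_RETENTION_DAYS, the 30-day default, and sender_type="system" come from the plan:

```python
import os
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Mapping, Optional


def retention_days(env: Optional[Mapping[str, str]] = None) -> int:
    """Read SCOUT_MESSAGE_RETENTION_DAYS (default 30)."""
    env = os.environ if env is None else env
    days = int(env.get("SCOUT_MESSAGE_RETENTION_DAYS", "30"))
    if days < 1:
        raise ValueError("SCOUT_MESSAGE_RETENTION_DAYS must be at least 1")
    return days


def purge_system_messages(conn: sqlite3.Connection, days: int,
                          now: Optional[datetime] = None) -> int:
    """Delete system messages older than `days`; return how many were removed."""
    now = now or datetime.now(timezone.utc)
    # Same text format as created_at, so "<" compares chronologically.
    cutoff = (now - timedelta(days=days)).isoformat(timespec="seconds")
    cur = conn.execute(
        "DELETE FROM scout_message WHERE sender_type = 'system' AND created_at < ?",
        (cutoff,),
    )
    conn.commit()
    return cur.rowcount
```

Only system messages are deleted here; whether user messages share the same retention setting is not specified in the plan.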

5. Notification Delivery

No intrusive notifications yet. Delivery type options for future use:

from enum import Enum

class NotificationDelivery(str, Enum):
    """Notification delivery methods (future use)."""
    IN_APP = "in_app"       # Scout UI only (current)
    EMAIL = "email"         # Email notification (future)
    SMS = "sms"             # SMS/P2P notification (future)
    PUSH = "push"           # Browser push notification (future)

Currently only IN_APP is active. Email/SMS/Push will be enabled when communication channels are integrated.