Scout + cbmesh Integration Plan
Overview
This document outlines how Scout (real-time team chat) integrates with cbmesh (API mesh) to enable system services to communicate through the chat interface.
Architecture Decision
Bridge Pattern - A bridge service translates between Scout's chat protocol and cbmesh's service protocol.
┌─────────────────────────────────────────────────────────────────┐
│                          USERS                                  │
│                      (Web Browser)                              │
│                            │                                    │
│                            ▼                                    │
│               Scout WebSocket (/ws/scout)                       │
│                            │                                    │
│        ┌───────────────────┼───────────────────┐                │
│        │                   │                   │                │
│        ▼                   ▼                   ▼                │
│    User Chat         System Rooms        Scout Bridge           │
│  (DMs, Groups)     (#logs, #alerts)            │                │
│                                                │                │
│                                                ▼                │
│                                         cbmesh Gateway          │
│                                              │ │ │              │
│                                      ┌───────┘ │ └───────┐      │
│                                      ▼         ▼         ▼      │
│                                   CBFiles   CBIntel List Loader │
│                                   Worker    Worker    Worker    │
└─────────────────────────────────────────────────────────────────┘
Why Bridge, Not Replace?
| Scout WebSocket | cbmesh WebSocket |
|---|---|
| User-to-user chat | Service-to-service API |
| Persistent connections | Request-response |
| Room-based broadcast | Target-based routing |
| Real-time presence | Heartbeat health |
The two WebSockets serve different purposes, so keep both and bridge between them.
Part 1: System Rooms
Concept
Pre-defined rooms for system messages that admins/users can subscribe to:
| Room | Purpose | Access |
|---|---|---|
| #system-logs | Application logs, debug info | Admin |
| #system-alerts | Critical alerts, errors | Admin |
| #system-jobs | Background job progress | All |
| #announcements | System-wide notices | All |
Database Changes
-- Add to scout_room table
ALTER TABLE scout_room ADD COLUMN is_system BOOLEAN DEFAULT FALSE;
ALTER TABLE scout_room ADD COLUMN system_type VARCHAR;
-- Values: 'logs', 'alerts', 'jobs', 'announcements'
CREATE INDEX idx_scout_room_system ON scout_room(is_system, system_type);
Auto-Creation
System rooms are created on-demand when first accessed:
def get_or_create_system_room(system_type: str, user_id: str) -> ScoutRoom:
    """Get or create a system room by type."""
    existing = execute_and_fetch_one(
        "SELECT * FROM scout_room WHERE is_system = TRUE AND system_type = ?",
        (system_type,)
    )
    if existing:
        # Ensure user is subscribed
        ensure_system_subscription(existing["id"], user_id, system_type)
        return existing
    # Create system room
    room_id = generate_id()
    # ... create with is_system=True, system_type=system_type
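The creation step above is elided, so the following is only a minimal, self-contained sketch of the get-or-create idea. It uses Python's built-in sqlite3 as a stand-in for the project's database helpers; the simplified table layout, the `uuid4` IDs, and the function name `get_or_create_system_room_id` are assumptions made for illustration, and subscription handling is left out.

```python
import sqlite3
from uuid import uuid4


def get_or_create_system_room_id(conn: sqlite3.Connection, system_type: str) -> str:
    """Return the ID of the system room for system_type, creating it if missing."""
    row = conn.execute(
        "SELECT id FROM scout_room WHERE is_system = 1 AND system_type = ?",
        (system_type,),
    ).fetchone()
    if row:
        return row[0]
    room_id = str(uuid4())  # assumption: rooms are keyed by UUID strings
    conn.execute(
        "INSERT INTO scout_room (id, is_system, system_type) VALUES (?, 1, ?)",
        (room_id, system_type),
    )
    conn.commit()
    return room_id


# In-memory table standing in for the real scout_room table (simplified columns).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE scout_room ("
    "id TEXT PRIMARY KEY, is_system INTEGER DEFAULT 0, system_type TEXT)"
)
first = get_or_create_system_room_id(conn, "logs")
second = get_or_create_system_room_id(conn, "logs")  # returns the same room
```

The check-then-insert sequence is not atomic, so a deployment with concurrent callers would likely also want a uniqueness constraint on the system room type.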
Part 2: System Message Objects
Object Schema
System messages use Scout's generic object transport:
# Log Entry
{
  "type": "log",
  "id": "log-20240112-153045-abc123",
  "title": "Import completed",
  "data": {
    "level": "info",  # debug, info, warning, error, critical
    "service": "list-loader",
    "message": "Imported 1,247 contacts",
    "details": {
      "file": "voters.csv",
      "records": 1247,
      "skipped": 3,
      "duration_ms": 4521
    },
    "timestamp": "2024-01-12T15:30:45Z",
    "trace_id": "abc123"  # For correlation
  }
}
# Alert
{
  "type": "alert",
  "id": "alert-uuid",
  "title": "Database connection lost",
  "data": {
    "severity": "critical",  # info, warning, critical
    "service": "cbapp-api",
    "message": "Connection to DuckDB failed",
    "action_required": true,
    "action_url": "/admin/health",
    "auto_resolve": false
  }
}
# Job Progress
{
  "type": "job",
  "id": "job-uuid",
  "title": "Generating embeddings",
  "data": {
    "status": "running",  # queued, running, completed, failed, cancelled
    "progress": 0.75,  # 0.0 to 1.0
    "current_step": "Processing batch 3/4",
    "started_at": "2024-01-12T15:00:00Z",
    "eta": "2024-01-12T15:35:00Z",
    "can_cancel": true
  }
}
# Service Event
{
  "type": "event",
  "id": "event-uuid",
  "title": "Voter file updated",
  "data": {
    "event_type": "data.updated",
    "service": "i360-sync",
    "summary": "12,456 records refreshed",
    "action_url": "/audience",
    "metadata": {...}
  }
}
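As an illustration of how a service might assemble a log entry in this shape, here is a minimal sketch. The helper name `build_log_object` is hypothetical, and the ID layout (`log-<date>-<time>-<trace_id>`) is inferred from the single example above rather than from a documented rule.

```python
from datetime import datetime, timezone

LOG_LEVELS = {"debug", "info", "warning", "error", "critical"}


def build_log_object(service, level, message, trace_id, when, title=None, details=None):
    """Build a 'log' object; the ID format is an assumption based on the example ID."""
    if level not in LOG_LEVELS:
        raise ValueError(f"unknown log level: {level!r}")
    stamp = when.astimezone(timezone.utc)  # normalise to UTC before formatting
    return {
        "type": "log",
        "id": f"log-{stamp:%Y%m%d-%H%M%S}-{trace_id}",
        "title": title or message,
        "data": {
            "level": level,
            "service": service,
            "message": message,
            "details": details or {},
            "timestamp": stamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "trace_id": trace_id,
        },
    }


entry = build_log_object(
    service="list-loader",
    level="info",
    message="Imported 1,247 contacts",
    trace_id="abc123",
    when=datetime(2024, 1, 12, 15, 30, 45, tzinfo=timezone.utc),
    title="Import completed",
    details={"file": "voters.csv", "records": 1247, "skipped": 3},
)
```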
Part 3: Addressing Scheme
Current (UUID-based)
Extended (Semantic addresses)
# Rooms
room:uuid # Direct room ID
room:system:logs # System log room
room:system:alerts # System alerts
room:dm:user1:user2 # DM between two users
# Users
user:uuid # Direct user ID
user:email:john@... # By email
service:cbfiles # Service identity
# Broadcasts
broadcast:admins # All admin users
broadcast:room:uuid # All members of room
broadcast:online # All online users
Address Resolution
class AddressResolver:
    """Resolve semantic addresses to concrete IDs."""

    def resolve_room(self, address: str) -> str | None:
        if address.startswith("room:system:"):
            system_type = address.split(":")[2]
            return get_system_room_id(system_type)
        elif address.startswith("room:"):
            return address.split(":")[1]  # UUID
        return None

    def resolve_recipients(self, address: str) -> list[str]:
        if address.startswith("broadcast:admins"):
            return get_admin_user_ids()
        elif address.startswith("broadcast:room:"):
            room_id = address.split(":")[2]
            return get_room_member_ids(room_id)
        elif address.startswith("user:"):
            return [address.split(":")[1]]
        return []
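The resolver above takes a fixed index after splitting on ":", so an address such as room:dm:user1:user2 would yield "dm" and user:email:john@... would yield "email". One way to handle every form in the addressing scheme is to classify the address before resolving it. The sketch below is an assumption about how that could look; `ParsedAddress` and `parse_address` are hypothetical names, not part of the existing code.

```python
from typing import NamedTuple


class ParsedAddress(NamedTuple):
    scheme: str   # "room", "user", "service", or "broadcast"
    kind: str     # "id", "system", "dm", "email", "admins", "online", "room"
    args: tuple   # remaining components, e.g. ("logs",) or ("user1", "user2")


def parse_address(address: str) -> ParsedAddress:
    """Classify a semantic address without resolving it to concrete IDs."""
    scheme, _, rest = address.partition(":")
    if not rest:
        raise ValueError(f"malformed address: {address!r}")
    if scheme == "room":
        if rest.startswith("system:"):
            return ParsedAddress("room", "system", (rest[len("system:"):],))
        if rest.startswith("dm:"):
            users = tuple(rest[len("dm:"):].split(":"))
            if len(users) != 2 or not all(users):
                raise ValueError(f"DM address needs two users: {address!r}")
            return ParsedAddress("room", "dm", users)
        return ParsedAddress("room", "id", (rest,))
    if scheme == "user":
        if rest.startswith("email:"):
            return ParsedAddress("user", "email", (rest[len("email:"):],))
        return ParsedAddress("user", "id", (rest,))
    if scheme == "service":
        return ParsedAddress("service", "id", (rest,))
    if scheme == "broadcast":
        if rest in ("admins", "online"):
            return ParsedAddress("broadcast", rest, ())
        if rest.startswith("room:"):
            return ParsedAddress("broadcast", "room", (rest[len("room:"):],))
    raise ValueError(f"unsupported address: {address!r}")
```

A resolver could then dispatch on `(scheme, kind)` instead of on string prefixes, which keeps validation errors in one place.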
Part 4: Scout Bridge Service
Purpose
Bidirectional bridge between Scout WebSocket and cbmesh gateway.
Implementation
# src/api/services/scout_bridge.py
from uuid import uuid4

import websockets

# WSMessage, ScoutMessageCreate, scout_service, manager, and AddressResolver
# are project-internal names; their imports are omitted here.


class ScoutBridge:
    """Bridge between Scout chat and cbmesh service mesh."""

    def __init__(self, gateway_url: str):
        self.gateway_url = gateway_url
        self.mesh_ws: WebSocket | None = None
        self.scout_ws: WebSocket | None = None
        self.address_resolver = AddressResolver()

    async def connect(self):
        """Connect to both Scout and cbmesh."""
        # Connect to cbmesh as a worker
        self.mesh_ws = await websockets.connect(
            f"{self.gateway_url}/ws/worker"
        )
        # Announce ourselves
        await self.announce_to_mesh()
        # Connect to Scout as system user
        self.scout_ws = await self.connect_to_scout_as_system()

    async def announce_to_mesh(self):
        """Announce bridge capabilities to mesh."""
        card = {
            "worker_id": str(uuid4()),
            "service_name": "scout-bridge",
            "service_version": "1.0.0",
            "endpoints": [
                {"path": "/publish", "method": "POST"},
                {"path": "/broadcast", "method": "POST"},
            ]
        }
        await self.mesh_ws.send(WSMessage(
            type="announce",
            source="scout-bridge",
            payload={"card": card}
        ).model_dump_json())

    async def handle_mesh_message(self, msg: WSMessage):
        """Route mesh message to Scout."""
        if msg.type == "request":
            request = msg.payload["request"]
            if request["path"] == "/publish":
                # Publish to specific room
                await self.publish_to_scout(
                    room=request["body"]["room"],
                    message=request["body"]["message"]
                )
            elif request["path"] == "/broadcast":
                # Broadcast to multiple recipients
                await self.broadcast_to_scout(
                    address=request["body"]["address"],
                    message=request["body"]["message"]
                )

    async def publish_to_scout(self, room: str, message: dict):
        """Publish a system message to a Scout room."""
        room_id = self.address_resolver.resolve_room(room)
        if not room_id:
            # Unknown room address: drop the message
            return
        # Create Scout message
        scout_msg = ScoutMessageCreate(
            content=message.get("text", ""),
            content_type="object" if "object" in message else "text",
            object=message.get("object")
        )
        # Send via Scout service
        created = scout_service.create_message(
            room_id=room_id,
            sender_id=None,  # System
            data=scout_msg,
            sender_type="system"
        )
        # Broadcast to room via WebSocket
        await manager.broadcast_to_room(room_id, {
            "type": "message",
            "room_id": room_id,
            "message": created.model_dump(mode="json")
        })
Message Flow: Mesh → Scout
1. CBFiles Worker completes upload
2. Sends to mesh: POST /scout-bridge/publish
   {
     "room": "room:system:jobs",
     "message": {
       "text": "File upload complete",
       "object": {
         "type": "file",
         "id": "file-123",
         "title": "report.pdf",
         "data": {"size": 1024000, "url": "..."}
       }
     }
   }
3. Bridge receives via mesh WebSocket
4. Resolves "room:system:jobs" → actual room UUID
5. Creates Scout message with sender_type="system"
6. Broadcasts to room subscribers via Scout WebSocket
7. Users see file notification in #system-jobs
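Steps 3 to 6 of this flow can be exercised without a running gateway. The sketch below is a toy, in-memory stand-in rather than the real bridge: `InMemoryBridge`, the `delivered` list, and the room UUID placeholder are all hypothetical, and the "broadcast" is just an append to a list.

```python
import asyncio


class InMemoryBridge:
    """Toy stand-in for the bridge: resolves the room, then records the delivery."""

    def __init__(self, system_rooms: dict):
        self.system_rooms = system_rooms   # e.g. {"jobs": "<room uuid>"}
        self.delivered = []                # what a real bridge would broadcast

    def resolve_room(self, address: str):
        prefix = "room:system:"
        if address.startswith(prefix):
            return self.system_rooms.get(address[len(prefix):])
        if address.startswith("room:"):
            return address[len("room:"):]
        return None

    async def handle_request(self, request: dict) -> bool:
        """Receive a /publish request, resolve the room, build and 'broadcast' it."""
        if request.get("path") != "/publish":
            return False
        body = request["body"]
        room_id = self.resolve_room(body["room"])
        if room_id is None:
            return False                   # unknown room: nothing is delivered
        message = body["message"]
        self.delivered.append({
            "type": "message",
            "room_id": room_id,
            "message": {
                "sender_type": "system",
                "content": message.get("text", ""),
                "content_type": "object" if "object" in message else "text",
                "object": message.get("object"),
            },
        })
        return True


bridge = InMemoryBridge({"jobs": "room-uuid-jobs"})
request = {
    "path": "/publish",
    "body": {
        "room": "room:system:jobs",
        "message": {
            "text": "File upload complete",
            "object": {"type": "file", "id": "file-123", "title": "report.pdf"},
        },
    },
}
handled = asyncio.run(bridge.handle_request(request))
```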
Message Flow: Scout → Mesh
1. User types: /query voters where score > 80
2. Scout detects command, routes to bridge
3. Bridge sends to mesh: target="query-engine"
   {
     "method": "POST",
     "path": "/execute",
     "body": {"query": "voters where score > 80"}
   }
4. Query engine worker processes
5. Response flows back through mesh
6. Bridge posts results to Scout room as object
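Steps 1 to 3 amount to turning a slash command into a mesh request. A minimal sketch of that translation follows; the function name `command_to_mesh_request` and the outer envelope (a dict with `target` and `request` keys) are assumptions, while the target, method, path, and body mirror the example above.

```python
from typing import Optional

QUERY_PREFIX = "/query "


def command_to_mesh_request(text: str) -> Optional[dict]:
    """Translate '/query ...' into a mesh request; return None for ordinary chat."""
    if not text.startswith(QUERY_PREFIX):
        return None
    query = text[len(QUERY_PREFIX):].strip()
    if not query:
        return None  # '/query' with no arguments is not routed to the mesh
    return {
        "target": "query-engine",
        "request": {
            "method": "POST",
            "path": "/execute",
            "body": {"query": query},
        },
    }


mesh_request = command_to_mesh_request("/query voters where score > 80")
```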
Part 5: cbmesh Integration Points
Scout as Mesh Worker (Optional)
Expose Scout API through mesh for other services:
apimesh worker http://localhost:8000 \
--gateway ws://mesh-gateway:8080/ws/worker \
--prefix /api/scout
Other services can then:
# From any mesh-connected service
await mesh.request(
    target="cbapp",
    method="POST",
    path="/api/scout/system/logs/publish",
    body={"content": "Log message", "level": "info"}
)
Scout as Mesh Client
Scout backend can query other services:
# In scout_ai.py - when user asks a question
async def handle_scout_query(query: str, room_id: str):
    # Query the mesh for data
    response = await mesh_client.request(
        target="query-engine",
        method="POST",
        path="/natural-language",
        body={"query": query}
    )
    # Post response to Scout room
    await post_ai_response(room_id, response)
Part 6: Implementation Phases
Phase 1: System Rooms ✅
- Add is_system, system_type columns to scout_room
- Create get_or_create_system_room() function
- Add API endpoint GET /api/scout/system-rooms
- Add API endpoint POST /api/scout/system-rooms/{type}
- Migration script: scripts/migrations/add_scout_system_rooms.py
- Auto-subscribe admins to system rooms on login
- Test: Create and access system rooms
Phase 2: System Message Types ✅
- Define log/alert/job object schemas in models/scout.py
- Add sender_type="system" handling (already exists!)
- Add publish_system_message() service function
- Add POST /api/scout/bridge/publish endpoint
- Add GET /api/scout/bridge/status endpoint
- Create object preview renderers for UI
- Test: Post system messages via API
Phase 3: Address Resolver ✅
- Implement AddressResolver class (services/scout_address.py)
- Support semantic addresses (room:system:logs, broadcast:admins)
- Add address validation with detailed error messages
- Add POST /api/scout/bridge/send - semantic room addressing
- Add POST /api/scout/bridge/broadcast - multi-recipient broadcasts
- Add POST /api/scout/bridge/resolve - address resolution endpoint
- Test: Resolve various address formats
Phase 4: Bridge Service ✅
- Create src/api/services/scout_bridge.py (WebSocket bridge)
- Implement mesh connection with auto-reconnect
- Implement message routing (mesh → scout)
- Add POST /api/scout/bridge/start - start bridge
- Add POST /api/scout/bridge/stop - stop bridge
- Add GET /api/scout/bridge/connection - connection status
- Test: Send message from external service to Scout
Phase 5: Outbound Integration ✅
- Define Scout events to publish to mesh (scout_events.py)
- Implement EventPublisher with local and mesh publishing
- Implement MeshClient for outbound service queries
- Add POST /api/scout/mesh/query - query mesh services
- Add POST /api/scout/mesh/request - raw mesh requests
- Add POST /api/scout/events/enable-mesh - toggle mesh publishing
- Add GET /api/scout/events/status - publisher status
- Test: End-to-end mesh communication
Part 7: API Reference
New Endpoints
# System Rooms
GET /api/scout/system-rooms # List available system rooms
POST /api/scout/system-rooms/{type} # Get/create and subscribe to system room
# Bridge - Message Publishing
POST /api/scout/bridge/publish # Publish system message (typed)
POST /api/scout/bridge/send # Send via semantic address
POST /api/scout/bridge/broadcast # Broadcast to recipients
POST /api/scout/bridge/resolve # Resolve semantic address
# Bridge - Connection Management
POST /api/scout/bridge/start # Start mesh WebSocket bridge
POST /api/scout/bridge/stop # Stop mesh WebSocket bridge
GET /api/scout/bridge/status # Bridge info and capabilities
GET /api/scout/bridge/connection # Connection status
# Mesh - Outbound Queries
POST /api/scout/mesh/query # Query mesh service (NL)
POST /api/scout/mesh/request # Raw mesh request
# Events - Publishing
POST /api/scout/events/enable-mesh # Toggle mesh event publishing
GET /api/scout/events/status # Event publisher status
WebSocket Message Types (Extended)
# Existing
"connected", "message", "presence", "typing", "read_receipt", "error"
# New
"system" # System notification (high priority)
"job_update" # Background job progress
"alert" # Critical alert (may trigger notification)
Part 8: Security
Service Authentication
Services publishing to Scout must authenticate:
# Option A: API Key header
headers = {"X-Service-Key": "scout-bridge-key-..."}
# Option B: Mesh identity (worker announces with signed token)
# Option C: mTLS between services
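For Option A, the receiving side has to check the presented key against the keys it knows. The following is a minimal sketch under stated assumptions: the `SERVICE_KEYS` store, its example values, and the function name `authenticate_service` are hypothetical, and real keys would come from configuration rather than source code. It uses `hmac.compare_digest` from the standard library so that comparison time does not reveal how much of a key matched.

```python
import hmac
from typing import Optional

# Hypothetical key store, for illustration only.
SERVICE_KEYS = {
    "cbfiles": "example-key-for-cbfiles",
    "list-loader": "example-key-for-list-loader",
}


def authenticate_service(headers: dict) -> Optional[str]:
    """Return the service whose key matches the X-Service-Key header, else None."""
    presented = headers.get("X-Service-Key", "").encode()
    matched = None
    for service, key in SERVICE_KEYS.items():
        # Constant-time comparison; every key is checked on every call.
        if hmac.compare_digest(presented, key.encode()):
            matched = service
    return matched


caller = authenticate_service({"X-Service-Key": "example-key-for-cbfiles"})
```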
Rate Limiting
# Per-service limits
RATE_LIMITS = {
    "cbfiles": {"messages_per_minute": 60},
    "list-loader": {"messages_per_minute": 120},
    "default": {"messages_per_minute": 30}
}
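The table of limits does not say how they are enforced. One plausible approach is a sliding-window counter per service, sketched below; the class name `ServiceRateLimiter`, the in-process storage, and the injectable clock are assumptions, and a multi-process deployment would need shared state instead.

```python
import time
from collections import defaultdict, deque

RATE_LIMITS = {
    "cbfiles": {"messages_per_minute": 60},
    "list-loader": {"messages_per_minute": 120},
    "default": {"messages_per_minute": 30},
}


class ServiceRateLimiter:
    """Sliding-window limiter: at most N messages per service in any 60 s window."""

    def __init__(self, limits, window_seconds=60.0, clock=time.monotonic):
        self.limits = limits
        self.window = window_seconds
        self.clock = clock                 # injectable so tests can control time
        self._sent = defaultdict(deque)    # service -> timestamps of accepted messages

    def allow(self, service: str) -> bool:
        config = self.limits.get(service, self.limits["default"])
        limit = config["messages_per_minute"]
        now = self.clock()
        sent = self._sent[service]
        # Forget messages that have left the window.
        while sent and now - sent[0] >= self.window:
            sent.popleft()
        if len(sent) >= limit:
            return False
        sent.append(now)
        return True
```

A caller would check `limiter.allow(source_service)` before publishing and reject or queue the message when it returns False.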
Audit Trail
All system messages logged:
{
  "timestamp": "...",
  "source_service": "cbfiles",
  "target_room": "room:system:jobs",
  "message_type": "job",
  "message_id": "..."
}
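A record in this shape could be emitted as one JSON line per message through the standard logging module. This is a sketch only: the logger name `scout.audit`, the function name `audit_system_message`, and the UTC timestamp format are assumptions.

```python
import json
import logging
from datetime import datetime, timezone

audit_logger = logging.getLogger("scout.audit")  # hypothetical logger name


def audit_system_message(source_service, target_room, message_type, message_id, when=None):
    """Build the audit record, emit it as a single JSON line, and return it."""
    when = when or datetime.now(timezone.utc)
    record = {
        "timestamp": when.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "source_service": source_service,
        "target_room": target_room,
        "message_type": message_type,
        "message_id": message_id,
    }
    audit_logger.info(json.dumps(record, sort_keys=True))
    return record


record = audit_system_message(
    "cbfiles",
    "room:system:jobs",
    "job",
    "job-uuid",
    when=datetime(2024, 1, 12, 15, 30, 45, tzinfo=timezone.utc),
)
```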
Design Decisions
1. Log Verbosity
Configurable via .env, default INFO. Uses standard Python logging levels:
| Level | Numeric | Shows in Scout |
|---|---|---|
| DEBUG | 10 | Only if explicitly set |
| INFO | 20 | Default threshold |
| WARNING | 30 | Always shown |
| ERROR | 40 | Always shown |
| CRITICAL | 50 | Always shown |
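Read as a minimum-level filter, the table reduces to a single comparison against the configured threshold. The sketch below assumes exactly that; the environment variable name `SCOUT_LOG_LEVEL` is hypothetical, since the actual .env key is not given here.

```python
import logging
import os

LEVELS = {
    "DEBUG": logging.DEBUG,        # 10
    "INFO": logging.INFO,          # 20
    "WARNING": logging.WARNING,    # 30
    "ERROR": logging.ERROR,        # 40
    "CRITICAL": logging.CRITICAL,  # 50
}


def scout_log_threshold(env=os.environ) -> int:
    """Read the threshold from the environment; fall back to INFO if unset or unknown."""
    name = env.get("SCOUT_LOG_LEVEL", "INFO").upper()  # hypothetical variable name
    return LEVELS.get(name, logging.INFO)


def shows_in_scout(level: int, threshold: int) -> bool:
    """A log entry is shown when its level is at or above the threshold."""
    return level >= threshold


default_threshold = scout_log_threshold({})
```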
2. Alert Indicators
- Bell icon (🔔) for alerts in UI
- Unread count badge on bell
- Badge shows the exact count up to 9 and "9+" above that
- No action buttons (Acknowledge/Snooze) for now
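The badge rule can be written as a small pure function. This sketch assumes the cap means exact counts up to 9 and "9+" beyond that, and that the badge is hidden when nothing is unread; `badge_label` is a hypothetical name.

```python
def badge_label(unread: int) -> str:
    """Text for the bell badge; an empty string means the badge is hidden."""
    if unread <= 0:
        return ""
    return str(unread) if unread <= 9 else "9+"


labels = [badge_label(n) for n in (0, 1, 9, 10, 250)]
```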
3. Job Control
Users can cancel/stop long-running jobs from Scout:
- Jobs publish to #system-jobs room
- Job messages include can_cancel: true flag
- Cancel button triggers mesh request to originating worker
- Worker must handle cancellation gracefully
4. Message Retention
Configurable via .env, default 30 days:
System messages are purged after retention period via scheduled cleanup job.
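The cleanup job boils down to computing a cutoff and splitting messages around it. The sketch below works on in-memory dicts with a `created_at` field, which is an assumed field name; the real job would presumably run a delete against the database, and the retention value would come from the .env setting rather than a default argument.

```python
from datetime import datetime, timedelta, timezone


def split_by_retention(messages, retention_days=30, now=None):
    """Return (kept, purged); a message is purged once it is older than the cutoff."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=retention_days)
    kept = [m for m in messages if m["created_at"] >= cutoff]
    purged = [m for m in messages if m["created_at"] < cutoff]
    return kept, purged


now = datetime(2024, 2, 15, tzinfo=timezone.utc)
messages = [
    {"id": "old", "created_at": datetime(2024, 1, 10, tzinfo=timezone.utc)},  # 36 days old
    {"id": "new", "created_at": datetime(2024, 2, 1, tzinfo=timezone.utc)},   # 14 days old
]
kept, purged = split_by_retention(messages, retention_days=30, now=now)
```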
5. Notification Delivery
No intrusive notifications yet. Delivery type options for future use:
from enum import Enum


class NotificationDelivery(str, Enum):
    """Notification delivery methods (future use)."""
    IN_APP = "in_app"  # Scout UI only (current)
    EMAIL = "email"    # Email notification (future)
    SMS = "sms"        # SMS/P2P notification (future)
    PUSH = "push"      # Browser push notification (future)
Currently only IN_APP is active. Email/SMS/Push will be enabled when
communication channels are integrated.