HTTP/Fetch

The cbintel.net module provides HTTP operations with proxy support, multi-engine search, URL utilities, and webhook delivery.

Module Structure

src/cbintel/net/
├── __init__.py          # Public exports
├── http_client.py       # Async HTTP client
├── search_client.py     # Multi-engine search
├── url_cleaner.py       # URL normalization
└── webhook.py           # Webhook sending

HTTPClient

Async HTTP client with proxy support, retries, and timeout handling.

Basic Usage

from cbintel.net import HTTPClient

async with HTTPClient() as client:
    # Simple GET
    response = await client.get("https://example.com")
    print(response.status_code)
    print(response.text)

    # POST with JSON
    response = await client.post(
        "https://api.example.com/data",
        json={"key": "value"}
    )

    # POST with form data
    response = await client.post(
        "https://api.example.com/upload",
        data={"field": "value"},
        files={"file": open("document.pdf", "rb")}
    )

Proxy Support

async with HTTPClient() as client:
    # Direct proxy
    response = await client.get(
        "https://example.com",
        proxy="http://17.0.0.1:8894"
    )

    # SOCKS5 proxy (for Tor)
    response = await client.get(
        "https://example.com",
        proxy="socks5://127.0.0.1:9050"
    )

With GeoRouter

from cbintel.net import HTTPClient
from cbintel.geo import GeoRouter

router = GeoRouter()

async with HTTPClient() as client:
    # Get proxy for geographic region
    proxy = await router.get_proxy("us:ca")

    # Fetch with geographic routing
    response = await client.get("https://example.com", proxy=proxy)

Headers and Authentication

async with HTTPClient() as client:
    # Custom headers
    response = await client.get(
        "https://api.example.com",
        headers={"Authorization": "Bearer token123"}
    )

    # Basic auth
    response = await client.get(
        "https://api.example.com",
        auth=("username", "password")
    )

Timeouts and Retries

from cbintel.net import HTTPClient

client = HTTPClient(
    timeout=30.0,        # Request timeout in seconds
    max_retries=3,       # Retry failed requests
    retry_delay=1.0,     # Delay between retries
)
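
When all retries are exhausted, the request raises an exception (see Error Handling below). A minimal sketch, assuming TimeoutError is raised once max_retries is exceeded:

from cbintel.net import HTTPClient, TimeoutError

async def fetch_or_none(url):
    # 3 retries with a 1-second delay between attempts
    async with HTTPClient(timeout=10.0, max_retries=3, retry_delay=1.0) as client:
        try:
            return await client.get(url)
        except TimeoutError:
            # Only raised after every retry has timed out
            return None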

Response Object

response = await client.get("https://example.com")

# Status
print(response.status_code)  # 200
print(response.ok)           # True

# Content
print(response.text)         # String content
print(response.content)      # Bytes content
print(response.json())       # Parsed JSON

# Headers
print(response.headers)      # Response headers

# URL (after redirects)
print(response.url)          # Final URL
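
A common pattern is to guard on response.ok before parsing, falling back to the raw text when the body is not JSON. A sketch using only the attributes above, assuming response.json() raises ValueError on a non-JSON body:

response = await client.get("https://api.example.com/data")

if response.ok:
    try:
        data = response.json()   # Parsed JSON body
    except ValueError:
        data = response.text     # Fall back to raw text
else:
    print(f"Request failed: {response.status_code} for {response.url}")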

SearchClient

Multi-engine web search with result aggregation.

Basic Usage

from cbintel.net import SearchClient

client = SearchClient(provider="duckduckgo")

# Simple search
results = await client.search("AI regulation", max_results=20)

for result in results:
    print(f"{result.title}")
    print(f"  URL: {result.url}")
    print(f"  Snippet: {result.snippet}")

Search Providers

Provider     Description
-----------  ---------------------------
duckduckgo   DuckDuckGo search (default)
google       Google search
bing         Bing search
brave        Brave search

# Multiple providers
results = await client.search(
    "AI regulation",
    providers=["duckduckgo", "brave"],
    max_results=50
)

# With date filtering
results = await client.search(
    "AI regulation news",
    time_range="week"  # day, week, month, year
)

# Site-specific
results = await client.search("AI regulation site:gov")

URLCleaner

URL normalization and cleaning utilities.

Basic Usage

from cbintel.net import URLCleaner

cleaner = URLCleaner()

# Remove tracking parameters
clean_url = cleaner.clean(
    "https://example.com/page?utm_source=twitter&id=123"
)
# Result: https://example.com/page?id=123

# Extract domain
domain = cleaner.extract_domain("https://sub.example.com/path")
# Result: example.com

# Normalize URL
normalized = cleaner.normalize("HTTPS://Example.COM/PATH/")
# Result: https://example.com/path

URL Validation

# Check if valid URL
is_valid = cleaner.is_valid("https://example.com")  # True
is_valid = cleaner.is_valid("not-a-url")            # False

# Check if same domain
same = cleaner.same_domain(
    "https://example.com/page1",
    "https://example.com/page2"
)  # True
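
These checks compose when filtering crawled links, for example keeping only valid URLs on the same domain as a seed page (discovered_urls below is a placeholder list of candidate links):

seed = "https://example.com/start"

internal_links = [
    url for url in discovered_urls
    if cleaner.is_valid(url) and cleaner.same_domain(url, seed)
]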

Tracking Parameter Removal

# Default removes common tracking params
clean = cleaner.clean(url)

# Custom tracking params
cleaner = URLCleaner(
    tracking_params=["utm_source", "utm_medium", "fbclid", "ref"]
)

Webhook Sending

Send webhooks to Slack, Discord, or custom endpoints.

Basic Webhook

from cbintel.net import webhook_send

result = await webhook_send(
    url="https://hooks.slack.com/...",
    payload={"text": "Hello from cbintel!"}
)

Slack Helper

from cbintel.net import send_slack

await send_slack(
    webhook_url="https://hooks.slack.com/...",
    message="Research complete",
    attachments=[{
        "title": "Results",
        "text": "Found 42 relevant documents"
    }]
)

Discord Helper

from cbintel.net import send_discord

await send_discord(
    webhook_url="https://discord.com/api/webhooks/...",
    content="Research complete!",
    embeds=[{
        "title": "Results",
        "description": "Found 42 relevant documents",
        "color": 0x00ff00
    }]
)
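
One typical use is notifying a channel when a pipeline finishes. A sketch that reports a search result count over Slack, using only the calls shown above:

from cbintel.net import SearchClient, send_slack

async def notify_when_done(query, webhook_url):
    client = SearchClient(provider="duckduckgo")
    results = await client.search(query, max_results=50)

    await send_slack(
        webhook_url=webhook_url,
        message=f"Search for '{query}' finished",
        attachments=[{
            "title": "Results",
            "text": f"Found {len(results)} results",
        }],
    )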

Graph Operations

HTTP operations in graphs:

stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "AI regulation"
          max_results: 50
        output: urls

  - name: fetch
    parallel:
      - op: fetch_batch
        input: urls
        params:
          geo: "us:ca"
          timeout: 30
        output: pages

fetch Operation

- op: fetch
  params:
    url: "https://example.com"
    geo: "us:ca"              # Geographic routing
    timeout: 30               # Timeout in seconds
    headers:
      User-Agent: "Mozilla/5.0"
  output: content

fetch_batch Operation

- op: fetch_batch
  input: urls                 # List of URLs
  params:
    geo: "de"
    concurrency: 10           # Parallel requests
  output: pages

Error Handling

from cbintel.net import HTTPClient, NetworkError, TimeoutError

async with HTTPClient() as client:
    try:
        response = await client.get(url)
    except TimeoutError:
        print("Request timed out")
    except NetworkError as e:
        print(f"Network error: {e}")

Configuration

Environment Variables

# Default timeout
HTTP_TIMEOUT=30.0

# Max retries
HTTP_MAX_RETRIES=3

# User agent
HTTP_USER_AGENT="cbintel/1.0"

# Proxy (default for all requests)
HTTP_PROXY=http://proxy:8080
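
If you prefer to make these settings explicit in code, a sketch that reads the same variables with os.getenv and passes them to the constructor (one way to wire it up, not the module's own loading mechanism):

import os

from cbintel.net import HTTPClient

client = HTTPClient(
    timeout=float(os.getenv("HTTP_TIMEOUT", "30.0")),
    max_retries=int(os.getenv("HTTP_MAX_RETRIES", "3")),
    user_agent=os.getenv("HTTP_USER_AGENT", "cbintel/1.0"),
)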

Client Configuration

client = HTTPClient(
    timeout=30.0,
    max_retries=3,
    retry_delay=1.0,
    user_agent="cbintel/1.0",
    verify_ssl=True,
    follow_redirects=True,
    max_redirects=10,
)

Best Practices

Concurrency Control

import asyncio

from cbintel.net import HTTPClient

async def fetch_many(urls, concurrency=10):
    semaphore = asyncio.Semaphore(concurrency)

    # Reuse one client for all requests instead of opening one per URL
    async with HTTPClient() as client:
        async def fetch_one(url):
            # Limit the number of requests in flight
            async with semaphore:
                return await client.get(url)

        return await asyncio.gather(*[fetch_one(url) for url in urls])

Rate Limiting

import asyncio

from cbintel.net import HTTPClient

async def fetch_with_rate_limit(urls, requests_per_second=2):
    delay = 1.0 / requests_per_second
    results = []

    async with HTTPClient() as client:
        for url in urls:
            result = await client.get(url)
            results.append(result)
            await asyncio.sleep(delay)

    return results

Error Recovery

from cbintel.net import HTTPClient, NetworkError

async def fetch_with_fallback(url, proxies):
    async with HTTPClient() as client:
        for proxy in proxies:
            try:
                return await client.get(url, proxy=proxy)
            except NetworkError:
                continue  # Try the next proxy
        raise NetworkError("All proxies failed")