# HTTP/Fetch

The `cbintel.net` module provides HTTP operations with proxy support, search capabilities, and URL utilities.
## Module Structure

```text
src/cbintel/net/
├── __init__.py        # Public exports
├── http_client.py     # Async HTTP client
├── search_client.py   # Multi-engine search
├── url_cleaner.py     # URL normalization
└── webhook.py         # Webhook sending
```
## HTTPClient
Async HTTP client with proxy support, retries, and timeout handling.
### Basic Usage

```python
from cbintel.net import HTTPClient

async with HTTPClient() as client:
    # Simple GET
    response = await client.get("https://example.com")
    print(response.status_code)
    print(response.text)

    # POST with JSON
    response = await client.post(
        "https://api.example.com/data",
        json={"key": "value"},
    )

    # POST with form data and a file upload
    # (open the file in a with-block so the handle is closed after the request)
    with open("document.pdf", "rb") as f:
        response = await client.post(
            "https://api.example.com/upload",
            data={"field": "value"},
            files={"file": f},
        )
```
### Proxy Support

```python
async with HTTPClient() as client:
    # Direct proxy
    response = await client.get(
        "https://example.com",
        proxy="http://127.0.0.1:8894",
    )

    # SOCKS5 proxy (for Tor)
    response = await client.get(
        "https://example.com",
        proxy="socks5://127.0.0.1:9050",
    )
```
### With GeoRouter

```python
from cbintel.net import HTTPClient
from cbintel.geo import GeoRouter

router = GeoRouter()

async with HTTPClient() as client:
    # Get a proxy for a geographic region
    proxy = await router.get_proxy("us:ca")

    # Fetch with geographic routing
    response = await client.get("https://example.com", proxy=proxy)
```
### Headers and Authentication

```python
async with HTTPClient() as client:
    # Custom headers
    response = await client.get(
        "https://api.example.com",
        headers={"Authorization": "Bearer token123"},
    )

    # Basic auth
    response = await client.get(
        "https://api.example.com",
        auth=("username", "password"),
    )
```
### Timeouts and Retries

```python
from cbintel.net import HTTPClient

client = HTTPClient(
    timeout=30.0,     # Request timeout in seconds
    max_retries=3,    # Retry failed requests
    retry_delay=1.0,  # Delay between retries
)
```
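These act as client-wide defaults for every request made through the client. For instance, a client tuned for a slow, unreliable endpoint (values and URL are illustrative):

```python
from cbintel.net import HTTPClient

# A patient client for a slow, flaky endpoint
async with HTTPClient(timeout=60.0, max_retries=5, retry_delay=2.0) as client:
    response = await client.get("https://slow.example.com/report")
```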
### Response Object

```python
response = await client.get("https://example.com")

# Status
print(response.status_code)  # 200
print(response.ok)           # True

# Content
print(response.text)     # String content
print(response.content)  # Bytes content
print(response.json())   # Parsed JSON

# Headers
print(response.headers)  # Response headers

# URL (after redirects)
print(response.url)  # Final URL
```
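A common pattern is to check `response.ok` before calling `.json()`, since error pages are rarely valid JSON. A minimal sketch using only the attributes above (URL illustrative):

```python
async with HTTPClient() as client:
    response = await client.get("https://api.example.com/items")
    if response.ok:
        data = response.json()
    else:
        # Log the status and a slice of the body for debugging
        print(f"Request failed: {response.status_code} from {response.url}")
        print(response.text[:200])
```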
## SearchClient

Multi-engine web search with result aggregation.

### Basic Usage
```python
from cbintel.net import SearchClient

client = SearchClient(provider="duckduckgo")

# Simple search
results = await client.search("AI regulation", max_results=20)

for result in results:
    print(f"{result.title}")
    print(f"  URL: {result.url}")
    print(f"  Snippet: {result.snippet}")
```
### Search Providers

| Provider | Description |
|---|---|
| `duckduckgo` | DuckDuckGo search (default) |
| `google` | Google search |
| `bing` | Bing search |
| `brave` | Brave search |
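Providers are interchangeable at construction time. A quick sketch comparing how many results each engine returns for the same query:

```python
from cbintel.net import SearchClient

# Provider names taken from the table above
for provider in ["duckduckgo", "google", "bing", "brave"]:
    client = SearchClient(provider=provider)
    results = await client.search("AI regulation", max_results=10)
    print(f"{provider}: {len(results)} results")
```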
### Advanced Search

```python
# Multiple providers
results = await client.search(
    "AI regulation",
    providers=["duckduckgo", "brave"],
    max_results=50,
)

# With date filtering
results = await client.search(
    "AI regulation news",
    time_range="week",  # day, week, month, year
)

# Site-specific
results = await client.search("AI regulation site:gov")
```
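Results from multiple engines often overlap. A sketch that deduplicates aggregated results by their cleaned URL, using the URLCleaner described in the next section:

```python
from cbintel.net import SearchClient, URLCleaner

client = SearchClient(provider="duckduckgo")
cleaner = URLCleaner()

results = await client.search(
    "AI regulation",
    providers=["duckduckgo", "brave"],
    max_results=50,
)

# Keep the first result per cleaned URL
seen, unique = set(), []
for result in results:
    url = cleaner.clean(result.url)
    if url not in seen:
        seen.add(url)
        unique.append(result)
```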
## URLCleaner

URL normalization and cleaning utilities.

### Basic Usage
```python
from cbintel.net import URLCleaner

cleaner = URLCleaner()

# Remove tracking parameters
clean_url = cleaner.clean(
    "https://example.com/page?utm_source=twitter&id=123"
)
# Result: https://example.com/page?id=123

# Extract domain
domain = cleaner.extract_domain("https://sub.example.com/path")
# Result: example.com

# Normalize URL
normalized = cleaner.normalize("HTTPS://Example.COM/PATH/")
# Result: https://example.com/path
```
### URL Validation

```python
# Check whether a URL is valid
is_valid = cleaner.is_valid("https://example.com")  # True
is_valid = cleaner.is_valid("not-a-url")            # False

# Check whether two URLs share a domain
same = cleaner.same_domain(
    "https://example.com/page1",
    "https://example.com/page2",
)  # True
```
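A typical use is filtering scraped links down to valid, same-site URLs. A sketch with illustrative inputs:

```python
links = ["https://example.com/a", "not-a-url", "https://other.com/b"]
base = "https://example.com"

# Keep only valid links that stay on the original site
internal = [
    url for url in links
    if cleaner.is_valid(url) and cleaner.same_domain(url, base)
]
# Result: ["https://example.com/a"]
```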
### Tracking Parameter Removal

```python
# The default configuration strips common tracking params
clean = cleaner.clean(url)

# Custom tracking params
cleaner = URLCleaner(
    tracking_params=["utm_source", "utm_medium", "fbclid", "ref"]
)
```
## Webhook Sending

Send webhooks to Slack, Discord, or custom endpoints.

### Basic Webhook
```python
from cbintel.net import webhook_send

result = await webhook_send(
    url="https://hooks.slack.com/...",
    payload={"text": "Hello from cbintel!"},
)
```
### Slack Helper

```python
from cbintel.net import send_slack

await send_slack(
    webhook_url="https://hooks.slack.com/...",
    message="Research complete",
    attachments=[{
        "title": "Results",
        "text": "Found 42 relevant documents",
    }],
)
```
### Discord Helper

```python
from cbintel.net import send_discord

await send_discord(
    webhook_url="https://discord.com/api/webhooks/...",
    content="Research complete!",
    embeds=[{
        "title": "Results",
        "description": "Found 42 relevant documents",
        "color": 0x00ff00,
    }],
)
```
## Graph Operations

HTTP operations are available as ops inside graph stage definitions:
```yaml
stages:
  - name: discover
    sequential:
      - op: search
        params:
          query: "AI regulation"
          max_results: 50
        output: urls

  - name: fetch
    parallel:
      - op: fetch_batch
        input: urls
        params:
          geo: "us:ca"
          timeout: 30
        output: pages
```
### fetch Operation

```yaml
- op: fetch
  params:
    url: "https://example.com"
    geo: "us:ca"    # Geographic routing
    timeout: 30     # Timeout in seconds
    headers:
      User-Agent: "Mozilla/5.0"
  output: content
```
### fetch_batch Operation

```yaml
- op: fetch_batch
  input: urls          # List of URLs
  params:
    geo: "de"
    concurrency: 10    # Parallel requests
  output: pages
```
## Error Handling

```python
from cbintel.net import HTTPClient, NetworkError, TimeoutError

async with HTTPClient() as client:
    try:
        response = await client.get(url)
    except TimeoutError:
        print("Request timed out")
    except NetworkError as e:
        print(f"Network error: {e}")
```
## Configuration

### Environment Variables

```bash
# Default timeout
HTTP_TIMEOUT=30.0

# Max retries
HTTP_MAX_RETRIES=3

# User agent
HTTP_USER_AGENT="cbintel/1.0"

# Proxy (default for all requests)
HTTP_PROXY=http://proxy:8080
```
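Whether the client picks these variables up automatically at construction time is not shown here. A sketch that reads them explicitly, so precedence is unambiguous (the fallback values mirror the defaults above):

```python
import os

from cbintel.net import HTTPClient

# Read the documented variables explicitly rather than relying on implicit pickup
client = HTTPClient(
    timeout=float(os.environ.get("HTTP_TIMEOUT", "30.0")),
    max_retries=int(os.environ.get("HTTP_MAX_RETRIES", "3")),
    user_agent=os.environ.get("HTTP_USER_AGENT", "cbintel/1.0"),
)
```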
### Client Configuration

```python
client = HTTPClient(
    timeout=30.0,
    max_retries=3,
    retry_delay=1.0,
    user_agent="cbintel/1.0",
    verify_ssl=True,
    follow_redirects=True,
    max_redirects=10,
)
```
## Best Practices

### Concurrency Control

```python
import asyncio

from cbintel.net import HTTPClient

async def fetch_many(urls, concurrency=10):
    # Cap the number of in-flight requests
    semaphore = asyncio.Semaphore(concurrency)

    async def fetch_one(url):
        async with semaphore:
            async with HTTPClient() as client:
                return await client.get(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])
```
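Opening a fresh HTTPClient per request keeps each task independent. If the client is safe to share across tasks (not specified here), hoisting a single `async with HTTPClient()` around the gather would avoid repeated connection setup.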
### Rate Limiting

```python
import asyncio

from cbintel.net import HTTPClient

async def fetch_with_rate_limit(urls, requests_per_second=2):
    delay = 1.0 / requests_per_second
    results = []

    async with HTTPClient() as client:
        for url in urls:
            result = await client.get(url)
            results.append(result)
            await asyncio.sleep(delay)

    return results
```
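The sequential loop above waits for each response before sleeping, so the effective rate can fall well below `requests_per_second`. A sketch (a hand-rolled pattern, not a cbintel API) that paces request starts while letting responses overlap:

```python
import asyncio

from cbintel.net import HTTPClient

async def fetch_paced(urls, requests_per_second=2, concurrency=5):
    # Space out request *starts*; let responses overlap up to `concurrency`
    interval = 1.0 / requests_per_second
    semaphore = asyncio.Semaphore(concurrency)
    pace = asyncio.Lock()

    async def fetch_one(url):
        async with semaphore:
            async with pace:
                # Holding the lock while sleeping spaces consecutive starts
                await asyncio.sleep(interval)
            async with HTTPClient() as client:
                return await client.get(url)

    return await asyncio.gather(*[fetch_one(url) for url in urls])
```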