Python SDK Reference
Complete reference for the dailyai-agents Python package (v1.4.0).
Installation
Requires Python 3.9 or later.
# Install from PyPI
pip install dailyai-agents
# Install with async support
pip install dailyai-agents[async]
# Install a specific version
pip install dailyai-agents==1.4.0
Authentication
The SDK supports two authentication methods: API key and JWT token. API key is recommended for server-side applications.
from dailyai import AgentOS
# Option 1: Pass API key directly
client = AgentOS(api_key="sk-your-api-key")
# Option 2: Use environment variable (recommended)
# Set DAILYAI_API_KEY in your environment
client = AgentOS() # Reads DAILYAI_API_KEY automatically
# Option 3: Use a JWT token (from /auth/login)
client = AgentOS(token="eyJhbGciOiJIUzI1NiIs...")
Client Initialization
AgentOS(
    api_key: str | None = None,
    token: str | None = None,
    base_url: str = "https://api.dailyai.dev/v1",
    timeout: float = 30.0,
    max_retries: int = 3,
    debug: bool = False,
)
api_key: Your API key. If omitted, it is read from the DAILYAI_API_KEY environment variable.
token: JWT token obtained from /auth/login. Use instead of api_key for token-based auth.
Agents
The client.agents namespace provides methods to list and run agents.
agents.list()
client.agents.list(
    category: str | None = None,
    status: str | None = None,
) → list[Agent]
Returns a list of all available agents, optionally filtered by category or status.
agents = client.agents.list(category="trading")
for agent in agents:
print(f"{agent.id}: {agent.name} ({agent.status})")
print(f" Strategies: {', '.join(agent.strategies)}")
agents.{agent_id}.run()
client.agents.{agent_id}.run(
    asset: str,
    strategy: str,
    timeframe: str = "1h",
    risk_level: str = "medium",
    async_run: bool = False,
    **options: Any,
) → Signal | RunJob
Execute an agent and return the result. Access agents by ID as attributes on client.agents (e.g., client.agents.trading, client.agents.weather, client.agents.seo).
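asset: Asset symbol (e.g., "AAPL") or keyword for content agents.
strategy: Strategy name (e.g., "momentum").
timeframe: One of "1m", "5m", "1h", "1d". Default: "1h".
risk_level: One of "low", "medium", "high". Default: "medium".
async_run: If True, return a RunJob for polling. Default: False.
Returns: Signal (sync) or RunJob (async)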
# Synchronous run
signal = client.agents.trading.run(asset="AAPL", strategy="momentum")
print(signal.direction) # "long"
print(signal.confidence) # 0.87
print(signal.entry_price) # 198.45
# Async run (returns immediately)
job = client.agents.trading.run(
asset="AAPL",
strategy="momentum",
async_run=True
)
print(job.run_id) # "run_abc123"
# Poll for result
result = job.wait() # Blocks until complete
print(result.direction)
Signals
signals.list()
client.signals.list(
    asset: str | None = None,
    strategy: str | None = None,
    direction: str | None = None,
    min_confidence: float | None = None,
    since: str | datetime | None = None,
    limit: int = 25,
    offset: int = 0,
) → list[Signal]
Retrieve a paginated list of your generated signals. Returns a list of Signal objects.
from datetime import datetime, timedelta
# Get recent high-confidence signals
signals = client.signals.list(
min_confidence=0.8,
since=datetime.utcnow() - timedelta(hours=24),
limit=10
)
for s in signals:
print(f"{s.direction:5s} {s.asset:8s} conf={s.confidence:.0%}")
# Paginate through all signals
offset = 0
all_signals = []
while True:
batch = client.signals.list(limit=100, offset=offset)
if not batch:
break
all_signals.extend(batch)
offset += 100
signals.get()
Retrieve a single signal by ID.
signal = client.signals.get("sig_abc123")
print(signal.metadata.reasoning)
signals.weather()
client.signals.weather(
    region: str | None = None,
    commodity: str | None = None,
    severity: str | None = None,
    limit: int = 25,
) → list[WeatherSignal]
Retrieve weather-based trading signals.
weather = client.signals.weather(
region="us",
commodity="natural_gas",
severity="high"
)
for w in weather:
print(f"{w.commodity}: {w.expected_impact}")
Dashboard (WebSocket)
dashboard.stream()
client.dashboard.stream(
    channels: list[str] | None = None,
    assets: list[str] | None = None,
) → AsyncIterator[DashboardEvent]
Open a WebSocket connection and stream real-time events. Must be used with async for.
import asyncio
from dailyai import AgentOS
async def main():
client = AgentOS()
async for event in client.dashboard.stream(
channels=["signal.new", "agent.status"],
assets=["AAPL", "TSLA"]
):
print(f"[{event.type}] {event.data}")
asyncio.run(main())
Webhooks
webhooks.create()
client.webhooks.create(
    url: str,
    events: list[str],
    secret: str | None = None,
) → Webhook
webhooks.list()
webhooks.delete()
# Create a webhook
webhook = client.webhooks.create(
url="https://your-app.com/hooks/dailyai",
events=["signal.created", "agent.error"],
secret="whsec_your_secret"
)
print(f"Created: {webhook.id}")
# List all webhooks
for wh in client.webhooks.list():
print(f"{wh.id}: {wh.url} ({wh.status})")
# Delete a webhook
client.webhooks.delete("whk_abc123")
Error Handling
The SDK raises typed exceptions for different error conditions. All exceptions inherit from DailyAIError.
| Exception | HTTP Status | Description |
|---|---|---|
| DailyAIError | — | Base exception class |
| AuthenticationError | 401 | Invalid or missing API key / token |
| NotFoundError | 404 | Resource not found (invalid agent ID, signal ID, etc.) |
| ValidationError | 400 | Invalid request parameters |
| RateLimitError | 429 | Rate limit exceeded (see Rate Limiting) |
| ServerError | 500+ | Internal server error |
| TimeoutError | — | Request timed out |
| ConnectionError | — | Network connectivity issue |
from dailyai import AgentOS
from dailyai.exceptions import (
DailyAIError,
AuthenticationError,
RateLimitError,
NotFoundError,
ValidationError,
)
client = AgentOS()
try:
signal = client.agents.trading.run(asset="AAPL", strategy="momentum")
except AuthenticationError:
print("Check your API key")
except RateLimitError as e:
print(f"Rate limited. Retry after {e.retry_after} seconds")
except NotFoundError:
print("Agent or asset not found")
except ValidationError as e:
print(f"Invalid parameters: {e.message}")
except DailyAIError as e:
print(f"API error: {e.status_code} — {e.message}")
Rate Limiting
The API enforces per-key rate limits. The SDK automatically handles rate limiting with exponential backoff (up to max_retries attempts).
| Plan | Limit | Burst |
|---|---|---|
| Free | 100 requests/minute | 20 requests/second |
| Pro | 1,000 requests/minute | 50 requests/second |
| Enterprise | 10,000 requests/minute | 200 requests/second |
Rate limit information is available on every response:
# Access rate limit headers from last request
info = client.rate_limit_info
print(f"Remaining: {info.remaining}") # Requests remaining
print(f"Limit: {info.limit}") # Total allowed
print(f"Reset: {info.reset}") # UTC timestamp of reset
# Configure custom retry behavior
client = AgentOS(
max_retries=5, # Retry up to 5 times on 429/5xx
timeout=60.0, # Increase timeout for long-running agents
)
Tip: When issuing many requests concurrently, use the async client (AsyncAgentOS) with a semaphore to control concurrency and avoid hitting rate limits.
Async Support
The SDK provides a fully async client for use with asyncio. Install with async extras:
pip install dailyai-agents[async]
All methods on AsyncAgentOS are async equivalents of the sync client:
import asyncio
from dailyai import AsyncAgentOS
async def main():
client = AsyncAgentOS() # Reads DAILYAI_API_KEY
# Run multiple agents concurrently
signals = await asyncio.gather(
client.agents.trading.run(asset="AAPL", strategy="momentum"),
client.agents.trading.run(asset="MSFT", strategy="breakout"),
client.agents.trading.run(asset="GOOGL", strategy="sentiment"),
)
for signal in signals:
print(f"{signal.asset}: {signal.direction} ({signal.confidence:.0%})")
# List signals (async)
recent = await client.signals.list(min_confidence=0.8)
print(f"Found {len(recent)} high-confidence signals")
# Stream dashboard events
async for event in client.dashboard.stream():
print(event)
asyncio.run(main())
Context Manager
Use the async client as a context manager to ensure connections are properly closed:
async with AsyncAgentOS() as client:
signal = await client.agents.trading.run(
asset="AAPL",
strategy="momentum"
)
print(signal)
# Connection pool automatically closed