Python SDK Reference

Complete reference for the dailyai-agents Python package (v1.4.0).

Installation

Requires Python 3.9 or later.

bash
# Install from PyPI
pip install dailyai-agents

# Install with async support
pip install dailyai-agents[async]

# Install a specific version
pip install dailyai-agents==1.4.0

Authentication

The SDK supports two authentication methods: API key and JWT token. API key is recommended for server-side applications.

python
from dailyai import AgentOS

# Option 1: Pass API key directly
client = AgentOS(api_key="sk-your-api-key")

# Option 2: Use environment variable (recommended)
# Set DAILYAI_API_KEY in your environment
client = AgentOS()  # Reads DAILYAI_API_KEY automatically

# Option 3: Use a JWT token (from /auth/login)
client = AgentOS(token="eyJhbGciOiJIUzI1NiIs...")
Best practice: Store your API key in an environment variable rather than hardcoding it. Never commit API keys to version control.

Client Initialization

class AgentOS(
  api_key: str | None = None,
  token: str | None = None,
  base_url: str = "https://api.dailyai.dev/v1",
  timeout: float = 30.0,
  max_retries: int = 3,
  debug: bool = False,
)
api_keystr | NoneOptional
Your API key. If not provided, reads from DAILYAI_API_KEY environment variable.
tokenstr | NoneOptional
JWT access token from /auth/login. Use instead of api_key for token-based auth.
base_urlstrOptional
API base URL. Override for self-hosted deployments or staging environments.
timeoutfloatOptional
Request timeout in seconds. Default: 30.0.
max_retriesintOptional
Maximum number of retries for failed requests (429 and 5xx errors). Default: 3.
debugboolOptional
Enable debug logging. Prints request/response details to stderr. Default: False.

Agents

The client.agents namespace provides methods to list and run agents.

agents.list()

client.agents.list(
  category: str | None = None,
  status: str | None = None,
) → list[Agent]

Returns a list of all available agents, optionally filtered by category or status.

python
agents = client.agents.list(category="trading")
for agent in agents:
    print(f"{agent.id}: {agent.name} ({agent.status})")
    print(f"  Strategies: {', '.join(agent.strategies)}")

agents.{agent_id}.run()

client.agents.trading.run(
  asset: str,
  strategy: str,
  timeframe: str = "1h",
  risk_level: str = "medium",
  async_run: bool = False,
  **options: Any,
) → Signal | RunJob

Execute an agent and return the result. Access agents by ID as attributes on client.agents (e.g., client.agents.trading, client.agents.weather, client.agents.seo).

assetstrRequired
Target asset ticker (e.g., "AAPL") or keyword for content agents.
strategystrRequired
Strategy to execute. Valid values depend on the agent.
timeframestrOptional
Analysis timeframe: "1m", "5m", "1h", "1d". Default: "1h".
risk_levelstrOptional
Risk tolerance: "low", "medium", "high". Default: "medium".
async_runboolOptional
If True, returns a RunJob for polling. Default: False.

Returns: Signal (sync) or RunJob (async)

python
# Synchronous run
signal = client.agents.trading.run(asset="AAPL", strategy="momentum")
print(signal.direction)    # "long"
print(signal.confidence)   # 0.87
print(signal.entry_price)  # 198.45

# Async run (returns immediately)
job = client.agents.trading.run(
    asset="AAPL",
    strategy="momentum",
    async_run=True
)
print(job.run_id)  # "run_abc123"

# Poll for result
result = job.wait()  # Blocks until complete
print(result.direction)

Signals

signals.list()

client.signals.list(
  asset: str | None = None,
  strategy: str | None = None,
  direction: str | None = None,
  min_confidence: float | None = None,
  since: str | datetime | None = None,
  limit: int = 25,
  offset: int = 0,
) → list[Signal]

Retrieve a paginated list of your generated signals. Returns a list of Signal objects.

python
from datetime import datetime, timedelta

# Get recent high-confidence signals
signals = client.signals.list(
    min_confidence=0.8,
    since=datetime.utcnow() - timedelta(hours=24),
    limit=10
)

for s in signals:
    print(f"{s.direction:5s} {s.asset:8s} conf={s.confidence:.0%}")

# Paginate through all signals
offset = 0
all_signals = []
while True:
    batch = client.signals.list(limit=100, offset=offset)
    if not batch:
        break
    all_signals.extend(batch)
    offset += 100

signals.get()

client.signals.get(signal_id: str) → Signal

Retrieve a single signal by ID.

python
signal = client.signals.get("sig_abc123")
print(signal.metadata.reasoning)

signals.weather()

client.signals.weather(
  region: str | None = None,
  commodity: str | None = None,
  severity: str | None = None,
  limit: int = 25,
) → list[WeatherSignal]

Retrieve weather-based trading signals.

python
weather = client.signals.weather(
    region="us",
    commodity="natural_gas",
    severity="high"
)
for w in weather:
    print(f"{w.commodity}: {w.expected_impact}")

Dashboard (WebSocket)

dashboard.stream()

async client.dashboard.stream(
  channels: list[str] | None = None,
  assets: list[str] | None = None,
) → AsyncIterator[DashboardEvent]

Open a WebSocket connection and stream real-time events. Must be used with async for.

python
import asyncio
from dailyai import AgentOS

async def main():
    client = AgentOS()

    async for event in client.dashboard.stream(
        channels=["signal.new", "agent.status"],
        assets=["AAPL", "TSLA"]
    ):
        print(f"[{event.type}] {event.data}")

asyncio.run(main())

Webhooks

webhooks.create()

client.webhooks.create(
  url: str,
  events: list[str],
  secret: str | None = None,
) → Webhook

webhooks.list()

client.webhooks.list() → list[Webhook]

webhooks.delete()

client.webhooks.delete(webhook_id: str) → None
python
# Create a webhook
webhook = client.webhooks.create(
    url="https://your-app.com/hooks/dailyai",
    events=["signal.created", "agent.error"],
    secret="whsec_your_secret"
)
print(f"Created: {webhook.id}")

# List all webhooks
for wh in client.webhooks.list():
    print(f"{wh.id}: {wh.url} ({wh.status})")

# Delete a webhook
client.webhooks.delete("whk_abc123")

Error Handling

The SDK raises typed exceptions for different error conditions. All exceptions inherit from DailyAIError.

ExceptionHTTP StatusDescription
DailyAIErrorBase exception class
AuthenticationError401Invalid or missing API key / token
NotFoundError404Resource not found (invalid agent ID, signal ID, etc.)
ValidationError400Invalid request parameters
RateLimitError429Rate limit exceeded (see Rate Limiting)
ServerError500+Internal server error
TimeoutErrorRequest timed out
ConnectionErrorNetwork connectivity issue
python
from dailyai import AgentOS
from dailyai.exceptions import (
    DailyAIError,
    AuthenticationError,
    RateLimitError,
    NotFoundError,
    ValidationError,
)

client = AgentOS()

try:
    signal = client.agents.trading.run(asset="AAPL", strategy="momentum")
except AuthenticationError:
    print("Check your API key")
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except NotFoundError:
    print("Agent or asset not found")
except ValidationError as e:
    print(f"Invalid parameters: {e.message}")
except DailyAIError as e:
    print(f"API error: {e.status_code} — {e.message}")

Rate Limiting

The API enforces per-key rate limits. The SDK automatically handles rate limiting with exponential backoff (up to max_retries attempts).

PlanLimitBurst
Free100 requests/minute20 requests/second
Pro1,000 requests/minute50 requests/second
Enterprise10,000 requests/minute200 requests/second

Rate limit information is available on every response:

python
# Access rate limit headers from last request
info = client.rate_limit_info
print(f"Remaining: {info.remaining}")   # Requests remaining
print(f"Limit: {info.limit}")           # Total allowed
print(f"Reset: {info.reset}")           # UTC timestamp of reset

# Configure custom retry behavior
client = AgentOS(
    max_retries=5,      # Retry up to 5 times on 429/5xx
    timeout=60.0,       # Increase timeout for long-running agents
)
Best practice: If you're running multiple agents concurrently, use the async client (AsyncAgentOS) with a semaphore to control concurrency and avoid hitting rate limits.

Async Support

The SDK provides a fully async client for use with asyncio. Install with async extras:

bash
pip install dailyai-agents[async]

All methods on AsyncAgentOS are async equivalents of the sync client:

python
import asyncio
from dailyai import AsyncAgentOS

async def main():
    client = AsyncAgentOS()  # Reads DAILYAI_API_KEY

    # Run multiple agents concurrently
    signals = await asyncio.gather(
        client.agents.trading.run(asset="AAPL", strategy="momentum"),
        client.agents.trading.run(asset="MSFT", strategy="breakout"),
        client.agents.trading.run(asset="GOOGL", strategy="sentiment"),
    )

    for signal in signals:
        print(f"{signal.asset}: {signal.direction} ({signal.confidence:.0%})")

    # List signals (async)
    recent = await client.signals.list(min_confidence=0.8)
    print(f"Found {len(recent)} high-confidence signals")

    # Stream dashboard events
    async for event in client.dashboard.stream():
        print(event)

asyncio.run(main())

Context Manager

Use the async client as a context manager to ensure connections are properly closed:

python
async with AsyncAgentOS() as client:
    signal = await client.agents.trading.run(
        asset="AAPL",
        strategy="momentum"
    )
    print(signal)
# Connection pool automatically closed