
Agent Invocation

The client.agents resource provides three invocation methods: invoke (sync), ainvoke (async), and stream (SSE streaming).

Synchronous Invocation

from faos import FaosClient

client = FaosClient(api_key="faos_sk_...")
response = client.agents.invoke(
    "credit-risk-analyst",
    {"query": "Analyze Q1 2024 revenue trends"},
)

print(response.result)      # Agent output dict
print(response.metadata)    # Execution metadata (model, duration)
print(response.usage)       # Token usage breakdown
print(response.request_id)  # For debugging/support

Async Invocation

import asyncio
from faos import FaosClient

async def main():
    client = FaosClient(api_key="faos_sk_...")
    response = await client.agents.ainvoke(
        "credit-risk-analyst",
        {"query": "Analyze Q1 2024"},
    )
    print(response.result)

asyncio.run(main())
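A common reason to reach for ainvoke is running several agent calls concurrently. The following is a minimal sketch of that pattern using asyncio.gather. So that it runs without the SDK or an API key, it uses a stand-in object: StubAgents and its echo behaviour are assumptions made for illustration and are not part of faos. With the real SDK, client.agents would presumably take its place.

```python
import asyncio
from types import SimpleNamespace


class StubAgents:
    """Hypothetical stand-in for client.agents so the sketch runs offline."""

    async def ainvoke(self, agent_id, payload):
        await asyncio.sleep(0.01)  # simulate network latency
        return SimpleNamespace(
            result={"agent": agent_id, "echo": payload["query"]}
        )


async def invoke_many(agents, agent_id, queries):
    """Start one ainvoke call per query and wait for all of them.

    asyncio.gather returns results in the same order as the inputs.
    """
    calls = [agents.ainvoke(agent_id, {"query": q}) for q in queries]
    return await asyncio.gather(*calls)


async def main():
    responses = await invoke_many(
        StubAgents(),
        "credit-risk-analyst",
        ["Analyze Q1 2024", "Analyze Q2 2024"],
    )
    for response in responses:
        print(response.result)


if __name__ == "__main__":
    asyncio.run(main())
```

Because the calls overlap, total wall-clock time is close to that of the slowest call rather than the sum of all of them.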

Streaming

Stream responses via Server-Sent Events (SSE). Each chunk has a type field:

| Type | Description |
| --- | --- |
| text | Text content from the agent |
| tool_call | Agent invoked a tool |
| metadata | Execution metadata |
| error | Error during execution |
| done | Stream complete with final metadata |

import asyncio
from faos import FaosClient

async def stream():
    client = FaosClient(api_key="faos_sk_...")
    async for chunk in client.agents.stream(
        "credit-risk-analyst",
        {"query": "Analyze Q1 2024"},
    ):
        if chunk.type == "text":
            print(chunk.data, end="", flush=True)
        elif chunk.type == "done":
            print(f"\nTokens used: {chunk.data}")

asyncio.run(stream())
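The example above handles only text and done. A consumer that covers all five chunk types might look like the sketch below. It runs against a fake stream so it is self-contained: fake_stream and the shape of each chunk's data (a string for text, dicts elsewhere) are assumptions for illustration, since the page does not specify the payload of each type.

```python
import asyncio
from types import SimpleNamespace


async def fake_stream():
    """Hypothetical stand-in for client.agents.stream(...)."""
    events = [
        ("metadata", {"model": "example-model"}),
        ("text", "Revenue "),
        ("tool_call", {"name": "lookup"}),
        ("text", "rose."),
        ("done", {"total_tokens": 12}),
    ]
    for chunk_type, data in events:
        yield SimpleNamespace(type=chunk_type, data=data)


async def consume(stream):
    """Collect a stream into (text, tool_calls, final_metadata)."""
    text_parts, tool_calls, final = [], [], None
    async for chunk in stream:
        if chunk.type == "text":
            text_parts.append(chunk.data)
        elif chunk.type == "tool_call":
            tool_calls.append(chunk.data)
        elif chunk.type == "error":
            # Surface execution errors instead of silently dropping them.
            raise RuntimeError(f"agent stream failed: {chunk.data}")
        elif chunk.type == "done":
            final = chunk.data
            break
        # "metadata" chunks and any unknown types are ignored in this sketch.
    return "".join(text_parts), tool_calls, final


if __name__ == "__main__":
    print(asyncio.run(consume(fake_stream())))
```

Raising on error chunks is one design choice among several; a caller that wants partial output could return what was collected so far alongside the error instead.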

Reconnection

Pass last_event_id to resume a stream after disconnection:

async for chunk in client.agents.stream(
    "analyst", {"query": "..."}, last_event_id="evt-42"
):
    ...
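To resume automatically, a caller needs to remember the most recent event ID and pass it back when reopening the stream. The sketch below shows one way to wrap that in a retry loop. It rests on two assumptions the page does not confirm: that each chunk exposes its event ID as an id attribute, and that a dropped connection surfaces as ConnectionError. FlakyStream is a hypothetical stand-in that simulates one disconnect.

```python
import asyncio
from types import SimpleNamespace


class FlakyStream:
    """Hypothetical stand-in: emits evt-1..evt-4, dropping once before evt-3."""

    def __init__(self):
        self.dropped = False

    async def open(self, last_event_id=None):
        # Resume just after the last event the caller has seen.
        start = int(last_event_id.split("-")[1]) + 1 if last_event_id else 1
        for n in range(start, 5):
            if n == 3 and not self.dropped:
                self.dropped = True
                raise ConnectionError("simulated disconnect")
            yield SimpleNamespace(id=f"evt-{n}", type="text", data=f"part{n} ")


async def stream_with_resume(open_stream, max_retries=3):
    """Yield chunks, reopening from the last seen event ID after a drop."""
    last_event_id = None
    retries = 0
    while True:
        try:
            async for chunk in open_stream(last_event_id=last_event_id):
                last_event_id = chunk.id  # assumed attribute name
                yield chunk
            return  # stream ended normally
        except ConnectionError:
            retries += 1
            if retries > max_retries:
                raise
            await asyncio.sleep(0.01)  # brief pause before reconnecting


async def collect():
    source = FlakyStream()
    return [chunk.id async for chunk in stream_with_resume(source.open)]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

With the real SDK, open_stream could be a small function that calls client.agents.stream with the agent name, the input, and the given last_event_id. Whether the SDK accepts last_event_id=None on the first attempt is not stated here and would need checking.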

Multi-Tenant Invocation

Pass tenant_id for multi-tenant deployments:

response = client.agents.invoke(
    "credit-risk-analyst",
    {"query": "Analyze Q1"},
    tenant_id="tenant-abc123",
)

A2A Format

The SDK auto-detects the A2A (Agent-to-Agent) format when a task key is present in the input:

# FAOS-native format
response = client.agents.invoke("analyst", {"query": "..."})

# A2A format — auto-detected
response = client.agents.invoke("analyst", {
    "task": {"id": "task-1", "input": {"query": "..."}}
})
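The detection rule can be pictured as a simple key check. The function below only illustrates the rule as described here and is not the SDK's actual implementation. detect_format is a hypothetical name, and the sketch assumes the task key must appear at the top level of the input, as it does in the example above.

```python
def detect_format(payload: dict) -> str:
    """Return "a2a" when a top-level "task" key is present, else "native"."""
    return "a2a" if "task" in payload else "native"


if __name__ == "__main__":
    print(detect_format({"query": "..."}))
    print(detect_format({"task": {"id": "task-1", "input": {"query": "..."}}}))
```

One consequence of a rule like this is that a FAOS-native input that happens to contain a field named task would be treated as A2A, so that key is best avoided in native payloads.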

Response Model

class FaosResponse(BaseModel):
    result: dict        # Agent output
    metadata: dict      # model, duration_ms, etc.
    usage: TokenUsage   # prompt_tokens, completion_tokens, total_tokens
    request_id: str     # For debugging
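Since every response carries a usage object, token counts can be summed across a batch of calls for cost tracking. The sketch below uses a dataclass stand-in with the three fields named above so it runs without the SDK. The stand-in TokenUsage and the total_usage helper are illustrative assumptions, not SDK definitions.

```python
from dataclasses import dataclass


@dataclass
class TokenUsage:
    """Stand-in with the three fields listed in the response model."""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int


def total_usage(usages):
    """Sum token counts field by field across several usage objects."""
    usages = list(usages)
    return TokenUsage(
        prompt_tokens=sum(u.prompt_tokens for u in usages),
        completion_tokens=sum(u.completion_tokens for u in usages),
        total_tokens=sum(u.total_tokens for u in usages),
    )


if __name__ == "__main__":
    print(total_usage([TokenUsage(10, 5, 15), TokenUsage(20, 10, 30)]))
```

With real responses, the input would be something like the usage attribute of each FaosResponse collected from a batch.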