## Overview

The `AsyncComposo` class provides an asynchronous client for evaluating chat messages, with support for concurrent processing. It is ideal for large batch evaluation scenarios and high-throughput applications.
## Constructor

```python
from composo import AsyncComposo

client = AsyncComposo(
    api_key="your_api_key",
    base_url="https://platform.composo.ai",
    num_retries=1,
    model_core=None,
    max_concurrent_requests=5,
    timeout=60.0
)
```
### Parameters

**api_key** (string)
Your Composo API key for authentication. If not provided, it is loaded from the `COMPOSO_API_KEY` environment variable.

**base_url** (string, default: `"https://platform.composo.ai"`)
API base URL. Change only if using a custom Composo deployment.

**num_retries** (integer, default: `1`)
Number of retries on request failure. Each retry uses exponential backoff with jitter. The minimum value is 1 (retries cannot be disabled).

**model_core** (default: `None`)
Optional model core identifier for specifying the evaluation model.

**max_concurrent_requests** (integer, default: `5`)
Maximum number of concurrent API requests. Controls throughput and prevents rate-limit issues. Recommendations:

- 5-10: most use cases
- 20+: high-performance scenarios with adequate rate limits

**timeout** (float, default: `60.0`)
Request timeout in seconds. This is the total time to wait for a single request, including retries.
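As a rough illustration of the retry timing described above, exponential backoff with full jitter can be sketched as below. The base delay, cap, and function name are assumptions for illustration only, not Composo's actual internals.

```python
import random

def backoff_delays(num_retries: int, base: float = 1.0, cap: float = 30.0):
    """Illustrative exponential backoff with full jitter.

    The base delay and cap are hypothetical; Composo's real retry
    schedule may differ.
    """
    for attempt in range(num_retries):
        # Exponential ceiling: base * 2^attempt, capped to avoid huge waits
        ceiling = min(cap, base * (2 ** attempt))
        # Full jitter: wait a random amount between 0 and the ceiling,
        # so many clients retrying at once don't synchronize
        yield random.uniform(0, ceiling)

delays = list(backoff_delays(3))
print(len(delays))  # 3
```

Full jitter spreads retries out randomly, which is why two clients that fail at the same moment are unlikely to retry at the same moment.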
### Example

```python
from composo import AsyncComposo
import asyncio

async def main():
    # Using an API key directly
    client = AsyncComposo(api_key="your_api_key_here")

    # With custom concurrency
    client = AsyncComposo(
        api_key="your_api_key",
        max_concurrent_requests=10,
        num_retries=3
    )

asyncio.run(main())
```
## evaluate()

Asynchronously evaluate messages against one or more evaluation criteria.

```python
result = await client.evaluate(
    messages=[...],
    criteria="Your evaluation criterion",
    system=None,
    tools=None,
    result=None,
    block=True
)
```
### Parameters

**messages** (list of dicts, required)
List of chat messages to evaluate. Each message should be a dictionary with `role` and `content` keys. Supported roles: `system`, `user`, `assistant`, `tool`.

**criteria** (string | list[string], required)
Evaluation criterion or list of criteria. Multiple criteria are evaluated concurrently for better performance.

**system** (default: `None`)
Optional system message to set AI behavior and context.

**tools** (default: `None`)
Optional list of tool definitions for evaluating tool calls.

**result** (default: `None`)
Optional LLM result to append to the conversation.

**block** (boolean, default: `True`)
If `False`, returns a dictionary with a `task_id` instead of blocking for results.
### Returns

**result** (`EvaluationResponse | list[EvaluationResponse] | dict`)

- Returns a single `EvaluationResponse` if one criterion is provided
- Returns a `list[EvaluationResponse]` if multiple criteria are provided (evaluated concurrently)
- Returns a `dict` with a `task_id` if `block=False`
### Response Schema

**EvaluationResponse**

**score** (float | null)
Evaluation score between 0.0 and 1.0. Returns `null` if the criterion is not applicable.

**explanation** (string)
Detailed explanation of the evaluation score.
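Because `score` can be null (`None` in Python) when a criterion is not applicable, downstream code should guard for that case. A minimal sketch, using a stand-in dataclass with the two documented fields (the real `EvaluationResponse` class may carry more):

```python
from dataclasses import dataclass
from typing import Optional

# Stand-in mirroring the documented EvaluationResponse fields; the real
# class comes from the composo package and may differ in detail.
@dataclass
class EvaluationResponse:
    score: Optional[float]
    explanation: str

def summarize(result: EvaluationResponse) -> str:
    # score is None when the criterion is not applicable
    if result.score is None:
        return "not applicable"
    return f"{result.score:.2f}: {result.explanation}"

print(summarize(EvaluationResponse(0.875, "Accurate answer")))  # 0.88: Accurate answer
print(summarize(EvaluationResponse(None, "Criterion does not apply")))  # not applicable
```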
### Examples

#### Basic Async Evaluation

```python
from composo import AsyncComposo
import asyncio

async def evaluate_single():
    async with AsyncComposo() as client:
        messages = [
            {"role": "user", "content": "What's 2+2?"},
            {"role": "assistant", "content": "2+2 equals 4."}
        ]
        result = await client.evaluate(
            messages=messages,
            criteria="Reward accurate mathematical responses"
        )
        print(f"Score: {result.score}")
        print(f"Explanation: {result.explanation}")

asyncio.run(evaluate_single())
```
#### Batch Evaluation with Concurrency

```python
from composo import AsyncComposo
import asyncio

async def batch_evaluate():
    async with AsyncComposo(max_concurrent_requests=10) as client:
        # Prepare multiple evaluations
        conversations = [
            [{"role": "user", "content": "Hello"}],
            [{"role": "user", "content": "Goodbye"}],
            [{"role": "user", "content": "Help me"}],
            # ... more conversations
        ]

        # Create tasks for concurrent evaluation
        tasks = [
            client.evaluate(
                messages=conv,
                criteria="Reward helpful responses"
            )
            for conv in conversations
        ]

        # Execute all evaluations concurrently
        results = await asyncio.gather(*tasks)

        for i, result in enumerate(results):
            print(f"Conversation {i}: Score = {result.score}")

asyncio.run(batch_evaluate())
```
#### Multiple Criteria (Evaluated Concurrently)

```python
from composo import AsyncComposo
import asyncio

async def evaluate_multi_criteria():
    async with AsyncComposo() as client:
        result = await client.evaluate(
            messages=[...],
            criteria=[
                "Reward accurate information",
                "Reward clear communication",
                "Penalize inappropriate tone"
            ]
        )
        # All criteria evaluated concurrently; result is a list
        for res in result:
            print(f"Score: {res.score}")

asyncio.run(evaluate_multi_criteria())
```
#### Large Dataset Processing

```python
from composo import AsyncComposo
import asyncio

async def process_large_dataset():
    # Configure for high throughput
    async with AsyncComposo(max_concurrent_requests=20) as client:
        conversations = load_conversations()  # Your data loading function

        # Split into batches to avoid memory issues
        batch_size = 100
        all_results = []

        for i in range(0, len(conversations), batch_size):
            batch = conversations[i:i+batch_size]
            tasks = [
                client.evaluate(
                    messages=conv,
                    criteria="Your criterion"
                )
                for conv in batch
            ]
            batch_results = await asyncio.gather(*tasks)
            all_results.extend(batch_results)
            print(f"Processed {len(all_results)} / {len(conversations)}")

        return all_results

asyncio.run(process_large_dataset())
```
## evaluate_trace()

Asynchronously evaluate multi-agent traces.

```python
result = await client.evaluate_trace(
    trace=trace_object,
    criteria="Your evaluation criterion",
    model_core=None,
    block=True
)
```
### Parameters

**trace** (required)
Multi-agent trace object containing agent interactions.

**criteria** (string | list[string], required)
Evaluation criterion or list of criteria. Multiple criteria are evaluated concurrently.

**model_core** (default: `None`)
Optional model core identifier.

**block** (boolean, default: `True`)
If `False`, returns a `task_id` instead of blocking.
### Returns

**result** (`MultiAgentTraceResponse | list[MultiAgentTraceResponse]`)

- Single response or list of trace evaluation responses
- Multiple criteria are evaluated concurrently
### Example

```python
from composo import AsyncComposo
import asyncio

async def evaluate_agent_trace():
    async with AsyncComposo() as client:
        # Assuming the trace was captured using AgentTracer
        result = await client.evaluate_trace(
            trace=my_trace,
            criteria=[
                "Reward effective exploration",
                "Reward proper tool usage"
            ]
        )
        for res in result:
            print(f"Overall Score: {res.overall_score}")
            print(f"Agent Scores: {res.agent_scores}")

asyncio.run(evaluate_agent_trace())
```
## Context Manager Usage

The `AsyncComposo` client supports async context managers for automatic resource cleanup:

```python
import asyncio
from composo import AsyncComposo

async def main():
    async with AsyncComposo() as client:
        result = await client.evaluate(
            messages=[...],
            criteria="Your criterion"
        )
        print(result.score)
    # Client automatically closed on exiting the context

asyncio.run(main())
```
## Concurrency Control

The `AsyncComposo` client uses a semaphore to limit concurrent requests, preventing rate-limit issues and excessive resource usage.

```python
# Low concurrency (safer for rate limits)
client = AsyncComposo(max_concurrent_requests=5)

# Medium concurrency (balanced)
client = AsyncComposo(max_concurrent_requests=10)

# High concurrency (requires adequate rate limits)
client = AsyncComposo(max_concurrent_requests=20)
```
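The semaphore pattern behind this limit can be sketched as follows. `LimitedClient` and its timings are illustrative assumptions, not Composo's actual implementation; the point is that an `asyncio.Semaphore` caps how many calls are in flight no matter how many tasks are scheduled at once.

```python
import asyncio

# Minimal sketch of semaphore-based concurrency limiting (illustrative,
# not Composo's actual implementation).
class LimitedClient:
    def __init__(self, max_concurrent_requests: int = 5):
        self._sem = asyncio.Semaphore(max_concurrent_requests)
        self._in_flight = 0
        self.peak = 0  # highest number of simultaneous calls observed

    async def evaluate(self, payload):
        async with self._sem:  # blocks here once the limit is reached
            self._in_flight += 1
            self.peak = max(self.peak, self._in_flight)
            await asyncio.sleep(0.01)  # stand-in for the HTTP request
            self._in_flight -= 1
            return payload

async def demo():
    client = LimitedClient(max_concurrent_requests=3)
    # Schedule 10 calls at once; only 3 ever run concurrently
    await asyncio.gather(*(client.evaluate(i) for i in range(10)))
    return client.peak

print(asyncio.run(demo()))  # prints 3 (the configured limit)
```

Because the semaphore is acquired inside `evaluate`, callers can freely `asyncio.gather` as many calls as they like; the client itself enforces the ceiling.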
## Best Practices

- **Start conservative**: begin with `max_concurrent_requests=5` and increase if needed
- **Monitor rate limits**: watch for `RateLimitError` exceptions and adjust accordingly
- **Use batching**: for very large datasets, process in batches to manage memory
- **Handle errors**: use `asyncio.gather(..., return_exceptions=True)` for error resilience
### Example: Optimal Batch Processing

```python
from composo import AsyncComposo
import asyncio

async def optimized_evaluation(conversations, criteria):
    async with AsyncComposo(max_concurrent_requests=10) as client:
        # Create one evaluation task per conversation
        tasks = [
            client.evaluate(messages=conv, criteria=criteria)
            for conv in conversations
        ]

        # Gather with error handling: exceptions come back as values
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Separate successes from failures
        successes = []
        failures = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failures.append((i, result))
            else:
                successes.append(result)

        print(f"Success: {len(successes)}, Failures: {len(failures)}")
        return successes, failures

# Run
asyncio.run(optimized_evaluation(my_conversations, "Your criterion"))
```
## Comparison with Sync Client

| Feature | Composo | AsyncComposo |
|---|---|---|
| Use Case | Single evaluations | Batch processing |
| Concurrency | Sequential | Concurrent |
| Performance | Slower for batches | Optimized for batches |
| API | Synchronous | Asynchronous |
| Complexity | Simpler | Requires async/await |
| Concurrency Control | N/A | `max_concurrent_requests` |
**When to use AsyncComposo:**
- Evaluating 10+ conversations
- Multiple criteria per evaluation
- High-throughput applications
- Integration with async frameworks (FastAPI, aiohttp)
**When to use Composo:**
- Single evaluations
- Simple scripts
- Synchronous applications
- Learning/prototyping