Overview

The AsyncComposo class provides an asynchronous client for evaluating chat messages with support for concurrent processing. Ideal for large batch evaluation scenarios and high-throughput applications.

Constructor

from composo import AsyncComposo

client = AsyncComposo(
    api_key="your_api_key",
    base_url="https://platform.composo.ai",
    num_retries=1,
    model_core=None,
    max_concurrent_requests=5,
    timeout=60.0
)

Parameters

api_key
string
Your Composo API key for authentication. If not provided, the key is loaded from the COMPOSO_API_KEY environment variable.
base_url
string
default:"https://platform.composo.ai"
API base URL. Change only if using a custom Composo deployment.
num_retries
integer
default:"1"
Number of retries on request failure. Each retry uses exponential backoff with jitter. Minimum value is 1 (retries cannot be disabled).
model_core
string
Optional model core identifier for specifying the evaluation model.
max_concurrent_requests
integer
default:"5"
Maximum number of concurrent API requests. Controls throughput and prevents rate limit issues. Recommendations:
  • 5-10: Most use cases
  • 20+: High-performance scenarios with adequate rate limits
timeout
float
default:"60.0"
Request timeout in seconds. Total time to wait for a single request (including retries).
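
The retry behavior described for num_retries (exponential backoff with jitter) can be sketched as follows; the base delay and cap below are illustrative assumptions, not the SDK's actual schedule:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    """Exponential backoff with full jitter: the wait window doubles each
    attempt (base * 2**attempt, capped at `cap`), and a uniform random point
    inside that window is chosen."""
    window = min(cap, base * (2 ** attempt))
    return random.uniform(0.0, window)
```

Jitter spreads retries out so that many concurrent clients don't retry against the API at the same instant.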

Example

from composo import AsyncComposo
import asyncio

async def main():
    # Using API key directly
    client = AsyncComposo(api_key="your_api_key_here")

    # With custom concurrency
    client = AsyncComposo(
        api_key="your_api_key",
        max_concurrent_requests=10,
        num_retries=3
    )

asyncio.run(main())

evaluate()

Asynchronously evaluate messages against one or more evaluation criteria.
result = await client.evaluate(
    messages=[...],
    criteria="Your evaluation criterion",
    system=None,
    tools=None,
    result=None,
    block=True
)

Parameters

messages
list[dict]
required
List of chat messages to evaluate. Each message should be a dictionary with role and content keys. Supported roles: system, user, assistant, tool
criteria
string | list[string]
Evaluation criterion or list of criteria. Multiple criteria are evaluated concurrently for better performance.
system
string
Optional system message to set AI behavior and context.
tools
list[dict]
Optional list of tool definitions for evaluating tool calls.
result
dict
Optional LLM result to append to the conversation.
block
boolean
default:"True"
If False, returns a dictionary with task_id instead of blocking for results.

Returns

result
EvaluationResponse | list[EvaluationResponse]
  • Returns single EvaluationResponse if one criterion provided
  • Returns list[EvaluationResponse] if multiple criteria provided (evaluated concurrently)
  • Returns dict with task_id if block=False
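
Because the return shape depends on the arguments, callers often normalize it before processing. A minimal sketch (pure Python, no SDK types assumed):

```python
def normalize_results(result):
    """Normalize evaluate() output: pass through a non-blocking task handle,
    keep lists as-is, and wrap a single response in a list."""
    if isinstance(result, dict) and "task_id" in result:
        return result    # block=False: a handle to poll, not a response
    if isinstance(result, list):
        return result    # multiple criteria: one response per criterion
    return [result]      # single criterion: wrap for uniform iteration
```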

Response Schema

EvaluationResponse
score
float | null
Evaluation score between 0.0 and 1.0. Returns null if the criterion is not applicable.
explanation
string
Detailed explanation of the evaluation score.
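
Because score can be null (None in Python) when a criterion doesn't apply, aggregation should skip those values; a minimal sketch:

```python
def mean_score(scores):
    """Average the applicable scores, ignoring None (criterion not applicable).
    Returns None if no criterion applied."""
    applicable = [s for s in scores if s is not None]
    return sum(applicable) / len(applicable) if applicable else None
```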

Examples

Basic Async Evaluation

from composo import AsyncComposo
import asyncio

async def evaluate_single():
    async with AsyncComposo() as client:
        messages = [
            {"role": "user", "content": "What's 2+2?"},
            {"role": "assistant", "content": "2+2 equals 4."}
        ]

        result = await client.evaluate(
            messages=messages,
            criteria="Reward accurate mathematical responses"
        )

        print(f"Score: {result.score}")
        print(f"Explanation: {result.explanation}")

asyncio.run(evaluate_single())

Batch Evaluation with Concurrency

from composo import AsyncComposo
import asyncio

async def batch_evaluate():
    async with AsyncComposo(max_concurrent_requests=10) as client:
        # Prepare multiple evaluations
        conversations = [
            [{"role": "user", "content": "Hello"}],
            [{"role": "user", "content": "Goodbye"}],
            [{"role": "user", "content": "Help me"}],
            # ... more conversations
        ]

        # Create tasks for concurrent evaluation
        tasks = [
            client.evaluate(
                messages=conv,
                criteria="Reward helpful responses"
            )
            for conv in conversations
        ]

        # Execute all evaluations concurrently
        results = await asyncio.gather(*tasks)

        for i, result in enumerate(results):
            print(f"Conversation {i}: Score = {result.score}")

asyncio.run(batch_evaluate())

Multiple Criteria (Evaluated Concurrently)

from composo import AsyncComposo
import asyncio

async def evaluate_multi_criteria():
    async with AsyncComposo() as client:
        result = await client.evaluate(
            messages=[...],
            criteria=[
                "Reward accurate information",
                "Reward clear communication",
                "Penalize inappropriate tone"
            ]
        )

        # All criteria evaluated concurrently
        for res in result:
            print(f"Score: {res.score}")

asyncio.run(evaluate_multi_criteria())

High-Performance Batch Processing

from composo import AsyncComposo
import asyncio

async def process_large_dataset():
    # Configure for high throughput
    async with AsyncComposo(max_concurrent_requests=20) as client:
        # Process 1000 conversations
        conversations = load_conversations()  # Your data loading function

        # Split into batches to avoid memory issues
        batch_size = 100
        all_results = []

        for i in range(0, len(conversations), batch_size):
            batch = conversations[i:i+batch_size]

            tasks = [
                client.evaluate(
                    messages=conv,
                    criteria="Your criterion"
                )
                for conv in batch
            ]

            batch_results = await asyncio.gather(*tasks)
            all_results.extend(batch_results)

            print(f"Processed {len(all_results)} / {len(conversations)}")

        return all_results

asyncio.run(process_large_dataset())

evaluate_trace()

Asynchronously evaluate multi-agent traces.
result = await client.evaluate_trace(
    trace=trace_object,
    criteria="Your evaluation criterion",
    model_core=None,
    block=True
)

Parameters

trace
MultiAgentTrace
required
Multi-agent trace object containing agent interactions.
criteria
string | list[string]
required
Evaluation criterion or list of criteria. Multiple criteria are evaluated concurrently.
model_core
ModelCore
Optional model core identifier.
block
boolean
default:"True"
If False, returns task_id instead of blocking.

Returns

result
MultiAgentTraceResponse | list[MultiAgentTraceResponse]
  • Single or list of trace evaluation responses
  • Multiple criteria evaluated concurrently

Example

from composo import AsyncComposo
import asyncio

async def evaluate_agent_trace():
    async with AsyncComposo() as client:
        # Assuming trace was captured using AgentTracer
        result = await client.evaluate_trace(
            trace=my_trace,
            criteria=[
                "Reward effective exploration",
                "Reward proper tool usage"
            ]
        )

        for res in result:
            print(f"Overall Score: {res.overall_score}")
            print(f"Agent Scores: {res.agent_scores}")

asyncio.run(evaluate_agent_trace())

Context Manager Usage

The AsyncComposo client supports async context managers for automatic resource cleanup:
import asyncio
from composo import AsyncComposo

async def main():
    async with AsyncComposo() as client:
        result = await client.evaluate(
            messages=[...],
            criteria="Your criterion"
        )
        print(result.score)
    # Client automatically closed

asyncio.run(main())

Concurrency Control

The AsyncComposo client uses a semaphore to limit concurrent requests, preventing rate limit issues and excessive resource usage.
# Low concurrency (safer for rate limits)
client = AsyncComposo(max_concurrent_requests=5)

# Medium concurrency (balanced)
client = AsyncComposo(max_concurrent_requests=10)

# High concurrency (requires adequate rate limits)
client = AsyncComposo(max_concurrent_requests=20)
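
You can see the semaphore pattern in isolation with plain asyncio (no API calls): the sketch below counts peak in-flight tasks and shows it never exceeds the limit, which is the guarantee max_concurrent_requests provides:

```python
import asyncio

async def peak_concurrency(limit: int = 5, n_tasks: int = 20) -> int:
    sem = asyncio.Semaphore(limit)
    active = peak = 0

    async def fake_request():
        nonlocal active, peak
        async with sem:                # same gating applied per API request
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for network latency
            active -= 1

    await asyncio.gather(*(fake_request() for _ in range(n_tasks)))
    return peak
```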

Best Practices

  1. Start Conservative: Begin with max_concurrent_requests=5 and increase if needed
  2. Monitor Rate Limits: Watch for RateLimitError exceptions and adjust accordingly
  3. Use Batching: For very large datasets, process in batches to manage memory
  4. Handle Errors: Use asyncio.gather(..., return_exceptions=True) for error resilience
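
The batching advice above (point 3) amounts to slicing the dataset into fixed-size chunks before creating tasks; a minimal helper:

```python
def chunked(items, size):
    """Yield successive fixed-size slices of a list; the last may be shorter."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# 7 items in chunks of 3 -> [[0, 1, 2], [3, 4, 5], [6]]
batches = list(chunked(list(range(7)), 3))
```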

Performance Optimization

Example: Optimal Batch Processing

from composo import AsyncComposo
import asyncio

async def optimized_evaluation(conversations, criteria):
    async with AsyncComposo(max_concurrent_requests=10) as client:
        # Use list comprehension for task creation
        tasks = [
            client.evaluate(messages=conv, criteria=criteria)
            for conv in conversations
        ]

        # Gather with error handling
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results and handle errors
        successes = []
        failures = []

        for i, result in enumerate(results):
            if isinstance(result, Exception):
                failures.append((i, result))
            else:
                successes.append(result)

        print(f"Success: {len(successes)}, Failures: {len(failures)}")
        return successes, failures

# Run
asyncio.run(optimized_evaluation(my_conversations, "Your criterion"))

Comparison with Sync Client

| Feature             | Composo            | AsyncComposo            |
| ------------------- | ------------------ | ----------------------- |
| Use Case            | Single evaluations | Batch processing        |
| Concurrency         | Sequential         | Concurrent              |
| Performance         | Slower for batches | Optimized for batches   |
| API                 | Synchronous        | Asynchronous            |
| Complexity          | Simpler            | Requires async/await    |
| Concurrency Control | N/A                | max_concurrent_requests |
When to use AsyncComposo:
  • Evaluating 10+ conversations
  • Multiple criteria per evaluation
  • High-throughput applications
  • Integration with async frameworks (FastAPI, aiohttp)
When to use Composo:
  • Single evaluations
  • Simple scripts
  • Synchronous applications
  • Learning/prototyping