autogen/python/packages/autogen-studio/autogenstudio/eval/runners.py

202 lines
7.7 KiB
Python

from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional, Sequence, Type, Union
from autogen_agentchat.base import TaskResult, Team
from autogen_agentchat.messages import ChatMessage, MultiModalMessage, TextMessage
from autogen_core import CancellationToken, Component, ComponentBase, ComponentModel, Image
from autogen_core.models import ChatCompletionClient, UserMessage
from pydantic import BaseModel
from typing_extensions import Self
from ..datamodel.eval import EvalRunResult, EvalTask
class BaseEvalRunnerConfig(BaseModel):
"""Base configuration for evaluation runners."""
name: str
description: str = ""
metadata: Dict[str, Any] = {}
class BaseEvalRunner(ABC, ComponentBase[BaseEvalRunnerConfig]):
"""Base class for evaluation runners that defines the interface for running evaluations.
This class provides the core interface that all evaluation runners must implement.
Subclasses should implement the run method to define how a specific evaluation is executed.
"""
component_type = "eval_runner"
def __init__(self, name: str, description: str = "", metadata: Optional[Dict[str, Any]] = None):
self.name = name
self.description = description
self.metadata = metadata or {}
@abstractmethod
async def run(self, task: EvalTask, cancellation_token: Optional[CancellationToken] = None) -> EvalRunResult:
"""Run the evaluation on the provided task and return a result.
Args:
task: The task to evaluate
cancellation_token: Optional token to cancel the evaluation
Returns:
EvaluationResult: The result of the evaluation
"""
pass
def _to_config(self) -> BaseEvalRunnerConfig:
"""Convert the runner configuration to a configuration object for serialization."""
return BaseEvalRunnerConfig(name=self.name, description=self.description, metadata=self.metadata)
class ModelEvalRunnerConfig(BaseEvalRunnerConfig):
"""Configuration for ModelEvalRunner."""
model_client: ComponentModel
class ModelEvalRunner(BaseEvalRunner, Component[ModelEvalRunnerConfig]):
"""Evaluation runner that uses a single LLM to process tasks.
This runner sends the task directly to a model client and returns the response.
"""
component_config_schema = ModelEvalRunnerConfig
component_type = "eval_runner"
component_provider_override = "autogenstudio.eval.runners.ModelEvalRunner"
def __init__(
self,
model_client: ChatCompletionClient,
name: str = "Model Runner",
description: str = "Evaluates tasks using a single LLM",
metadata: Optional[Dict[str, Any]] = None,
):
super().__init__(name, description, metadata)
self.model_client = model_client
async def run(self, task: EvalTask, cancellation_token: Optional[CancellationToken] = None) -> EvalRunResult:
"""Run the task with the model client and return the result."""
# Create initial result object
result = EvalRunResult()
try:
model_input = []
if isinstance(task.input, str):
text_message = UserMessage(content=task.input, source="user")
model_input.append(text_message)
elif isinstance(task.input, list):
message_content = [x for x in task.input]
model_input.append(UserMessage(content=message_content, source="user"))
# Run with the model
model_result = await self.model_client.create(messages=model_input, cancellation_token=cancellation_token)
model_response = model_result.content if isinstance(model_result, str) else model_result.model_dump()
task_result = TaskResult(
messages=[TextMessage(content=str(model_response), source="model")],
)
result = EvalRunResult(result=task_result, status=True, start_time=datetime.now(), end_time=datetime.now())
except Exception as e:
result = EvalRunResult(status=False, error=str(e), end_time=datetime.now())
return result
def _to_config(self) -> ModelEvalRunnerConfig:
"""Convert to configuration object including model client configuration."""
base_config = super()._to_config()
return ModelEvalRunnerConfig(
name=base_config.name,
description=base_config.description,
metadata=base_config.metadata,
model_client=self.model_client.dump_component(),
)
@classmethod
def _from_config(cls, config: ModelEvalRunnerConfig) -> Self:
"""Create from configuration object with serialized model client."""
model_client = ChatCompletionClient.load_component(config.model_client)
return cls(
name=config.name,
description=config.description,
metadata=config.metadata,
model_client=model_client,
)
class TeamEvalRunnerConfig(BaseEvalRunnerConfig):
"""Configuration for TeamEvalRunner."""
team: ComponentModel
class TeamEvalRunner(BaseEvalRunner, Component[TeamEvalRunnerConfig]):
"""Evaluation runner that uses a team of agents to process tasks.
This runner creates and runs a team based on a team configuration.
"""
component_config_schema = TeamEvalRunnerConfig
component_type = "eval_runner"
component_provider_override = "autogenstudio.eval.runners.TeamEvalRunner"
def __init__(
self,
team: Union[Team, ComponentModel],
name: str = "Team Runner",
description: str = "Evaluates tasks using a team of agents",
metadata: Optional[Dict[str, Any]] = None,
):
super().__init__(name, description, metadata)
self._team = team if isinstance(team, Team) else Team.load_component(team)
async def run(self, task: EvalTask, cancellation_token: Optional[CancellationToken] = None) -> EvalRunResult:
"""Run the task with the team and return the result."""
# Create initial result object
result = EvalRunResult()
try:
team_task: Sequence[ChatMessage] = []
if isinstance(task.input, str):
team_task.append(TextMessage(content=task.input, source="user"))
if isinstance(task.input, list):
for message in task.input:
if isinstance(message, str):
team_task.append(TextMessage(content=message, source="user"))
elif isinstance(message, Image):
team_task.append(MultiModalMessage(source="user", content=[message]))
# Run task with team
team_result = await self._team.run(task=team_task, cancellation_token=cancellation_token)
result = EvalRunResult(result=team_result, status=True, start_time=datetime.now(), end_time=datetime.now())
except Exception as e:
result = EvalRunResult(status=False, error=str(e), end_time=datetime.now())
return result
def _to_config(self) -> TeamEvalRunnerConfig:
"""Convert to configuration object including team configuration."""
base_config = super()._to_config()
return TeamEvalRunnerConfig(
name=base_config.name,
description=base_config.description,
metadata=base_config.metadata,
team=self._team.dump_component(),
)
@classmethod
def _from_config(cls, config: TeamEvalRunnerConfig) -> Self:
"""Create from configuration object with serialized team configuration."""
return cls(
team=Team.load_component(config.team),
name=config.name,
description=config.description,
metadata=config.metadata,
)