mirror of https://github.com/microsoft/autogen.git
269 lines
12 KiB
Python
269 lines
12 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
from loguru import logger
|
|
|
|
from ..datamodel.db import Agent, LinkTypes, Model, Team, Tool
|
|
from ..datamodel.types import ComponentConfigInput, ComponentTypes, Response
|
|
from .component_factory import ComponentFactory
|
|
from .db_manager import DatabaseManager
|
|
|
|
|
|
class ConfigurationManager:
    """Manages persistence and relationships of components using ComponentFactory for validation.

    Component configurations are validated through ``ComponentFactory``, stored
    through the supplied ``DatabaseManager`` and, for composite components
    (teams and agents), linked to the children declared in their config
    (participants, model client, tools).
    """

    # Config keys that, taken together, identify an already-stored component of
    # each type. Used by ``_check_exists`` and ``_format_exists_message``.
    DEFAULT_UNIQUENESS_FIELDS = {
        ComponentTypes.MODEL: ["model_type", "model"],
        ComponentTypes.TOOL: ["name"],
        ComponentTypes.AGENT: ["agent_type", "name"],
        ComponentTypes.TEAM: ["team_type", "name"],
    }

    # Database model class used to look up stored components of each type.
    _COMPONENT_MODELS = {
        ComponentTypes.MODEL: Model,
        ComponentTypes.TOOL: Tool,
        ComponentTypes.AGENT: Agent,
        ComponentTypes.TEAM: Team,
    }

    # Config keys whose presence marks a component type, in lookup priority order.
    _TYPE_MARKERS = (
        ("team_type", ComponentTypes.TEAM),
        ("agent_type", ComponentTypes.AGENT),
        ("model_type", ComponentTypes.MODEL),
        ("tool_type", ComponentTypes.TOOL),
    )

    def __init__(
        self,
        db_manager: DatabaseManager,
        uniqueness_fields: Optional[Dict[ComponentTypes, List[str]]] = None,
    ):
        """
        Args:
            db_manager: Database manager used for upsert, get and link operations
            uniqueness_fields: Optional per-type override of the config keys used to
                detect existing components; falls back to DEFAULT_UNIQUENESS_FIELDS
                when omitted or empty
        """
        self.db_manager = db_manager
        self.component_factory = ComponentFactory()
        self.uniqueness_fields = uniqueness_fields or self.DEFAULT_UNIQUENESS_FIELDS

    async def import_component(
        self, component_config: ComponentConfigInput, user_id: str, check_exists: bool = False
    ) -> Response:
        """
        Import a component configuration, validate it, and store the resulting component.

        Args:
            component_config: Configuration for the component (file path, dict, or ComponentConfig)
            user_id: User ID to associate with imported component
            check_exists: Whether to check for existing components before storing (default: False)

        Returns:
            Response containing import results or error
        """
        try:
            # Get validated config as dict
            config = await self.component_factory.load(component_config, return_type="dict")

            # Get component type
            component_type = self._determine_component_type(config)
            if not component_type:
                raise ValueError("Unable to determine component type from config")

            # Check existence if requested; an existing match is reported as a
            # success carrying the id of the stored component.
            if check_exists:
                existing = self._check_exists(component_type, config, user_id)
                if existing:
                    return Response(
                        message=self._format_exists_message(component_type, config),
                        status=True,
                        data={"id": existing.id},
                    )

            # Route to appropriate storage method
            if component_type == ComponentTypes.TEAM:
                return await self._store_team(config, user_id, check_exists)
            elif component_type == ComponentTypes.AGENT:
                return await self._store_agent(config, user_id, check_exists)
            elif component_type == ComponentTypes.MODEL:
                return await self._store_model(config, user_id)
            elif component_type == ComponentTypes.TOOL:
                return await self._store_tool(config, user_id)
            else:
                raise ValueError(f"Unsupported component type: {component_type}")

        except Exception as e:
            logger.error(f"Failed to import component: {str(e)}")
            return Response(message=str(e), status=False)

    async def import_directory(self, directory: Union[str, Path], user_id: str, check_exists: bool = False) -> Response:
        """
        Import all component configurations from a directory.

        Args:
            directory: Path to directory containing configuration files
            user_id: User ID to associate with imported components
            check_exists: Whether to check for existing components before storing (default: False)

        Returns:
            Response containing import results for all files
        """
        try:
            configs = await self.component_factory.load_directory(directory, return_type="dict")

            results = []
            for config in configs:
                result = await self.import_component(config, user_id, check_exists)
                # A successful response is not guaranteed to carry a dict payload;
                # guard so one odd result cannot abort the whole directory summary.
                payload = result.data if isinstance(result.data, dict) else {}
                results.append(
                    {
                        "component": self._get_component_type(config),
                        "status": result.status,
                        "message": result.message,
                        "id": payload.get("id") if result.status else None,
                    }
                )

            return Response(message="Directory import complete", status=True, data=results)

        except Exception as e:
            logger.error(f"Failed to import directory: {str(e)}")
            return Response(message=str(e), status=False)

    async def _store_team(self, config: dict, user_id: str, check_exists: bool = False) -> Response:
        """Store team component and manage its relationships with agents.

        Args:
            config: Team configuration dict; its "participants" entries are stored as agents
            user_id: User ID to associate with the team and its agents
            check_exists: When True, link an already-stored matching agent instead of
                storing a new one

        Returns:
            The upsert Response for the team, or a failed Response on error
        """
        try:
            # Store the team
            team_db = Team(user_id=user_id, config=config)
            team_result = self.db_manager.upsert(team_db)
            if not team_result.status:
                return team_result

            team_id = team_result.data["id"]

            # Handle participants (agents); a key present with a None value is
            # treated the same as a missing key.
            for participant in config.get("participants") or []:
                if check_exists:
                    # Check for existing agent
                    agent_type = self._determine_component_type(participant)
                    existing_agent = self._check_exists(agent_type, participant, user_id)
                    if existing_agent:
                        # Link existing agent
                        self.db_manager.link(LinkTypes.TEAM_AGENT, team_id, existing_agent.id)
                        logger.info(f"Linked existing agent to team: {existing_agent}")
                        continue

                # Store and link new agent
                agent_result = await self._store_agent(participant, user_id, check_exists)
                if agent_result.status:
                    self.db_manager.link(LinkTypes.TEAM_AGENT, team_id, agent_result.data["id"])
                else:
                    # Best effort: the team is kept, but the skipped participant is reported.
                    logger.warning(f"Failed to store team participant: {agent_result.message}")

            return team_result

        except Exception as e:
            logger.error(f"Failed to store team: {str(e)}")
            return Response(message=str(e), status=False)

    async def _store_agent(self, config: dict, user_id: str, check_exists: bool = False) -> Response:
        """Store agent component and manage its relationships with tools and model.

        Args:
            config: Agent configuration dict; "model_client" and "tools" entries are
                stored (or looked up) and linked to the agent
            user_id: User ID to associate with the agent and its children
            check_exists: When True, link already-stored matching models/tools instead
                of storing new ones

        Returns:
            The upsert Response for the agent, or a failed Response on error
        """
        try:
            # Store the agent
            agent_db = Agent(user_id=user_id, config=config)
            agent_result = self.db_manager.upsert(agent_db)
            if not agent_result.status:
                return agent_result

            agent_id = agent_result.data["id"]

            # Handle model client (a None value is treated as "no model client")
            model_config = config.get("model_client")
            if model_config is not None:
                existing_model = None
                if check_exists:
                    # Check for existing model
                    model_type = self._determine_component_type(model_config)
                    existing_model = self._check_exists(model_type, model_config, user_id)

                if existing_model:
                    # Link existing model
                    self.db_manager.link(LinkTypes.AGENT_MODEL, agent_id, existing_model.id)
                    # Stored configs are read the same way _check_exists reads them;
                    # plain attribute access fails on dict-shaped configs.
                    linked_model_type = self._config_value(existing_model.config, "model_type")
                    logger.info(f"Linked existing model to agent: {linked_model_type}")
                else:
                    # Store and link new model
                    model_result = await self._store_model(model_config, user_id)
                    if model_result.status:
                        self.db_manager.link(LinkTypes.AGENT_MODEL, agent_id, model_result.data["id"])
                    else:
                        logger.warning(f"Failed to store model for agent: {model_result.message}")

            # Handle tools (a None value is treated as "no tools")
            for tool_config in config.get("tools") or []:
                if check_exists:
                    # Check for existing tool
                    tool_type = self._determine_component_type(tool_config)
                    existing_tool = self._check_exists(tool_type, tool_config, user_id)
                    if existing_tool:
                        # Link existing tool
                        self.db_manager.link(LinkTypes.AGENT_TOOL, agent_id, existing_tool.id)
                        linked_tool_name = self._config_value(existing_tool.config, "name")
                        logger.info(f"Linked existing tool to agent: {linked_tool_name}")
                        continue

                # Store and link new tool
                tool_result = await self._store_tool(tool_config, user_id)
                if tool_result.status:
                    self.db_manager.link(LinkTypes.AGENT_TOOL, agent_id, tool_result.data["id"])
                else:
                    logger.warning(f"Failed to store tool for agent: {tool_result.message}")

            return agent_result

        except Exception as e:
            logger.error(f"Failed to store agent: {str(e)}")
            return Response(message=str(e), status=False)

    async def _store_model(self, config: dict, user_id: str) -> Response:
        """Store model component (leaf node - no relationships)"""
        try:
            model_db = Model(user_id=user_id, config=config)
            return self.db_manager.upsert(model_db)

        except Exception as e:
            logger.error(f"Failed to store model: {str(e)}")
            return Response(message=str(e), status=False)

    async def _store_tool(self, config: dict, user_id: str) -> Response:
        """Store tool component (leaf node - no relationships)"""
        try:
            tool_db = Tool(user_id=user_id, config=config)
            return self.db_manager.upsert(tool_db)

        except Exception as e:
            logger.error(f"Failed to store tool: {str(e)}")
            return Response(message=str(e), status=False)

    @staticmethod
    def _config_value(config: Any, field: str) -> Any:
        """Read ``field`` from a config that is either a dict or an attribute-style object."""
        if isinstance(config, dict):
            return config.get(field)
        return getattr(config, field, None)

    def _check_exists(
        self, component_type: ComponentTypes, config: dict, user_id: str
    ) -> Optional[Union[Model, Tool, Agent, Team]]:
        """Check if component exists based on configured uniqueness fields.

        Args:
            component_type: Type of the component to look for (may be None when the
                type could not be determined, in which case nothing matches)
            config: Candidate configuration dict
            user_id: Only components stored for this user are considered

        Returns:
            The first stored component whose config equals ``config`` on every
            uniqueness field, or None when there is no match
        """
        fields = self.uniqueness_fields.get(component_type, [])
        if not fields:
            return None

        component_class = self._COMPONENT_MODELS.get(component_type)
        if component_class is None:
            # Custom uniqueness_fields may name a type with no known storage class.
            return None

        # A lookup that yields no data is treated as "nothing stored".
        components = self.db_manager.get(component_class, {"user_id": user_id}).data or []

        for component in components:
            matches = all(self._config_value(component.config, field) == config.get(field) for field in fields)
            if matches:
                return component

        return None

    def _format_exists_message(self, component_type: ComponentTypes, config: dict) -> str:
        """Format existence message with identifying fields."""
        fields = self.uniqueness_fields.get(component_type, [])
        field_values = [f"{field}='{config.get(field)}'" for field in fields]
        return f"{component_type.value} with {' and '.join(field_values)} already exists"

    def _determine_component_type(self, config: dict) -> Optional[ComponentTypes]:
        """Determine component type from configuration dictionary.

        The type is inferred from the first marker key present, checked in the
        order team_type, agent_type, model_type, tool_type. Returns None when no
        marker is present or the config is empty/None.
        """
        if not config:
            return None
        for marker, component_type in self._TYPE_MARKERS:
            if marker in config:
                return component_type
        return None

    def _get_component_type(self, config: dict) -> str:
        """Helper to get component type string from config"""
        component_type = self._determine_component_type(config)
        return component_type.value if component_type else "unknown"

    async def cleanup(self):
        """Cleanup resources"""
        await self.component_factory.cleanup()
|