autogen/python/samples/agentchat_chainlit/app_team.py

101 lines
3.9 KiB
Python

from typing import List, cast
import chainlit as cl
import yaml
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.base import TaskResult
from autogen_agentchat.conditions import TextMentionTermination
from autogen_agentchat.messages import ModelClientStreamingChunkEvent, TextMessage
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_core.models import ChatCompletionClient
@cl.on_chat_start # type: ignore
async def start_chat() -> None:
# Load model configuration and create the model client.
with open("model_config.yaml", "r") as f:
model_config = yaml.safe_load(f)
model_client = ChatCompletionClient.load_component(model_config)
# Create the assistant agent.
assistant = AssistantAgent(
name="assistant",
model_client=model_client,
system_message="You are a helpful assistant.",
model_client_stream=True, # Enable model client streaming.
)
# Create the critic agent.
critic = AssistantAgent(
name="critic",
model_client=model_client,
system_message="You are a critic. Provide constructive feedback. "
"Respond with 'APPROVE' if your feedback has been addressed.",
model_client_stream=True, # Enable model client streaming.
)
# Termination condition.
termination = TextMentionTermination("APPROVE", sources=["critic"])
# Chain the assistant and critic agents using RoundRobinGroupChat.
group_chat = RoundRobinGroupChat([assistant, critic], termination_condition=termination)
# Set the assistant agent in the user session.
cl.user_session.set("prompt_history", "") # type: ignore
cl.user_session.set("team", group_chat) # type: ignore
@cl.set_starters # type: ignore
async def set_starts() -> List[cl.Starter]:
return [
cl.Starter(
label="Poem Writing",
message="Write a poem about the ocean.",
),
cl.Starter(
label="Story Writing",
message="Write a story about a detective solving a mystery.",
),
cl.Starter(
label="Write Code",
message="Write a function that merge two list of numbers into single sorted list.",
),
]
@cl.on_message # type: ignore
async def chat(message: cl.Message) -> None:
# Get the team from the user session.
team = cast(RoundRobinGroupChat, cl.user_session.get("team")) # type: ignore
# Streaming response message.
streaming_response: cl.Message | None = None
# Stream the messages from the team.
async for msg in team.run_stream(
task=[TextMessage(content=message.content, source="user")],
cancellation_token=CancellationToken(),
):
if isinstance(msg, ModelClientStreamingChunkEvent):
# Stream the model client response to the user.
if streaming_response is None:
# Start a new streaming response.
streaming_response = cl.Message(content=msg.source + ": ", author=msg.source)
await streaming_response.stream_token(msg.content)
elif streaming_response is not None:
# Done streaming the model client response.
# We can skip the current message as it is just the complete message
# of the streaming response.
await streaming_response.send()
# Reset the streaming response so we won't enter this block again
# until the next streaming response is complete.
streaming_response = None
elif isinstance(msg, TaskResult):
# Send the task termination message.
final_message = "Task terminated. "
if msg.stop_reason:
final_message += msg.stop_reason
await cl.Message(content=final_message).send()
else:
# Skip all other message types.
pass