autogen/python/packages/autogen-magentic-one/examples/example.py

119 lines
4.7 KiB
Python

"""This example demonstrates MagenticOne performing a task given by the user and returning a final answer."""
import argparse
import asyncio
import json
import logging
import os
from autogen_core import EVENT_LOGGER_NAME, AgentId, AgentProxy, SingleThreadedAgentRuntime
from autogen_core.code_executor import CodeBlock
from autogen_core.models._model_client import ChatCompletionClient
from autogen_ext.code_executors.docker import DockerCommandLineCodeExecutor
from autogen_magentic_one.agents.coder import Coder, Executor
from autogen_magentic_one.agents.file_surfer import FileSurfer
from autogen_magentic_one.agents.multimodal_web_surfer import MultimodalWebSurfer
from autogen_magentic_one.agents.orchestrator import LedgerOrchestrator
from autogen_magentic_one.agents.user_proxy import UserProxy
from autogen_magentic_one.messages import RequestReplyMessage
from autogen_magentic_one.utils import LogHandler
# NOTE: Before running this example, install the browser with 'playwright install --with-deps chromium'.
async def confirm_code(code: CodeBlock) -> bool:
    """Ask the user on the console whether a code block may be executed.

    The prompt shows the block's language and source, then asks for a yes/no
    answer. The blocking ``input`` call is run through ``asyncio.to_thread``
    so the event loop keeps running while waiting for the user.

    Args:
        code: The code block the executor is about to run; its ``language``
            and ``code`` attributes are displayed in the prompt.

    Returns:
        ``True`` only when the user answers "yes" (case-insensitive,
        surrounding whitespace ignored); any other answer means "do not run".
    """
    response = await asyncio.to_thread(
        input,
        f"Executor is about to execute code (lang: {code.language}):\n{code.code}\n\nDo you want to proceed? (yes/no): ",
    )
    # Strip stray whitespace so an answer such as "yes " is not silently
    # treated as a refusal.
    return response.strip().lower() == "yes"
async def main(logs_dir: str, hil_mode: bool, save_screenshots: bool) -> None:
    """Register the MagenticOne agents, start the runtime and run until idle.

    Args:
        logs_dir: Directory used as the Docker code executor's working
            directory and as the web surfer's download and debug folder.
        hil_mode: When true, the user proxy is added to the list of agents
            handed to the orchestrator (human-in-the-loop).
        save_screenshots: Forwarded to the web surfer as
            ``to_save_screenshots``.

    Raises:
        KeyError: If the ``CHAT_COMPLETION_CLIENT_CONFIG`` environment
            variable is not set.
        ValueError: If the configured model client's family is not "gpt-4o".
    """
    # Create the runtime.
    runtime = SingleThreadedAgentRuntime()

    # Create the model client from the JSON component config stored in the
    # CHAT_COMPLETION_CLIENT_CONFIG environment variable.
    client = ChatCompletionClient.load_component(json.loads(os.environ["CHAT_COMPLETION_CLIENT_CONFIG"]))

    # Raise explicitly rather than `assert`: assertions are stripped when
    # Python runs with -O, which would silently skip this precondition.
    if client.model_info["family"] != "gpt-4o":
        raise ValueError("This example requires the gpt-4o model")

    # Everything below runs inside the executor's context so the code
    # executor stays available for the whole run.
    async with DockerCommandLineCodeExecutor(work_dir=logs_dir) as code_executor:
        # Register the coding agents.
        await Coder.register(runtime, "Coder", lambda: Coder(model_client=client))
        coder = AgentProxy(AgentId("Coder", "default"), runtime)

        await Executor.register(
            runtime,
            "Executor",
            lambda: Executor("A agent for executing code", executor=code_executor, confirm_execution=confirm_code),
        )
        executor = AgentProxy(AgentId("Executor", "default"), runtime)

        # Register the web and file browsing agents.
        await MultimodalWebSurfer.register(runtime, "WebSurfer", MultimodalWebSurfer)
        web_surfer = AgentProxy(AgentId("WebSurfer", "default"), runtime)

        await FileSurfer.register(runtime, "file_surfer", lambda: FileSurfer(model_client=client))
        file_surfer = AgentProxy(AgentId("file_surfer", "default"), runtime)

        # The user proxy is always registered because the initial message is
        # sent to it below; it only joins the orchestrated team in HIL mode.
        await UserProxy.register(
            runtime,
            "UserProxy",
            lambda: UserProxy(description="The current user interacting with you."),
        )
        user_proxy = AgentProxy(AgentId("UserProxy", "default"), runtime)

        agent_list = [web_surfer, coder, executor, file_surfer]
        if hil_mode:
            agent_list.append(user_proxy)

        await LedgerOrchestrator.register(
            runtime,
            "Orchestrator",
            lambda: LedgerOrchestrator(
                agents=agent_list,
                model_client=client,
                max_rounds=30,
                max_time=25 * 60,
                return_final_answer=True,
            ),
        )

        runtime.start()

        # Fetch the concrete web surfer instance behind the proxy so it can be
        # initialised with the model client and browser settings.
        actual_surfer = await runtime.try_get_underlying_agent_instance(web_surfer.id, type=MultimodalWebSurfer)
        await actual_surfer.init(
            model_client=client,
            downloads_folder=logs_dir,
            start_page="https://www.bing.com",
            browser_channel="chromium",
            headless=True,
            debug_dir=logs_dir,
            to_save_screenshots=save_screenshots,
        )

        # Kick things off with a request to the user proxy (presumably this
        # prompts the user for the task -- confirm against UserProxy), then
        # wait until the runtime has nothing left to process.
        await runtime.send_message(RequestReplyMessage(), user_proxy.id)
        await runtime.stop_when_idle()
if __name__ == "__main__":
    # Command-line entry point: parse the options, prepare the log directory
    # and event logger, then run the example.
    parser = argparse.ArgumentParser(description="Run MagenticOne example with log directory.")
    parser.add_argument("--logs_dir", type=str, required=True, help="Directory to store log files and downloads")
    parser.add_argument("--hil_mode", action="store_true", default=False, help="Run in human-in-the-loop mode")
    parser.add_argument(
        "--save_screenshots", action="store_true", default=False, help="Save additional browser screenshots to file"
    )
    args = parser.parse_args()

    # Ensure the log directory exists. `exist_ok=True` avoids the
    # check-then-create race of a separate `os.path.exists` test.
    os.makedirs(args.logs_dir, exist_ok=True)

    # Send INFO-level events from the event logger to log.jsonl inside the
    # log directory, replacing any handlers already attached to that logger.
    logger = logging.getLogger(EVENT_LOGGER_NAME)
    logger.setLevel(logging.INFO)
    log_handler = LogHandler(filename=os.path.join(args.logs_dir, "log.jsonl"))
    logger.handlers = [log_handler]

    asyncio.run(main(args.logs_dir, args.hil_mode, args.save_screenshots))