Jupyter Code Executor in v0.4 (alternative implementation) (#4885)

This commit is contained in:
Leon De Andrade 2025-01-18 22:11:40 +01:00 committed by GitHub
parent 918292f51e
commit 34bc82e24f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 457 additions and 0 deletions

View File

@ -58,6 +58,7 @@ python/autogen_ext.tools.code_execution
python/autogen_ext.tools.semantic_kernel
python/autogen_ext.code_executors.local
python/autogen_ext.code_executors.docker
python/autogen_ext.code_executors.jupyter
python/autogen_ext.code_executors.azure
python/autogen_ext.cache_store.diskcache
python/autogen_ext.cache_store.redis

View File

@ -0,0 +1,8 @@
autogen\_ext.code\_executors.jupyter
====================================
.. automodule:: autogen_ext.code_executors.jupyter
:members:
:undoc-members:
:show-inheritance:

View File

@ -56,6 +56,10 @@ redis = [
grpc = [
"grpcio~=1.62.0", # TODO: update this once we have a stable version.
]
jupyter-executor = [
"ipykernel>=6.29.5",
"nbclient>=0.10.2",
]
semantic-kernel-core = [
"semantic-kernel>=1.17.1",

View File

@ -0,0 +1,6 @@
from ._jupyter_code_executor import JupyterCodeExecutor, JupyterCodeResult
__all__ = [
"JupyterCodeExecutor",
"JupyterCodeResult",
]

View File

@ -0,0 +1,263 @@
import asyncio
import base64
import json
import re
import sys
import uuid
from dataclasses import dataclass
from pathlib import Path
from types import TracebackType
if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock, CodeExecutor, CodeResult
from nbclient import NotebookClient
from nbformat import NotebookNode
from nbformat import v4 as nbformat
from .._common import silence_pip
@dataclass
class JupyterCodeResult(CodeResult):
"""A code result class for Jupyter code executor."""
output_files: list[Path]
class JupyterCodeExecutor(CodeExecutor):
"""A code executor class that executes code statefully using [nbclient](https://github.com/jupyter/nbclient).
.. danger::
This will execute code on the local machine. If being used with LLM generated code, caution should be used.
Example of using it directly:
.. code-block:: python
import asyncio
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor
async def main() -> None:
async with JupyterCodeExecutor() as executor:
cancel_token = CancellationToken()
code_blocks = [CodeBlock(code="print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, cancel_token)
print(code_result)
asyncio.run(main())
Example of using it with :class:`~autogen_ext.tools.code_execution.PythonCodeExecutionTool`:
.. code-block:: python
import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_ext.tools.code_execution import PythonCodeExecutionTool
async def main() -> None:
async with JupyterCodeExecutor() as executor:
tool = PythonCodeExecutionTool(executor)
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent = AssistantAgent("assistant", model_client=model_client, tools=[tool])
result = await agent.run(task="What is the 10th Fibonacci number? Use Python to calculate it.")
print(result)
asyncio.run(main())
Example of using it inside a :class:`~autogen_agentchat.agents._code_executor_agent.CodeExecutorAgent`:
.. code-block:: python
import asyncio
from autogen_agentchat.agents import CodeExecutorAgent
from autogen_agentchat.messages import TextMessage
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor
from autogen_core import CancellationToken
async def main() -> None:
async with JupyterCodeExecutor() as executor:
code_executor_agent = CodeExecutorAgent("code_executor", code_executor=executor)
task = TextMessage(
content='''Here is some code
```python
print('Hello world')
```
''',
source="user",
)
response = await code_executor_agent.on_messages([task], CancellationToken())
print(response.chat_message)
asyncio.run(main())
Args:
kernel_name (str): The kernel name to use. By default, "python3".
timeout (int): The timeout for code execution, by default 60.
output_dir (Path): The directory to save output files, by default ".".
"""
def __init__(
self,
kernel_name: str = "python3",
timeout: int = 60,
output_dir: Path = Path("."),
):
if timeout < 1:
raise ValueError("Timeout must be greater than or equal to 1.")
self._kernel_name = kernel_name
self._timeout = timeout
self._output_dir = output_dir
# TODO: Forward arguments perhaps?
self._client = NotebookClient(
nb=nbformat.new_notebook(), # type: ignore
kernel_name=self._kernel_name,
timeout=self._timeout,
allow_errors=True,
)
async def execute_code_blocks(
self, code_blocks: list[CodeBlock], cancellation_token: CancellationToken
) -> JupyterCodeResult:
"""Execute code blocks and return the result.
Args:
code_blocks (list[CodeBlock]): The code blocks to execute.
Returns:
JupyterCodeResult: The result of the code execution.
"""
outputs: list[str] = []
output_files: list[Path] = []
exit_code = 0
for code_block in code_blocks:
result = await self._execute_code_block(code_block, cancellation_token)
exit_code = result.exit_code
outputs.append(result.output)
output_files.extend(result.output_files)
# Stop execution if one code block fails
if exit_code != 0:
break
return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files)
async def _execute_code_block(
self, code_block: CodeBlock, cancellation_token: CancellationToken
) -> JupyterCodeResult:
"""Execute single code block and return the result.
Args:
code_block (CodeBlock): The code block to execute.
Returns:
JupyterCodeResult: The result of the code execution.
"""
execute_task = asyncio.create_task(
self._execute_cell(
nbformat.new_code_cell(silence_pip(code_block.code, code_block.language)) # type: ignore
)
)
cancellation_token.link_future(execute_task)
output_cell = await asyncio.wait_for(asyncio.shield(execute_task), timeout=self._timeout)
outputs: list[str] = []
output_files: list[Path] = []
exit_code = 0
for output in output_cell.get("outputs", []):
match output.get("output_type"):
case "stream":
outputs.append(output.get("text", ""))
case "error":
traceback = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", "\n".join(output["traceback"]))
outputs.append(traceback)
exit_code = 1
case "execute_result" | "display_data":
data = output.get("data", {})
for mime, content in data.items():
match mime:
case "text/plain":
outputs.append(content)
case "image/png":
path = self._save_image(content)
output_files.append(path)
case "image/jpeg":
# TODO: Should this also be encoded? Images are encoded as both png and jpg
pass
case "text/html":
path = self._save_html(content)
output_files.append(path)
case _:
outputs.append(json.dumps(content))
case _:
pass
return JupyterCodeResult(exit_code=exit_code, output="\n".join(outputs), output_files=output_files)
async def _execute_cell(self, cell: NotebookNode) -> NotebookNode:
# Temporary push cell to nb as async_execute_cell expects it. But then we want to remove it again as cells can take up significant amount of memory (especially with images)
self._client.nb.cells.append(cell)
output = await self._client.async_execute_cell(
cell,
cell_index=0,
)
self._client.nb.cells.pop()
return output
def _save_image(self, image_data_base64: str) -> Path:
"""Save image data to a file."""
image_data = base64.b64decode(image_data_base64)
path = self._output_dir / f"{uuid.uuid4().hex}.png"
path.write_bytes(image_data)
return path.absolute()
def _save_html(self, html_data: str) -> Path:
"""Save HTML data to a file."""
path = self._output_dir / f"{uuid.uuid4().hex}.html"
path.write_text(html_data)
return path.absolute()
async def restart(self) -> None:
"""Restart the code executor."""
await self.stop()
await self.start()
async def start(self) -> None:
self.kernel_context = self._client.async_setup_kernel()
await self.kernel_context.__aenter__()
async def stop(self) -> None:
"""Stop the kernel."""
await self.kernel_context.__aexit__(None, None, None)
async def __aenter__(self) -> Self:
await self.start()
return self
async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.stop()

View File

@ -0,0 +1,169 @@
import asyncio
import inspect
from pathlib import Path
import pytest
from autogen_core import CancellationToken
from autogen_core.code_executor import CodeBlock
from autogen_ext.code_executors.jupyter import JupyterCodeExecutor, JupyterCodeResult
@pytest.mark.asyncio
async def test_execute_code(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])
@pytest.mark.asyncio
async def test_execute_code_error(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="print(undefined_variable)", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(
exit_code=1,
output=inspect.cleandoc("""
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[1], line 1
----> 1 print(undefined_variable)
NameError: name 'undefined_variable' is not defined
"""),
output_files=[],
)
@pytest.mark.asyncio
async def test_execute_multiple_code_blocks(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n\n200\n", output_files=[])
@pytest.mark.asyncio
async def test_depedent_executions(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks_1 = [CodeBlock(code="a = 'hello world!'", language="python")]
code_blocks_2 = [
CodeBlock(code="print(a)", language="python"),
]
await executor.execute_code_blocks(code_blocks_1, CancellationToken())
code_result = await executor.execute_code_blocks(code_blocks_2, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])
@pytest.mark.asyncio
async def test_execute_multiple_code_blocks_error(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(code="import sys; print('hello world!')", language="python"),
CodeBlock(code="a = 100 + 100; print(a); print(undefined_variable)", language="python"),
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(
exit_code=1,
output=inspect.cleandoc("""
hello world!
200
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
Cell In[2], line 1
----> 1 a = 100 + 100; print(a); print(undefined_variable)
NameError: name 'undefined_variable' is not defined
"""),
output_files=[],
)
@pytest.mark.asyncio
async def test_execute_code_after_restart(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
await executor.restart()
code_blocks = [CodeBlock(code="import sys; print('hello world!')", language="python")]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert code_result == JupyterCodeResult(exit_code=0, output="hello world!\n", output_files=[])
@pytest.mark.asyncio
async def test_commandline_code_executor_timeout(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path, timeout=2) as executor:
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
with pytest.raises(asyncio.TimeoutError):
await executor.execute_code_blocks(code_blocks, CancellationToken())
@pytest.mark.asyncio
async def test_commandline_code_executor_cancellation(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [CodeBlock(code="import time; time.sleep(10); print('hello world!')", language="python")]
cancellation_token = CancellationToken()
code_result_coroutine = executor.execute_code_blocks(code_blocks, cancellation_token)
await asyncio.sleep(1)
cancellation_token.cancel()
with pytest.raises(asyncio.CancelledError):
await code_result_coroutine
@pytest.mark.asyncio
async def test_execute_code_with_image_output(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(
code=inspect.cleandoc("""
from PIL import Image, ImageDraw
img = Image.new("RGB", (100, 100), color="white")
draw = ImageDraw.Draw(img)
draw.rectangle((10, 10, 90, 90), outline="black", fill="blue")
display(img)
"""),
language="python",
)
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert len(code_result.output_files) == 1
assert code_result == JupyterCodeResult(
exit_code=0,
output="<PIL.Image.Image image mode=RGB size=100x100>",
output_files=code_result.output_files,
)
assert code_result.output_files[0].parent == tmp_path
@pytest.mark.asyncio
async def test_execute_code_with_html_output(tmp_path: Path) -> None:
async with JupyterCodeExecutor(output_dir=tmp_path) as executor:
code_blocks = [
CodeBlock(
code=inspect.cleandoc("""
from IPython.core.display import HTML
HTML("<div style='color:blue'>Hello, HTML world!</div>")
"""),
language="python",
)
]
code_result = await executor.execute_code_blocks(code_blocks, CancellationToken())
assert len(code_result.output_files) == 1
assert code_result == JupyterCodeResult(
exit_code=0,
output="<IPython.core.display.HTML object>",
output_files=code_result.output_files,
)
assert code_result.output_files[0].parent == tmp_path

View File

@ -580,6 +580,10 @@ graphrag = [
grpc = [
{ name = "grpcio" },
]
jupyter-executor = [
{ name = "ipykernel" },
{ name = "nbclient" },
]
langchain = [
{ name = "langchain-core" },
]
@ -665,10 +669,12 @@ requires-dist = [
{ name = "ffmpeg-python", marker = "extra == 'video-surfer'" },
{ name = "graphrag", marker = "extra == 'graphrag'", specifier = ">=1.0.1" },
{ name = "grpcio", marker = "extra == 'grpc'", specifier = "~=1.62.0" },
{ name = "ipykernel", marker = "extra == 'jupyter-executor'", specifier = ">=6.29.5" },
{ name = "langchain-core", marker = "extra == 'langchain'", specifier = "~=0.3.3" },
{ name = "markitdown", marker = "extra == 'file-surfer'", specifier = ">=0.0.1a2" },
{ name = "markitdown", marker = "extra == 'magentic-one'", specifier = ">=0.0.1a2" },
{ name = "markitdown", marker = "extra == 'web-surfer'", specifier = ">=0.0.1a2" },
{ name = "nbclient", marker = "extra == 'jupyter-executor'", specifier = ">=0.10.2" },
{ name = "openai", marker = "extra == 'openai'", specifier = ">=1.52.2" },
{ name = "openai-whisper", marker = "extra == 'video-surfer'" },
{ name = "opencv-python", marker = "extra == 'video-surfer'", specifier = ">=4.5" },