fix: Make cleanup code in docker CodeExecutor asyncio aware (#669)

Co-authored-by: Jack Gerrits <jackgerrits@users.noreply.github.com>
This commit is contained in:
Jacob Alber 2024-10-01 15:36:05 -04:00 committed by Jack Gerrits
parent 6019131480
commit 499b3fcbbf
4 changed files with 139 additions and 7 deletions

View File

@ -24,7 +24,8 @@ dependencies = [
"tiktoken", "tiktoken",
"azure-core", "azure-core",
"docker~=7.0", "docker~=7.0",
"opentelemetry-api~=1.27.0" "opentelemetry-api~=1.27.0",
"asyncio_atexit"
] ]
[tool.uv] [tool.uv]

View File

@ -4,7 +4,6 @@
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import atexit
import logging import logging
import shlex import shlex
import sys import sys
@ -15,6 +14,7 @@ from pathlib import Path
from types import TracebackType from types import TracebackType
from typing import Any, Callable, ClassVar, List, Optional, ParamSpec, Type, Union from typing import Any, Callable, ClassVar, List, Optional, ParamSpec, Type, Union
import asyncio_atexit
import docker import docker
import docker.models import docker.models
import docker.models.containers import docker.models.containers
@ -326,13 +326,12 @@ $functions"""
await _wait_for_ready(self._container) await _wait_for_ready(self._container)
def cleanup() -> None: async def cleanup() -> None:
loop = asyncio.get_event_loop() await self.stop()
loop.run_until_complete(self.stop()) asyncio_atexit.unregister(cleanup) # type: ignore
atexit.unregister(cleanup)
if self._stop_container: if self._stop_container:
atexit.register(cleanup) asyncio_atexit.register(cleanup) # type: ignore
# Check if the container is running # Check if the container is running
if self._container.status != "running": if self._container.status != "running":

View File

@ -0,0 +1,121 @@
import asyncio
import typing as t
from functools import partial
from typing import Protocol
import asyncio_atexit
import pytest
class AtExitImpl(Protocol):
def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]: ...
def unregister(self, func: t.Callable[..., t.Any], /) -> None: ...
class AtExitSimulator(AtExitImpl):
def __init__(self) -> None:
self._funcs: t.List[t.Callable[..., t.Any]] = []
def complete(self) -> None:
for func in self._funcs:
func()
self._funcs.clear()
def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]:
self._funcs.append(func)
return func
def unregister(self, func: t.Callable[..., t.Any], /) -> None:
self._funcs.remove(func)
class AsyncioAtExitWrapper(AtExitImpl):
"""This only exists to make mypy happy"""
def register(self, func: t.Callable[..., t.Any], /, *args: t.Any, **kwargs: t.Any) -> t.Callable[..., t.Any]:
loop = None
if "loop" in kwargs:
loop = kwargs["loop"]
kwargs.pop("loop")
wrapper = partial(func, *args, **kwargs)
asyncio_atexit.register(wrapper, loop=loop) # type: ignore
return func
def unregister(self, func: t.Callable[..., t.Any], /, **kwargs: t.Any) -> None:
loop = None
if "loop" in kwargs:
loop = kwargs["loop"]
kwargs.pop("loop")
asyncio_atexit.unregister(func, loop=loop) # type: ignore
# From Issue #584: No EventLoop error when agents exit.
# see: https://github.com/microsoft/agnext/issues/584
# This is a minimal implementation of a component that requires cleanup on exit.
class CleanupComponent:
def __init__(self, atexit_impl: AtExitImpl, use_async_cleanup: bool) -> None:
self.atexit_impl = atexit_impl
self.cleanup_has_run = False
self.stop_has_run = False
self.cleanup = self._acleanup if use_async_cleanup else self._cleanup
self.atexit_impl.register(self.cleanup)
async def stop(self) -> None:
self.stop_has_run = True
async def _acleanup(self) -> None:
self.cleanup_has_run = True
await self.stop()
def _cleanup(self) -> None:
self.cleanup_has_run = True
loop = asyncio.get_running_loop()
loop.run_until_complete(self.stop())
async def create_component(atexit_impl: AtExitImpl, /, use_async_cleanup: bool) -> CleanupComponent:
await asyncio.sleep(0.001)
return CleanupComponent(atexit_impl, use_async_cleanup)
def run_test_impl(debug_printer: t.Callable[[str], t.Any] | None = None) -> None:
def validate(component: CleanupComponent, expect_exception: bool, expect_stop: bool) -> None:
if debug_printer is not None:
debug_printer(f"Cleanup ran: {component.cleanup_has_run} (expected True)")
debug_printer(f"Stop ran: {component.stop_has_run} (expected {expect_stop})")
assert component.cleanup_has_run, "Cleanup should always run to be a faithful simulation."
assert component.stop_has_run == expect_stop
# AtExitSimulator behaves like atexit.register, while causes cleanup relying on it to fail.
atexit_simulator = AtExitSimulator()
loop = asyncio.new_event_loop()
component = loop.run_until_complete(create_component(atexit_simulator, use_async_cleanup=False))
loop.close()
with pytest.raises(RuntimeError):
atexit_simulator.complete()
validate(component, expect_exception=True, expect_stop=False)
loop = asyncio.new_event_loop()
component = loop.run_until_complete(create_component(AsyncioAtExitWrapper(), use_async_cleanup=True))
loop.close()
validate(component, expect_exception=False, expect_stop=True)
def test_asyncio_atexit_assumptions() -> None:
run_test_impl()
if __name__ == "__main__":
debug_printer = print
run_test_impl(debug_printer=debug_printer)

View File

@ -324,6 +324,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a7/fa/e01228c2938de91d47b307831c62ab9e4001e747789d0b05baf779a6488c/async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028", size = 5721 }, { url = "https://files.pythonhosted.org/packages/a7/fa/e01228c2938de91d47b307831c62ab9e4001e747789d0b05baf779a6488c/async_timeout-4.0.3-py3-none-any.whl", hash = "sha256:7405140ff1230c310e51dc27b3145b9092d659ce68ff733fb0cefe3ee42be028", size = 5721 },
] ]
[[package]]
name = "asyncio-atexit"
version = "1.0.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/22/d3/dd2974be3f67c7ec96e0d6ab454429d0372cb7c7bffa3d0ac67a483cb801/asyncio-atexit-1.0.1.tar.gz", hash = "sha256:1d0c71544b8ee2c484d322844ee72c0875dde6f250c0ed5b6993592ab9f7d436", size = 4373 }
wheels = [
{ url = "https://files.pythonhosted.org/packages/65/10/d6abaefa57a52646651fd0383c056280b0853c0106229ece6bb38cd14463/asyncio_atexit-1.0.1-py3-none-any.whl", hash = "sha256:d93d5f7d5633a534abd521ce2896ed0fbe8de170bb1e65ec871d1c20eac9d376", size = 3752 },
]
[[package]] [[package]]
name = "attrs" name = "attrs"
version = "24.2.0" version = "24.2.0"
@ -350,6 +359,7 @@ version = "0.4.0.dev0"
source = { editable = "packages/autogen-core" } source = { editable = "packages/autogen-core" }
dependencies = [ dependencies = [
{ name = "aiohttp" }, { name = "aiohttp" },
{ name = "asyncio-atexit" },
{ name = "azure-core" }, { name = "azure-core" },
{ name = "docker" }, { name = "docker" },
{ name = "grpcio" }, { name = "grpcio" },
@ -406,6 +416,7 @@ dev = [
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "aiohttp" }, { name = "aiohttp" },
{ name = "asyncio-atexit" },
{ name = "azure-core" }, { name = "azure-core" },
{ name = "docker", specifier = "~=7.0" }, { name = "docker", specifier = "~=7.0" },
{ name = "grpcio", specifier = "~=1.62.0" }, { name = "grpcio", specifier = "~=1.62.0" },