initial extract of the execnet gateway WIP

This commit is contained in:
Ronny Pfannschmidt 2021-02-09 21:49:52 +01:00
parent e4bcbbee78
commit 510f212298
7 changed files with 133 additions and 77 deletions

87
src/xdist/backends.py Normal file
View File

@ -0,0 +1,87 @@
import re
import fnmatch
import os
import py # TODO remove
import pytest
def parse_spec_config(config):
xspeclist = []
for xspec in config.getvalue("tx"):
i = xspec.find("*")
try:
num = int(xspec[:i])
except ValueError:
xspeclist.append(xspec)
else:
xspeclist.extend([xspec[i + 1 :]] * num)
if not xspeclist:
raise pytest.UsageError(
"MISSING test execution (tx) nodes: please specify --tx"
)
return xspeclist
class ExecnetNodeControl:
@classmethod
def from_config(cls, config, specs, defaultchdir):
final_specs = []
import execnet
group = execnet.Group()
if specs is None:
specs = [execnet.XSpec(x) for x in parse_spec_config(config)]
for spec in specs:
if not isinstance(spec, execnet.XSpec):
spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
group.allocate_id(spec)
final_specs.append(spec)
return cls(group, final_specs)
def __init__(self, group, specs):
self.group = group
self.specs = specs
@staticmethod
def get_rsync(source, verbose=False, ignores=None):
import execnet
# todo: cache the class
class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores = kwargs.pop("ignores", None) or []
self._ignores = [
re.compile(fnmatch.translate(getattr(x, "strpath", x)))
for x in ignores
]
super().__init__(sourcedir=sourcedir, **kwargs)
def filter(self, path):
path = py.path.local(path)
for cre in self._ignores:
if cre.match(path.basename) or cre.match(path.strpath):
return False
else:
return True
def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super().add_target(
gateway, remotepath, finishedcallback=finished, delete=True
)
def _report_send_file(self, gateway, modified_rel_path):
if self._verbose > 0:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.chdir
print("{}:{} <= {}".format(gateway.spec, remotepath, path))
return HostRSync(source, verbose=verbose, ignores=ignores)

View File

@ -308,7 +308,7 @@ class DSession:
"""
spec = node.gateway.spec
spec.id = None
self.nodemanager.group.allocate_id(spec)
self.nodemanager._execnet.group.allocate_id(spec)
node = self.nodemanager.setup_node(spec, self.queue.put)
self._active_nodes.add(node)
return node

View File

@ -10,7 +10,6 @@ import py
import pytest
import sys
import time
import execnet
def pytest_addoption(parser):
@ -65,6 +64,8 @@ class RemoteControl:
print("RemoteControl:", msg)
def initgateway(self):
import execnet
return execnet.makegateway("popen")
def setup(self, out=None):

View File

@ -1,14 +1,12 @@
import fnmatch
import os
import re
import sys
import uuid
import py
import pytest
import execnet
import xdist.remote
from .backends import ExecnetNodeControl
def parse_spec_config(config):
@ -38,17 +36,9 @@ class NodeManager:
self.testrunuid = self.config.getoption("testrunuid")
if self.testrunuid is None:
self.testrunuid = uuid.uuid4().hex
self.group = execnet.Group()
if specs is None:
specs = self._getxspecs()
self.specs = []
for spec in specs:
if not isinstance(spec, execnet.XSpec):
spec = execnet.XSpec(spec)
if not spec.chdir and not spec.popen:
spec.chdir = defaultchdir
self.group.allocate_id(spec)
self.specs.append(spec)
self._execnet = ExecnetNodeControl.from_config(config, specs, defaultchdir)
self.roots = self._getrsyncdirs()
self.rsyncoptions = self._getrsyncoptions()
self._rsynced_specs = set()
@ -60,12 +50,14 @@ class NodeManager:
self.rsync(gateway, root, **self.rsyncoptions)
def setup_nodes(self, putevent):
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.config.hook.pytest_xdist_setupnodes(
config=self.config, specs=self._execnet.specs
)
self.trace("setting up nodes")
return [self.setup_node(spec, putevent) for spec in self.specs]
return [self.setup_node(spec, putevent) for spec in self._execnet.specs]
def setup_node(self, spec, putevent):
gw = self.group.makegateway(spec)
gw = self._execnet.group.makegateway(spec)
self.config.hook.pytest_xdist_newgateway(gateway=gw)
self.rsync_roots(gw)
node = WorkerController(self, gw, self.config, putevent)
@ -75,13 +67,11 @@ class NodeManager:
return node
def teardown_nodes(self):
self.group.terminate(self.EXIT_TIMEOUT)
def _getxspecs(self):
return [execnet.XSpec(x) for x in parse_spec_config(self.config)]
self._execnet.group.terminate(self.EXIT_TIMEOUT)
def _getrsyncdirs(self):
for spec in self.specs:
# todo: move to backends ?
for spec in self._execnet.specs:
if not spec.popen or spec.chdir:
break
else:
@ -130,7 +120,7 @@ class NodeManager:
# XXX This changes the calling behaviour of
# pytest_xdist_rsyncstart and pytest_xdist_rsyncfinish to
# be called once per rsync target.
rsync = HostRSync(source, verbose=verbose, ignores=ignores)
rsync = self._execnet.get_rsync(source, verbose=verbose, ignores=ignores)
spec = gateway.spec
if spec.popen and not spec.chdir:
# XXX This assumes that sources are python-packages
@ -156,37 +146,6 @@ class NodeManager:
self.config.hook.pytest_xdist_rsyncfinish(source=source, gateways=[gateway])
class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
ignores = kwargs.pop("ignores", None) or []
self._ignores = [
re.compile(fnmatch.translate(getattr(x, "strpath", x))) for x in ignores
]
super().__init__(sourcedir=sourcedir, **kwargs)
def filter(self, path):
path = py.path.local(path)
for cre in self._ignores:
if cre.match(path.basename) or cre.match(path.strpath):
return False
else:
return True
def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super().add_target(gateway, remotepath, finishedcallback=finished, delete=True)
def _report_send_file(self, gateway, modified_rel_path):
if self._verbose > 0:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.chdir
print("{}:{} <= {}".format(gateway.spec, remotepath, path))
def make_reltoroot(roots, args):
# XXX introduce/use public API for splitting pytest args
splitcode = "::"
@ -224,7 +183,7 @@ class WorkerController:
self.config = config
self.workerinput = {
"workerid": gateway.id,
"workercount": len(nodemanager.specs),
"workercount": len(nodemanager._execnet.specs),
"testrunuid": nodemanager.testrunuid,
"mainargv": sys.argv,
}

View File

@ -149,7 +149,7 @@ class TestDistOptions:
def test_getxspecs(self, testdir):
config = testdir.parseconfigure("--tx=popen", "--tx", "ssh=xyz")
nodemanager = NodeManager(config)
xspecs = nodemanager._getxspecs()
xspecs = nodemanager._execnet.specs
assert len(xspecs) == 2
print(xspecs)
assert xspecs[0].popen
@ -157,7 +157,7 @@ class TestDistOptions:
def test_xspecs_multiplied(self, testdir):
config = testdir.parseconfigure("--tx=3*popen")
xspecs = NodeManager(config)._getxspecs()
xspecs = NodeManager(config)._execnet.specs
assert len(xspecs) == 3
assert xspecs[1].popen

View File

@ -46,7 +46,9 @@ class WorkerSetup:
class DummyMananger:
testrunuid = uuid.uuid4().hex
specs = [0, 1]
class _execnet:
specs = [0, 1]
self.slp = WorkerController(DummyMananger, self.gateway, config, putevent)
self.request.addfinalizer(self.slp.ensure_teardown)

View File

@ -4,7 +4,8 @@ import textwrap
import execnet
from _pytest.pytester import HookRecorder
from xdist import workermanage, newhooks
from xdist.workermanage import HostRSync, NodeManager
from xdist.workermanage import NodeManager
from xdist.backends import ExecnetNodeControl
pytest_plugins = "pytester"
@ -48,13 +49,13 @@ def workercontroller(monkeypatch):
class TestNodeManagerPopen:
def test_popen_no_default_chdir(self, config):
gm = NodeManager(config, ["popen"])
assert gm.specs[0].chdir is None
assert gm._execnet.specs[0].chdir is None
def test_default_chdir(self, config):
specs = ["ssh=noco", "socket=xyz"]
for spec in NodeManager(config, specs).specs:
for spec in NodeManager(config, specs)._execnet.specs:
assert spec.chdir == "pyexecnetcache"
for spec in NodeManager(config, specs, defaultchdir="abc").specs:
for spec in NodeManager(config, specs, defaultchdir="abc")._execnet.specs:
assert spec.chdir == "abc"
def test_popen_makegateway_events(self, config, hookrecorder, workercontroller):
@ -68,16 +69,16 @@ class TestNodeManagerPopen:
assert call.gateway.id == "gw0"
call = hookrecorder.popcall("pytest_xdist_newgateway")
assert call.gateway.id == "gw1"
assert len(hm.group) == 2
assert len(hm._execnet.group) == 2
hm.teardown_nodes()
assert not len(hm.group)
assert not len(hm._execnet.group)
def test_popens_rsync(self, config, mysetup, workercontroller):
source = mysetup.source
hm = NodeManager(config, ["popen"] * 2)
hm.setup_nodes(None)
assert len(hm.group) == 2
for gw in hm.group:
assert len(hm._execnet.group) == 2
for gw in hm._execnet.group:
class pseudoexec:
args = []
@ -90,11 +91,11 @@ class TestNodeManagerPopen:
gw.remote_exec = pseudoexec
notifications = []
for gw in hm.group:
for gw in hm._execnet.group:
hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
assert not notifications
hm.teardown_nodes()
assert not len(hm.group)
assert not len(hm._execnet.group)
assert "sys.path.insert" in gw.remote_exec.args[0]
def test_rsync_popen_with_path(self, config, mysetup, workercontroller):
@ -103,10 +104,14 @@ class TestNodeManagerPopen:
hm.setup_nodes(None)
source.ensure("dir1", "dir2", "hello")
notifications = []
for gw in hm.group:
for gw in hm._execnet.group:
hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
assert len(notifications) == 1
assert notifications[0] == ("rsyncrootready", hm.group["gw0"].spec, source)
assert notifications[0] == (
"rsyncrootready",
hm._execnet.group["gw0"].spec,
source,
)
hm.teardown_nodes()
dest = dest.join(source.basename)
assert dest.join("dir1").check()
@ -121,12 +126,12 @@ class TestNodeManagerPopen:
hm.roots = []
hm.setup_nodes(None)
source.ensure("dir1", "dir2", "hello")
gw = hm.group[0]
gw = hm._execnet.group[0]
hm.rsync(gw, source)
call = hookrecorder.popcall("pytest_xdist_rsyncstart")
assert call.source == source
assert len(call.gateways) == 1
assert call.gateways[0] in hm.group
assert call.gateways[0] in hm._execnet.group
call = hookrecorder.popcall("pytest_xdist_rsyncfinish")
@ -137,7 +142,9 @@ class TestHRSync:
source.ensure(".svn", "entries")
source.ensure(".somedotfile", "moreentries")
source.ensure("somedir", "editfile~")
syncer = HostRSync(source, ignores=NodeManager.DEFAULT_IGNORES)
syncer = ExecnetNodeControl.get_rsync(
source, ignores=NodeManager.DEFAULT_IGNORES
)
files = list(source.visit(rec=syncer.filter, fil=syncer.filter))
assert len(files) == 3
basenames = [x.basename for x in files]
@ -149,7 +156,7 @@ class TestHRSync:
source, dest = mysetup.source, mysetup.dest
gw = execnet.makegateway("popen//chdir=%s" % dest)
finished = []
rsync = HostRSync(source)
rsync = ExecnetNodeControl.get_rsync(source)
rsync.add_target_host(gw, finished=lambda: finished.append(1))
source.join("hello.py").write("world")
rsync.send()
@ -272,7 +279,7 @@ class TestNodeManager:
config = testdir.parseconfig(source)
nodemanager = NodeManager(config, specs)
nodemanager.setup_nodes(None) # calls .rysnc_roots()
for gwspec in nodemanager.specs:
for gwspec in nodemanager._execnet.specs:
assert gwspec._samefilesystem()
assert not gwspec.chdir