299 lines
10 KiB
Python
299 lines
10 KiB
Python
import py
|
|
import pytest
|
|
import textwrap
|
|
import execnet
|
|
from _pytest.pytester import HookRecorder
|
|
from xdist import workermanage, newhooks
|
|
from xdist.workermanage import NodeManager
|
|
from xdist.backends import ExecnetNodeControl
|
|
|
|
# Load pytest's "pytester" plugin for this module; the fixtures and tests
# below request its ``testdir`` fixture.
pytest_plugins = "pytester"
|
|
|
|
|
|
@pytest.fixture
def hookrecorder(request, config):
    """Provide a ``HookRecorder`` bound to the plugin manager of ``config``.

    If the recorder exposes ``start_recording`` it is started for the xdist
    ``newhooks`` module.  ``finish_recording`` is always registered as a
    finalizer on the requesting test.
    """
    recorder = HookRecorder(config.pluginmanager)
    needs_explicit_start = hasattr(recorder, "start_recording")
    if needs_explicit_start:
        recorder.start_recording(newhooks)
    request.addfinalizer(recorder.finish_recording)
    return recorder
|
|
|
|
|
|
@pytest.fixture
def config(testdir):
    """Return the object produced by ``testdir.parseconfig()`` called without arguments."""
    parsed = testdir.parseconfig()
    return parsed
|
|
|
|
|
|
@pytest.fixture
def mysetup(tmpdir):
    """Create ``source`` and ``dest`` directories under ``tmpdir``.

    Returns an object exposing the two directories as its ``source`` and
    ``dest`` attributes.
    """
    # Create the directories in the same order as the attributes are declared.
    source_dir = tmpdir.mkdir("source")
    dest_dir = tmpdir.mkdir("dest")

    class mysetup:
        source = source_dir
        dest = dest_dir

    return mysetup()
|
|
|
|
|
|
@pytest.fixture
def workercontroller(monkeypatch):
    """Swap ``workermanage.WorkerController`` for a do-nothing stand-in.

    The stand-in accepts any positional constructor arguments and has a no-op
    ``setup``.  The stand-in class itself is returned to the test.
    """

    class MockController:
        def __init__(self, *args):
            """Accept and discard any positional arguments."""

        def setup(self):
            """Do nothing."""

    monkeypatch.setattr(workermanage, "WorkerController", MockController)
    return MockController
|
|
|
|
|
|
class TestNodeManagerPopen:
    """Tests for ``NodeManager`` driven by ``popen`` (and default-chdir) specs."""

    def test_popen_no_default_chdir(self, config):
        """A plain ``popen`` spec is left without a ``chdir``."""
        gm = NodeManager(config, ["popen"])
        assert gm._execnet.specs[0].chdir is None

    def test_default_chdir(self, config):
        """``ssh``/``socket`` specs get a default ``chdir``; ``defaultchdir`` overrides it."""
        specs = ["ssh=noco", "socket=xyz"]
        for spec in NodeManager(config, specs)._execnet.specs:
            assert spec.chdir == "pyexecnetcache"
        for spec in NodeManager(config, specs, defaultchdir="abc")._execnet.specs:
            assert spec.chdir == "abc"

    def test_popen_makegateway_events(self, config, hookrecorder, workercontroller):
        """Setting up two popen nodes fires one setupnodes and two newgateway hooks."""
        hm = NodeManager(config, ["popen"] * 2)
        hm.setup_nodes(None)
        call = hookrecorder.popcall("pytest_xdist_setupnodes")
        assert len(call.specs) == 2

        call = hookrecorder.popcall("pytest_xdist_newgateway")
        assert call.gateway.spec == execnet.XSpec("popen")
        assert call.gateway.id == "gw0"
        call = hookrecorder.popcall("pytest_xdist_newgateway")
        assert call.gateway.id == "gw1"
        assert len(hm._execnet.group) == 2
        hm.teardown_nodes()
        assert not len(hm._execnet.group)

    def test_popens_rsync(self, config, mysetup, workercontroller):
        """Rsyncing to plain ``popen`` gateways emits no notification."""
        source = mysetup.source
        hm = NodeManager(config, ["popen"] * 2)
        hm.setup_nodes(None)
        assert len(hm._execnet.group) == 2
        for gw in hm._execnet.group:

            # Stand-in for ``remote_exec`` that records the arguments it is
            # constructed with instead of executing anything.
            class pseudoexec:
                args = []

                def __init__(self, *args):
                    self.args.extend(args)

                def waitclose(self):
                    pass

            gw.remote_exec = pseudoexec
        notifications = []
        for gw in hm._execnet.group:
            hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
        assert not notifications
        hm.teardown_nodes()
        assert not len(hm._execnet.group)
        # ``gw`` is still bound to the last gateway visited by the loop above.
        assert "sys.path.insert" in gw.remote_exec.args[0]

    def test_rsync_popen_with_path(self, config, mysetup, workercontroller):
        """Rsync to a popen gateway with ``chdir`` copies the tree and notifies once."""
        source, dest = mysetup.source, mysetup.dest
        hm = NodeManager(config, ["popen//chdir=%s" % dest] * 1)
        hm.setup_nodes(None)
        source.ensure("dir1", "dir2", "hello")
        notifications = []
        for gw in hm._execnet.group:
            hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
        assert len(notifications) == 1
        assert notifications[0] == (
            "rsyncrootready",
            hm._execnet.group["gw0"].spec,
            source,
        )
        hm.teardown_nodes()
        # The synced tree is expected under ``dest/<basename of source>``.
        dest = dest.join(source.basename)
        assert dest.join("dir1").check()
        assert dest.join("dir1", "dir2").check()
        assert dest.join("dir1", "dir2", "hello").check()

    def test_rsync_same_popen_twice(
        self, config, mysetup, hookrecorder, workercontroller
    ):
        """Rsyncing one of two identical popen gateways fires the rsync start/finish hooks."""
        source, dest = mysetup.source, mysetup.dest
        hm = NodeManager(config, ["popen//chdir=%s" % dest] * 2)
        hm.roots = []
        hm.setup_nodes(None)
        try:
            source.ensure("dir1", "dir2", "hello")
            gw = hm._execnet.group[0]
            hm.rsync(gw, source)
            call = hookrecorder.popcall("pytest_xdist_rsyncstart")
            assert call.source == source
            assert len(call.gateways) == 1
            assert call.gateways[0] in hm._execnet.group
            # Only the presence of the finish hook call is checked; its
            # arguments are not inspected.
            hookrecorder.popcall("pytest_xdist_rsyncfinish")
        finally:
            # Tear the gateways down like the sibling tests do, so the two
            # popen nodes are not left behind after this test.
            hm.teardown_nodes()
|
|
|
|
|
|
class TestHRSync:
    """Checks for the rsync helper returned by ``ExecnetNodeControl.get_rsync``."""

    def test_hrsync_filter(self, mysetup):
        """Walking ``source`` through the syncer's filter yields exactly three entries."""
        root = mysetup.source
        tree = (
            ("dir", "file.txt"),
            (".svn", "entries"),
            (".somedotfile", "moreentries"),
            ("somedir", "editfile~"),
        )
        for parts in tree:
            root.ensure(*parts)

        syncer = ExecnetNodeControl.get_rsync(
            root, ignores=NodeManager.DEFAULT_IGNORES
        )
        keep = syncer.filter
        visited = list(root.visit(rec=keep, fil=keep))

        assert len(visited) == 3
        names = [entry.basename for entry in visited]
        for expected in ("dir", "file.txt", "somedir"):
            assert expected in names

    def test_hrsync_one_host(self, mysetup):
        """Sending to one popen gateway copies the file and calls ``finished`` once."""
        src_dir = mysetup.source
        dst_dir = mysetup.dest
        gateway = execnet.makegateway("popen//chdir=%s" % dst_dir)
        completions = []

        def on_finished():
            completions.append(1)

        syncer = ExecnetNodeControl.get_rsync(src_dir)
        syncer.add_target_host(gateway, finished=on_finished)
        # The file is written after the target is registered but before sending.
        src_dir.join("hello.py").write("world")
        syncer.send()
        gateway.exit()

        copied = dst_dir.join(src_dir.basename, "hello.py")
        assert copied.check()
        assert completions == [1]
|
|
|
|
|
|
class TestNodeManager:
    """Tests for how ``NodeManager`` selects, filters and reports rsync roots."""

    # NOTE(review): this test is never executed (``run=False``).  It calls
    # ``makegateways()`` and ``gwmanager``, which no other test in this file
    # uses -- confirm they still exist before re-enabling it.
    @pytest.mark.xfail(run=False)
    def test_rsync_roots_no_roots(self, testdir, mysetup):
        """With no rsync roots configured, the remote cwd holds the synced top dir."""
        mysetup.source.ensure("dir1", "file1").write("hello")
        config = testdir.parseconfig(mysetup.source)
        nodemanager = NodeManager(config, ["popen//chdir=%s" % mysetup.dest])
        nodemanager.makegateways()
        nodemanager.rsync_roots()
        (p,) = nodemanager.gwmanager.multi_exec(
            "import os ; channel.send(os.getcwd())"
        ).receive_each()
        p = py.path.local(p)
        print("remote curdir", p)
        assert p == mysetup.dest.join(config.topdir.basename)
        assert p.join("dir1").check()
        assert p.join("dir1", "file1").check()

    def test_popen_rsync_subdir(self, testdir, mysetup, workercontroller):
        """``--rsyncdir`` works both for a sub-directory and for ``source`` itself."""
        source, dest = mysetup.source, mysetup.dest
        dir1 = mysetup.source.mkdir("dir1")
        dir2 = dir1.mkdir("dir2")
        dir2.ensure("hello")
        for rsyncroot in (dir1, source):
            # Start each round from a non-existing destination.
            dest.remove()
            nodemanager = NodeManager(
                testdir.parseconfig(
                    "--tx", "popen//chdir=%s" % dest, "--rsyncdir", rsyncroot, source
                )
            )
            nodemanager.setup_nodes(None)  # calls .rsync_roots()
            if rsyncroot == source:
                # Syncing ``source`` itself puts the tree one level deeper.
                dest = dest.join("source")
            assert dest.join("dir1").check()
            assert dest.join("dir1", "dir2").check()
            assert dest.join("dir1", "dir2", "hello").check()
            nodemanager.teardown_nodes()

    @pytest.mark.parametrize(
        "flag, expects_report", [("-q", False), ("", False), ("-v", True)]
    )
    def test_rsync_report(
        self, testdir, mysetup, workercontroller, capsys, flag, expects_report
    ):
        """The ``<= ...`` rsync report line is printed only when ``-v`` is given."""
        source, dest = mysetup.source, mysetup.dest
        dir1 = mysetup.source.mkdir("dir1")
        args = "--tx", "popen//chdir=%s" % dest, "--rsyncdir", dir1, source
        if flag:
            args += (flag,)
        nodemanager = NodeManager(testdir.parseconfig(*args))
        nodemanager.setup_nodes(None)  # calls .rsync_roots()
        out, _ = capsys.readouterr()
        if expects_report:
            assert "<= pytest/__init__.py" in out
        else:
            assert "<= pytest/__init__.py" not in out

    def test_init_rsync_roots(self, testdir, mysetup, workercontroller):
        """Only the directory named by ``rsyncdirs`` in ``tox.ini`` reaches ``dest``."""
        source, dest = mysetup.source, mysetup.dest
        dir2 = source.ensure("dir1", "dir2", dir=1)
        source.ensure("dir1", "somefile", dir=1)
        dir2.ensure("hello")
        source.ensure("bogusdir", "file")
        source.join("tox.ini").write(
            textwrap.dedent(
                """
                [pytest]
                rsyncdirs=dir1/dir2
                """
            )
        )
        config = testdir.parseconfig(source)
        nodemanager = NodeManager(config, ["popen//chdir=%s" % dest])
        nodemanager.setup_nodes(None)  # calls .rsync_roots()
        assert dest.join("dir2").check()
        assert not dest.join("dir1").check()
        # Check the directory that was actually created above ("bogusdir");
        # the previous name "bogus" never existed, so the check could not fail.
        assert not dest.join("bogusdir").check()

    def test_rsyncignore(self, testdir, mysetup, workercontroller):
        """Paths listed in ``rsyncignore`` (ini and option) are left out of ``dest``."""
        source, dest = mysetup.source, mysetup.dest
        dir2 = source.ensure("dir1", "dir2", dir=1)
        source.ensure("dir5", "dir6", "bogus")
        source.ensure("dir5", "file")
        dir2.ensure("hello")
        source.ensure("foo", "bar")
        source.ensure("bar", "foo")
        source.join("tox.ini").write(
            textwrap.dedent(
                """
                [pytest]
                rsyncdirs = dir1 dir5
                rsyncignore = dir1/dir2 dir5/dir6 foo*
                """
            )
        )
        config = testdir.parseconfig(source)
        config.option.rsyncignore = ["bar"]
        nodemanager = NodeManager(config, ["popen//chdir=%s" % dest])
        nodemanager.setup_nodes(None)  # calls .rsync_roots()
        assert dest.join("dir1").check()
        assert not dest.join("dir1", "dir2").check()
        assert dest.join("dir5", "file").check()
        # ``dir6`` lives inside ``dir5``, so look for it there (same shape as
        # the ``dir1/dir2`` check); a top-level ``dest/dir6`` could never exist.
        assert not dest.join("dir5", "dir6").check()
        assert not dest.join("foo").check()
        assert not dest.join("bar").check()

    def test_optimise_popen(self, testdir, mysetup, workercontroller):
        """Plain popen specs stay on the same filesystem and get no ``chdir``."""
        source = mysetup.source
        specs = ["popen"] * 3
        source.join("conftest.py").write("rsyncdirs = ['a']")
        source.ensure("a", dir=1)
        config = testdir.parseconfig(source)
        nodemanager = NodeManager(config, specs)
        nodemanager.setup_nodes(None)  # calls .rsync_roots()
        for gwspec in nodemanager._execnet.specs:
            assert gwspec._samefilesystem()
            assert not gwspec.chdir

    def test_ssh_setup_nodes(self, specssh, testdir):
        """A one-test package run with ``-d`` over the ``specssh`` spec passes."""
        testdir.makepyfile(
            __init__="",
            test_x="""
            def test_one():
                pass
        """,
        )
        reprec = testdir.inline_run(
            "-d", "--rsyncdir=%s" % testdir.tmpdir, "--tx", specssh, testdir.tmpdir
        )
        # Exactly one test report is expected.
        (rep,) = reprec.getreports("pytest_runtest_logreport")
        assert rep.passed
|