Run pre-commit: black, whitespaces, rst

This commit is contained in:
Bruno Oliveira 2018-07-27 17:50:55 -03:00
parent 1ed3884da9
commit fced1645cb
25 changed files with 1153 additions and 934 deletions
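The commit message names three pre-commit hooks (black, whitespace cleanup, rst checks), but the hook configuration itself is not part of this diff. A sketch of what a matching `.pre-commit-config.yaml` could look like — the repository URLs, revisions, and the rst entry point are assumptions, not taken from this commit:

```yaml
# Hypothetical pre-commit configuration matching the commit message:
# black (code formatting), trailing-whitespace cleanup, and rst linting.
repos:
  - repo: https://github.com/ambv/black   # assumed URL for 2018-era black
    rev: 18.6b4                           # assumed revision
    hooks:
      - id: black
        args: [--safe, --quiet]
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v1.3.0                           # assumed revision
    hooks:
      - id: trailing-whitespace
  - repo: local
    hooks:
      - id: rst
        name: rst
        entry: rst-lint                   # assumed rst linter (e.g. restructuredtext-lint)
        files: \.rst$
        language: python
```

Running `pre-commit run --all-files` with such a configuration would rewrite every tracked file, which is consistent with the large mechanical diff below.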

View File

@@ -5,10 +5,10 @@
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE

View File

@@ -1,78 +1,78 @@
# Overview #
`xdist` works by spawning one or more **workers**, which are controlled
by the **master**. Each **worker** is responsible for performing
a full test collection and afterwards running tests as dictated by the **master**.
The execution flow is:
1. **master** spawns one or more **workers** at the beginning of
the test session. The communication between **master** and **worker** nodes makes use of
[execnet](http://codespeak.net/execnet/) and its [gateways](http://codespeak.net/execnet/basics.html#gateways-bootstrapping-python-interpreters).
The actual interpreters executing the code for the **workers** might
be remote or local.
1. Each **worker** itself is a mini pytest runner. **workers** at this
point perform a full test collection, sending back the collected
test-ids to the **master**, which does not
perform any collection itself.
1. The **master** receives the result of the collection from all nodes.
At this point the **master** performs a sanity check to ensure that
all **workers** collected the same tests (including order), bailing out otherwise.
If all is well, it converts the list of test-ids into a list of simple
indexes, where each index corresponds to the position of that test in the
original collection list. This works because all nodes have the same
collection list, and saves bandwidth because the **master** can now tell
one of the workers to just *execute test index 3* instead of passing the
full test id.
1. If **dist-mode** is **each**: the **master** just sends the full list
of test indexes to each node at this moment.
1. If **dist-mode** is **load**: the **master** takes around 25% of the
tests and sends them one by one to each **worker** in a round robin
fashion. The rest of the tests will be distributed later as **workers**
finish tests (see below).
1. Note that the `pytest_xdist_make_scheduler` hook can be used to implement custom test distribution logic.
1. **workers** re-implement `pytest_runtestloop`: pytest's default implementation
basically loops over all collected items in the `session` object and executes
the `pytest_runtest_protocol` for each test item, but in xdist **workers** sit idly
waiting for the **master** to send tests for execution. As tests are
received by **workers**, `pytest_runtest_protocol` is executed for each test.
Here it is worth noting an implementation detail: **workers** must always keep at
least one test item on their queue due to how the `pytest_runtest_protocol(item, nextitem)`
hook is defined: in order to pass the `nextitem` to the hook, the worker must wait for more
instructions from the **master** before executing that remaining test. If it receives more tests,
then it can safely call `pytest_runtest_protocol` because it knows what the `nextitem` parameter will be.
If it receives a "shutdown" signal, then it can execute the hook passing `nextitem` as `None`.
1. As tests are started and completed at the **workers**, the results are sent
back to the **master**, which then just forwards the results to
the appropriate pytest hooks: `pytest_runtest_logstart` and
`pytest_runtest_logreport`. This way other plugins (for example `junitxml`)
can work normally. The **master** (when in dist-mode **load**)
decides to send more tests to a node when a test completes, using
some heuristics such as test durations and how many tests each **worker**
still has to run.
1. When the **master** has no more pending tests it will
send a "shutdown" signal to all **workers**, which will then run their
remaining tests to completion and shut down. At this point the
**master** will sit waiting for **workers** to shut down, still
processing events such as `pytest_runtest_logreport`.
## FAQ ##
> Why does each worker do its own collection, as opposed to having
the master collect once and distribute from that collection to the workers?
If collection was performed by the master then it would have to
serialize the collected items to send them through the wire, as workers live in other processes.
The problem is that test items are not easily serializable (perhaps not at all), as they contain references to
the test functions, fixture managers, config objects, etc. Even if one managed to serialize them,
it seems it would be very hard to get right and easy to break with any small change in pytest.
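The serialization problem can be illustrated with a toy example (a hypothetical stand-in object, not pytest's real item class): plain test-id strings travel through `pickle` trivially, while an object holding a live reference to a test function does not:

```python
import pickle

# Plain test-id strings cross the wire trivially.
ids = ["test_module.py::test_one", "test_module.py::test_two"]
assert pickle.loads(pickle.dumps(ids)) == ids

class FakeItem:
    """Toy stand-in for a pytest item: it holds a live reference
    to the test function (here a lambda), which pickle rejects."""
    def __init__(self):
        self.function = lambda: None

try:
    pickle.dumps(FakeItem())
    item_serializable = True
except Exception:
    item_serializable = False

assert not item_serializable
```

This is why xdist only ships test ids (and later, integer indices) between master and workers, and lets each worker rebuild the full items by collecting locally.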

View File

@@ -1,8 +1,8 @@
.. note::
Since 1.19.0, the actual implementation of the ``--boxed`` option has been moved to a
separate plugin, `pytest-forked <https://github.com/pytest-dev/pytest-forked>`_
which can be installed independently. The ``--boxed`` command-line option remains
for backward compatibility reasons.

View File

@@ -3,7 +3,6 @@ from unittest import TestCase
class Delta1(TestCase):
def test_delta0(self):
sleep(5)
assert True
@@ -46,7 +45,6 @@ class Delta1(TestCase):
class Delta2(TestCase):
def test_delta0(self):
sleep(5)
assert True

View File

@@ -1,47 +1,44 @@
from setuptools import setup, find_packages
install_requires = ['execnet>=1.1', 'pytest>=3.4', 'pytest-forked']
install_requires = ["execnet>=1.1", "pytest>=3.4", "pytest-forked"]
setup(
name="pytest-xdist",
use_scm_version={'write_to': 'xdist/_version.py'},
description='pytest xdist plugin for distributed testing'
' and loop-on-failing modes',
long_description=open('README.rst').read(),
license='MIT',
author='holger krekel and contributors',
author_email='pytest-dev@python.org,holger@merlinux.eu',
url='https://github.com/pytest-dev/pytest-xdist',
platforms=['linux', 'osx', 'win32'],
packages=find_packages(exclude=['testing', 'example']),
use_scm_version={"write_to": "xdist/_version.py"},
description="pytest xdist plugin for distributed testing"
" and loop-on-failing modes",
long_description=open("README.rst").read(),
license="MIT",
author="holger krekel and contributors",
author_email="pytest-dev@python.org,holger@merlinux.eu",
url="https://github.com/pytest-dev/pytest-xdist",
platforms=["linux", "osx", "win32"],
packages=find_packages(exclude=["testing", "example"]),
entry_points={
'pytest11': [
'xdist = xdist.plugin',
'xdist.looponfail = xdist.looponfail',
],
"pytest11": ["xdist = xdist.plugin", "xdist.looponfail = xdist.looponfail"]
},
zip_safe=False,
python_requires='>=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*',
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*",
install_requires=install_requires,
setup_requires=['setuptools_scm'],
setup_requires=["setuptools_scm"],
classifiers=[
'Development Status :: 5 - Production/Stable',
'Framework :: Pytest',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: POSIX',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS :: MacOS X',
'Topic :: Software Development :: Testing',
'Topic :: Software Development :: Quality Assurance',
'Topic :: Utilities',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
"Development Status :: 5 - Production/Stable",
"Framework :: Pytest",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX",
"Operating System :: Microsoft :: Windows",
"Operating System :: MacOS :: MacOS X",
"Topic :: Software Development :: Testing",
"Topic :: Software Development :: Quality Assurance",
"Topic :: Utilities",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
],
)

File diff suppressed because it is too large

View File

@@ -20,6 +20,7 @@ pytest_plugins = "pytester"
@pytest.fixture(autouse=True)
def _divert_atexit(request, monkeypatch):
import atexit
finalizers = []
def finish():
@@ -31,10 +32,12 @@ def _divert_atexit(request, monkeypatch):
def pytest_addoption(parser):
parser.addoption('--gx',
action="append",
dest="gspecs",
help="add a global test environment, XSpec-syntax. ")
parser.addoption(
"--gx",
action="append",
dest="gspecs",
help="add a global test environment, XSpec-syntax. ",
)
@pytest.fixture

View File

@@ -1,9 +1,6 @@
from xdist.dsession import DSession
from xdist.report import report_collection_diff
from xdist.scheduler import (
EachScheduling,
LoadScheduling,
)
from xdist.scheduler import EachScheduling, LoadScheduling
import py
import pytest
@@ -60,7 +57,7 @@ class TestEachScheduling:
sched = EachScheduling(config)
sched.add_node(node1)
sched.add_node(node2)
collection = ["a.py::test_1", ]
collection = ["a.py::test_1"]
assert not sched.collection_is_completed
sched.add_node_collection(node1, collection)
assert not sched.collection_is_completed
@@ -70,8 +67,8 @@ class TestEachScheduling:
assert sched.node2collection[node2] == collection
sched.schedule()
assert sched.tests_finished
assert node1.sent == ['ALL']
assert node2.sent == ['ALL']
assert node1.sent == ["ALL"]
assert node2.sent == ["ALL"]
sched.mark_test_complete(node1, 0)
assert sched.tests_finished
sched.mark_test_complete(node2, 0)
@@ -82,7 +79,7 @@ class TestEachScheduling:
config = testdir.parseconfig("--tx=popen")
sched = EachScheduling(config)
sched.add_node(node1)
collection = ["a.py::test_1", ]
collection = ["a.py::test_1"]
assert not sched.collection_is_completed
sched.add_node_collection(node1, collection)
assert sched.collection_is_completed
@@ -230,7 +227,7 @@ class TestLoadScheduling:
sched.schedule()
assert len(collect_hook.reports) == 1
rep = collect_hook.reports[0]
assert 'Different tests were collected between' in rep.longrepr
assert "Different tests were collected between" in rep.longrepr
class TestDistReporter:
@@ -238,6 +235,7 @@ class TestDistReporter:
def test_rsync_printing(self, testdir, linecomp):
config = testdir.parseconfig()
from _pytest.terminal import TerminalReporter
rep = TerminalReporter(config, file=linecomp.stringio)
config.pluginmanager.register(rep, "terminalreporter")
dsession = DSession(config)
@@ -249,6 +247,7 @@ class TestDistReporter:
class gw2:
id = "X2"
spec = execnet.XSpec("popen")
# class rinfo:
# version_info = (2, 5, 1, 'final', 0)
# executable = "hello"
@@ -260,47 +259,50 @@ class TestDistReporter:
# "*X1*popen*xyz*2.5*"
# ])
dsession.pytest_xdist_rsyncstart(source="hello", gateways=[gw1, gw2])
linecomp.assert_contains_lines(["[X1,X2] rsyncing: hello", ])
linecomp.assert_contains_lines(["[X1,X2] rsyncing: hello"])
def test_report_collection_diff_equal():
"""Test reporting of equal collections."""
from_collection = to_collection = ['aaa', 'bbb', 'ccc']
from_collection = to_collection = ["aaa", "bbb", "ccc"]
assert report_collection_diff(from_collection, to_collection, 1, 2) is None
def test_report_collection_diff_different():
"""Test reporting of different collections."""
from_collection = ['aaa', 'bbb', 'ccc', 'YYY']
to_collection = ['aZa', 'bbb', 'XXX', 'ccc']
from_collection = ["aaa", "bbb", "ccc", "YYY"]
to_collection = ["aZa", "bbb", "XXX", "ccc"]
error_message = (
'Different tests were collected between 1 and 2. The difference is:\n'
'--- 1\n'
'\n'
'+++ 2\n'
'\n'
'@@ -1,4 +1,4 @@\n'
'\n'
'-aaa\n'
'+aZa\n'
' bbb\n'
'+XXX\n'
' ccc\n'
'-YYY')
"Different tests were collected between 1 and 2. The difference is:\n"
"--- 1\n"
"\n"
"+++ 2\n"
"\n"
"@@ -1,4 +1,4 @@\n"
"\n"
"-aaa\n"
"+aZa\n"
" bbb\n"
"+XXX\n"
" ccc\n"
"-YYY"
)
msg = report_collection_diff(from_collection, to_collection, '1', '2')
msg = report_collection_diff(from_collection, to_collection, "1", "2")
assert msg == error_message
@pytest.mark.xfail(reason="duplicate test ids not supported yet")
def test_pytest_issue419(testdir):
testdir.makepyfile("""
testdir.makepyfile(
"""
import pytest
@pytest.mark.parametrize('birth_year', [1988, 1988, ])
def test_2011_table(birth_year):
pass
""")
"""
)
reprec = testdir.inline_run("-n1")
reprec.assertoutcome(passed=2)
assert 0

View File

@@ -60,7 +60,7 @@ class TestStatRecorder:
p.remove()
# make check()'s visit() call return our just removed
# path as if we were in a race condition
monkeypatch.setattr(tmp, 'visit', lambda *args: [p])
monkeypatch.setattr(tmp, "visit", lambda *args: [p])
changed = sd.check()
assert changed
@@ -84,7 +84,7 @@ class TestStatRecorder:
sd = StatRecorder([tmp])
ret_values = [True, False]
monkeypatch.setattr(StatRecorder, 'check', lambda self: ret_values.pop())
monkeypatch.setattr(StatRecorder, "check", lambda self: ret_values.pop())
sd.waitonchange(checkinterval=0.2)
assert not ret_values
@@ -110,19 +110,25 @@ class TestRemoteControl:
assert not failures
def test_failure_change(self, testdir):
modcol = testdir.getitem("""
modcol = testdir.getitem(
"""
def test_func():
assert 0
""")
"""
)
control = RemoteControl(modcol.config)
control.loop_once()
assert control.failures
modcol.fspath.write(py.code.Source("""
modcol.fspath.write(
py.code.Source(
"""
def test_func():
assert 1
def test_new():
assert 0
"""))
"""
)
)
removepyc(modcol.fspath)
control.loop_once()
assert not control.failures
@@ -131,14 +137,17 @@ class TestRemoteControl:
assert str(control.failures).find("test_new") != -1
def test_failure_subdir_no_init(self, testdir):
modcol = testdir.getitem("""
modcol = testdir.getitem(
"""
def test_func():
assert 0
""")
"""
)
parent = modcol.fspath.dirpath().dirpath()
parent.chdir()
modcol.config.args = [py.path.local(x).relto(parent)
for x in modcol.config.args]
modcol.config.args = [
py.path.local(x).relto(parent) for x in modcol.config.args
]
control = RemoteControl(modcol.config)
control.loop_once()
assert control.failures
@@ -148,70 +157,87 @@ class TestRemoteControl:
class TestLooponFailing:
def test_looponfail_from_fail_to_ok(self, testdir):
modcol = testdir.getmodulecol("""
modcol = testdir.getmodulecol(
"""
def test_one():
x = 0
assert x == 1
def test_two():
assert 1
""")
"""
)
remotecontrol = RemoteControl(modcol.config)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 1
modcol.fspath.write(py.code.Source("""
modcol.fspath.write(
py.code.Source(
"""
def test_one():
assert 1
def test_two():
assert 1
"""))
"""
)
)
removepyc(modcol.fspath)
remotecontrol.loop_once()
assert not remotecontrol.failures
def test_looponfail_from_one_to_two_tests(self, testdir):
modcol = testdir.getmodulecol("""
modcol = testdir.getmodulecol(
"""
def test_one():
assert 0
""")
"""
)
remotecontrol = RemoteControl(modcol.config)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 1
assert 'test_one' in remotecontrol.failures[0]
assert "test_one" in remotecontrol.failures[0]
modcol.fspath.write(py.code.Source("""
modcol.fspath.write(
py.code.Source(
"""
def test_one():
assert 1 # passes now
def test_two():
assert 0 # new and fails
"""))
"""
)
)
removepyc(modcol.fspath)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 0
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 1
assert 'test_one' not in remotecontrol.failures[0]
assert 'test_two' in remotecontrol.failures[0]
assert "test_one" not in remotecontrol.failures[0]
assert "test_two" in remotecontrol.failures[0]
@py.test.mark.xfail(py.test.__version__ >= "3.1",
reason="broken by pytest 3.1+")
@py.test.mark.xfail(py.test.__version__ >= "3.1", reason="broken by pytest 3.1+")
def test_looponfail_removed_test(self, testdir):
modcol = testdir.getmodulecol("""
modcol = testdir.getmodulecol(
"""
def test_one():
assert 0
def test_two():
assert 0
""")
"""
)
remotecontrol = RemoteControl(modcol.config)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 2
modcol.fspath.write(py.code.Source("""
modcol.fspath.write(
py.code.Source(
"""
def test_xxx(): # renamed test
assert 0
def test_two():
assert 1 # pass now
"""))
"""
)
)
removepyc(modcol.fspath)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 0
@@ -220,10 +246,12 @@ class TestLooponFailing:
assert len(remotecontrol.failures) == 1
def test_looponfail_multiple_errors(self, testdir, monkeypatch):
modcol = testdir.getmodulecol("""
modcol = testdir.getmodulecol(
"""
def test_one():
assert 0
""")
"""
)
remotecontrol = RemoteControl(modcol.config)
orig_runsession = remotecontrol.runsession
@@ -233,18 +261,20 @@ class TestLooponFailing:
print(failures)
return failures * 2, reports, collection_failed
monkeypatch.setattr(remotecontrol, 'runsession', runsession_dups)
monkeypatch.setattr(remotecontrol, "runsession", runsession_dups)
remotecontrol.loop_once()
assert len(remotecontrol.failures) == 1
class TestFunctional:
def test_fail_to_ok(self, testdir):
p = testdir.makepyfile("""
p = testdir.makepyfile(
"""
def test_one():
x = 0
assert x == 1
""")
"""
)
# p = testdir.mkdir("sub").join(p1.basename)
# p1.move(p)
child = testdir.spawn_pytest("-f %s --traceconfig" % p)
@@ -253,21 +283,27 @@ class TestFunctional:
child.expect("1 failed")
child.expect("### LOOPONFAILING ####")
child.expect("waiting for changes")
p.write(py.code.Source("""
p.write(
py.code.Source(
"""
def test_one():
x = 1
assert x == 1
"""))
"""
)
)
child.expect(".*1 passed.*")
child.kill(15)
def test_xfail_passes(self, testdir):
p = testdir.makepyfile("""
p = testdir.makepyfile(
"""
import py
@py.test.mark.xfail
def test_one():
pass
""")
"""
)
child = testdir.spawn_pytest("-f %s" % p)
child.expect("1 xpass")
# child.expect("### LOOPONFAILING ####")

View File

@@ -2,21 +2,23 @@ import pytest
class TestHooks:
@pytest.fixture(autouse=True)
def create_test_file(self, testdir):
testdir.makepyfile("""
testdir.makepyfile(
"""
import os
def test_a(): pass
def test_b(): pass
def test_c(): pass
""")
"""
)
def test_runtest_logreport(self, testdir):
"""Test that log reports from pytest_runtest_logreport when running
with xdist contain "node", "nodeid" and "worker_id" attributes. (#8)
"""
testdir.makeconftest("""
testdir.makeconftest(
"""
def pytest_runtest_logreport(report):
if hasattr(report, 'node'):
if report.when == "call":
@@ -27,29 +29,31 @@ class TestHooks:
else:
print("HOOK: %s %s"
% (report.nodeid, report.worker_id))
""")
res = testdir.runpytest('-n1', '-s')
res.stdout.fnmatch_lines([
'*HOOK: test_runtest_logreport.py::test_a gw0*',
'*HOOK: test_runtest_logreport.py::test_b gw0*',
'*HOOK: test_runtest_logreport.py::test_c gw0*',
'*3 passed*',
])
"""
)
res = testdir.runpytest("-n1", "-s")
res.stdout.fnmatch_lines(
[
"*HOOK: test_runtest_logreport.py::test_a gw0*",
"*HOOK: test_runtest_logreport.py::test_b gw0*",
"*HOOK: test_runtest_logreport.py::test_c gw0*",
"*3 passed*",
]
)
def test_node_collection_finished(self, testdir):
"""Test pytest_xdist_node_collection_finished hook (#8).
"""
testdir.makeconftest("""
testdir.makeconftest(
"""
def pytest_xdist_node_collection_finished(node, ids):
workerid = node.workerinput['workerid']
stripped_ids = [x.split('::')[1] for x in ids]
print("HOOK: %s %s" % (workerid, ', '.join(stripped_ids)))
""")
res = testdir.runpytest('-n2', '-s')
res.stdout.fnmatch_lines_random([
'*HOOK: gw0 test_a, test_b, test_c',
'*HOOK: gw1 test_a, test_b, test_c',
])
res.stdout.fnmatch_lines([
'*3 passed*',
])
"""
)
res = testdir.runpytest("-n2", "-s")
res.stdout.fnmatch_lines_random(
["*HOOK: gw0 test_a, test_b, test_c", "*HOOK: gw1 test_a, test_b, test_c"]
)
res.stdout.fnmatch_lines(["*3 passed*"])

View File

@@ -16,14 +16,15 @@ def test_dist_incompatibility_messages(testdir):
def test_dist_options(testdir):
from xdist.plugin import pytest_cmdline_main as check_options
config = testdir.parseconfigure("-n 2")
check_options(config)
assert config.option.dist == "load"
assert config.option.tx == ['popen'] * 2
assert config.option.tx == ["popen"] * 2
config = testdir.parseconfigure("--numprocesses", "2")
check_options(config)
assert config.option.dist == "load"
assert config.option.tx == ['popen'] * 2
assert config.option.tx == ["popen"] * 2
config = testdir.parseconfigure("-d")
check_options(config)
assert config.option.dist == "load"
@@ -31,28 +32,31 @@ def test_dist_options(testdir):
def test_auto_detect_cpus(testdir, monkeypatch):
import os
if hasattr(os, 'sched_getaffinity'):
monkeypatch.setattr(os, 'sched_getaffinity', lambda _pid: set(range(99)))
elif hasattr(os, 'cpu_count'):
monkeypatch.setattr(os, 'cpu_count', lambda: 99)
if hasattr(os, "sched_getaffinity"):
monkeypatch.setattr(os, "sched_getaffinity", lambda _pid: set(range(99)))
elif hasattr(os, "cpu_count"):
monkeypatch.setattr(os, "cpu_count", lambda: 99)
else:
import multiprocessing
monkeypatch.setattr(multiprocessing, 'cpu_count', lambda: 99)
monkeypatch.setattr(multiprocessing, "cpu_count", lambda: 99)
config = testdir.parseconfigure("-n2")
assert config.getoption('numprocesses') == 2
assert config.getoption("numprocesses") == 2
config = testdir.parseconfigure("-nauto")
assert config.getoption('numprocesses') == 99
assert config.getoption("numprocesses") == 99
monkeypatch.delattr(os, 'sched_getaffinity', raising=False)
monkeypatch.setenv('TRAVIS', 'true')
monkeypatch.delattr(os, "sched_getaffinity", raising=False)
monkeypatch.setenv("TRAVIS", "true")
config = testdir.parseconfigure("-nauto")
assert config.getoption('numprocesses') == 2
assert config.getoption("numprocesses") == 2
def test_boxed_with_collect_only(testdir):
from xdist.plugin import pytest_cmdline_main as check_options
config = testdir.parseconfigure("-n1", "--boxed")
check_options(config)
assert config.option.forked
@@ -92,13 +96,13 @@ class TestDistOptions:
assert xspecs[1].ssh == "xyz"
def test_xspecs_multiplied(self, testdir):
config = testdir.parseconfigure("--tx=3*popen", )
config = testdir.parseconfigure("--tx=3*popen")
xspecs = NodeManager(config)._getxspecs()
assert len(xspecs) == 3
assert xspecs[1].popen
def test_getrsyncdirs(self, testdir):
config = testdir.parseconfigure('--rsyncdir=' + str(testdir.tmpdir))
config = testdir.parseconfigure("--rsyncdir=" + str(testdir.tmpdir))
nm = NodeManager(config, specs=[execnet.XSpec("popen")])
assert not nm._getrsyncdirs()
nm = NodeManager(config, specs=[execnet.XSpec("popen//chdir=qwe")])
@@ -106,23 +110,24 @@ class TestDistOptions:
assert testdir.tmpdir in nm.roots
def test_getrsyncignore(self, testdir):
config = testdir.parseconfigure('--rsyncignore=fo*')
config = testdir.parseconfigure("--rsyncignore=fo*")
nm = NodeManager(config, specs=[execnet.XSpec("popen//chdir=qwe")])
assert 'fo*' in nm.rsyncoptions['ignores']
assert "fo*" in nm.rsyncoptions["ignores"]
def test_getrsyncdirs_with_conftest(self, testdir):
p = py.path.local()
for bn in 'x y z'.split():
for bn in "x y z".split():
p.mkdir(bn)
testdir.makeini("""
testdir.makeini(
"""
[pytest]
rsyncdirs= x
""")
config = testdir.parseconfigure(
testdir.tmpdir, '--rsyncdir=y', '--rsyncdir=z')
"""
)
config = testdir.parseconfigure(testdir.tmpdir, "--rsyncdir=y", "--rsyncdir=z")
nm = NodeManager(config, specs=[execnet.XSpec("popen//chdir=xyz")])
roots = nm._getrsyncdirs()
# assert len(roots) == 3 + 1 # pylib
assert py.path.local('y') in roots
assert py.path.local('z') in roots
assert testdir.tmpdir.join('x') in roots
assert py.path.local("y") in roots
assert py.path.local("z") in roots
assert testdir.tmpdir.join("x") in roots

View File

@@ -35,7 +35,7 @@ class WorkerSetup:
self.testdir = testdir
self.events = Queue()
def setup(self, ):
def setup(self,):
self.testdir.chdir()
# import os ; os.environ['EXECNET_DEBUG'] = "2"
self.gateway = execnet.makegateway()
@@ -45,8 +45,7 @@ class WorkerSetup:
class DummyMananger:
specs = [0, 1]
self.slp = WorkerController(DummyMananger, self.gateway, config,
putevent)
self.slp = WorkerController(DummyMananger, self.gateway, config, putevent)
self.request.addfinalizer(self.slp.ensure_teardown)
self.slp.setup()
@@ -59,7 +58,7 @@ class WorkerSetup:
ev = EventCall(data)
if name is None or ev.name == name:
return ev
print("skipping %s" % (ev, ))
print("skipping %s" % (ev,))
def sendcommand(self, name, **kwargs):
self.slp.sendcommand(name, **kwargs)
@@ -70,9 +69,10 @@ def worker(request, testdir):
return WorkerSetup(request, testdir)
@pytest.mark.xfail(reason='#59')
@pytest.mark.xfail(reason="#59")
def test_remoteinitconfig(testdir):
from xdist.remote import remote_initconfig
config1 = testdir.parseconfig()
config2 = remote_initconfig(config1.option.__dict__, config1.args)
assert config2.option.__dict__ == config1.option.__dict__
@@ -81,29 +81,33 @@ def test_remoteinitconfig(testdir):
class TestReportSerialization:
def test_xdist_longrepr_to_str_issue_241(self, testdir):
testdir.makepyfile("""
testdir.makepyfile(
"""
import os
def test_a(): assert False
def test_b(): pass
""")
testdir.makeconftest("""
"""
)
testdir.makeconftest(
"""
def pytest_runtest_logreport(report):
print(report.longrepr)
""")
res = testdir.runpytest('-n1', '-s')
res.stdout.fnmatch_lines([
'*1 failed, 1 passed *'
])
"""
)
res = testdir.runpytest("-n1", "-s")
res.stdout.fnmatch_lines(["*1 failed, 1 passed *"])
def test_xdist_report_longrepr_reprcrash_130(self, testdir):
reprec = testdir.inline_runsource("""
reprec = testdir.inline_runsource(
"""
import py
def test_fail(): assert False, 'Expected Message'
""")
"""
)
reports = reprec.getreports("pytest_runtest_logreport")
assert len(reports) == 3
rep = reports[1]
added_section = ('Failure Metadata', str("metadata metadata"), "*")
added_section = ("Failure Metadata", str("metadata metadata"), "*")
rep.longrepr.sections.append(added_section)
d = serialize_report(rep)
check_marshallable(d)
@@ -111,28 +115,31 @@ class TestReportSerialization:
# Check assembled == rep
assert a.__dict__.keys() == rep.__dict__.keys()
for key in rep.__dict__.keys():
if key != 'longrepr':
if key != "longrepr":
assert getattr(a, key) == getattr(rep, key)
assert rep.longrepr.reprcrash.lineno == a.longrepr.reprcrash.lineno
assert rep.longrepr.reprcrash.message == a.longrepr.reprcrash.message
assert rep.longrepr.reprcrash.path == a.longrepr.reprcrash.path
assert rep.longrepr.reprtraceback.entrysep \
== a.longrepr.reprtraceback.entrysep
assert rep.longrepr.reprtraceback.extraline \
== a.longrepr.reprtraceback.extraline
assert rep.longrepr.reprtraceback.style \
== a.longrepr.reprtraceback.style
assert rep.longrepr.reprtraceback.entrysep == a.longrepr.reprtraceback.entrysep
assert (
rep.longrepr.reprtraceback.extraline == a.longrepr.reprtraceback.extraline
)
assert rep.longrepr.reprtraceback.style == a.longrepr.reprtraceback.style
assert rep.longrepr.sections == a.longrepr.sections
# Missing section attribute PR171
assert added_section in a.longrepr.sections
def test_reprentries_serialization_170(self, testdir):
from _pytest._code.code import ReprEntry
reprec = testdir.inline_runsource("""
reprec = testdir.inline_runsource(
"""
def test_repr_entry():
x = 0
assert x
""", '--showlocals')
""",
"--showlocals",
)
reports = reprec.getreports("pytest_runtest_logreport")
assert len(reports) == 3
rep = reports[1]
@@ -146,7 +153,9 @@ class TestReportSerialization:
assert rep_entries[i].lines == a_entries[i].lines
assert rep_entries[i].localssep == a_entries[i].localssep
assert rep_entries[i].reprfileloc.lineno == a_entries[i].reprfileloc.lineno
assert rep_entries[i].reprfileloc.message == a_entries[i].reprfileloc.message
assert (
rep_entries[i].reprfileloc.message == a_entries[i].reprfileloc.message
)
assert rep_entries[i].reprfileloc.path == a_entries[i].reprfileloc.path
assert rep_entries[i].reprfuncargs.args == a_entries[i].reprfuncargs.args
assert rep_entries[i].reprlocals.lines == a_entries[i].reprlocals.lines
@@ -154,11 +163,15 @@
def test_reprentries_serialization_196(self, testdir):
from _pytest._code.code import ReprEntryNative
reprec = testdir.inline_runsource("""
reprec = testdir.inline_runsource(
"""
def test_repr_entry_native():
x = 0
assert x
""", '--tb=native')
""",
"--tb=native",
)
reports = reprec.getreports("pytest_runtest_logreport")
assert len(reports) == 3
rep = reports[1]
@@ -172,7 +185,8 @@ class TestReportSerialization:
assert rep_entries[i].lines == a_entries[i].lines
def test_itemreport_outcomes(self, testdir):
reprec = testdir.inline_runsource("""
reprec = testdir.inline_runsource(
"""
import py
def test_pass(): pass
def test_fail(): 0/0
@@ -184,7 +198,8 @@ class TestReportSerialization:
def test_xfail(): 0/0
def test_xfail_imperative():
py.test.xfail("hello")
""")
"""
)
reports = reprec.getreports("pytest_runtest_logreport")
assert len(reports) == 17 # with setup/teardown "passed" reports
for rep in reports:
@@ -246,10 +261,12 @@ class TestReportSerialization:
class TestWorkerInteractor:
def test_basic_collect_and_runtests(self, worker):
worker.testdir.makepyfile("""
worker.testdir.makepyfile(
"""
def test_func():
pass
""")
"""
)
worker.setup()
ev = worker.popevent()
assert ev.name == "workerready"
@@ -257,8 +274,8 @@ class TestWorkerInteractor:
assert ev.name == "collectionstart"
assert not ev.kwargs
ev = worker.popevent("collectionfinish")
assert ev.kwargs['topdir'] == worker.testdir.tmpdir
ids = ev.kwargs['ids']
assert ev.kwargs["topdir"] == worker.testdir.tmpdir
ids = ev.kwargs["ids"]
assert len(ids) == 1
worker.sendcommand("runtests", indices=list(range(len(ids))))
worker.sendcommand("shutdown")
@@ -268,20 +285,23 @@ class TestWorkerInteractor:
ev = worker.popevent("testreport") # setup
ev = worker.popevent("testreport")
assert ev.name == "testreport"
rep = unserialize_report(ev.name, ev.kwargs['data'])
rep = unserialize_report(ev.name, ev.kwargs["data"])
assert rep.nodeid.endswith("::test_func")
assert rep.passed
assert rep.when == "call"
ev = worker.popevent("workerfinished")
assert 'workeroutput' in ev.kwargs
assert "workeroutput" in ev.kwargs
@pytest.mark.skipif(pytest.__version__ >= '3.0',
reason='skip at module level illegal in pytest 3.0')
@pytest.mark.skipif(
pytest.__version__ >= "3.0", reason="skip at module level illegal in pytest 3.0"
)
def test_remote_collect_skip(self, worker):
worker.testdir.makepyfile("""
worker.testdir.makepyfile(
"""
import py
py.test.skip("hello")
""")
"""
)
worker.setup()
ev = worker.popevent("collectionstart")
assert not ev.kwargs
@@ -289,10 +309,10 @@ class TestWorkerInteractor:
assert ev.name == "collectreport"
ev = worker.popevent()
assert ev.name == "collectreport"
rep = unserialize_report(ev.name, ev.kwargs['data'])
rep = unserialize_report(ev.name, ev.kwargs["data"])
assert rep.skipped
ev = worker.popevent("collectionfinish")
assert not ev.kwargs['ids']
assert not ev.kwargs["ids"]
def test_remote_collect_fail(self, worker):
worker.testdir.makepyfile("""aasd qwe""")
@@ -303,16 +323,18 @@ class TestWorkerInteractor:
assert ev.name == "collectreport"
ev = worker.popevent()
assert ev.name == "collectreport"
rep = unserialize_report(ev.name, ev.kwargs['data'])
rep = unserialize_report(ev.name, ev.kwargs["data"])
assert rep.failed
ev = worker.popevent("collectionfinish")
assert not ev.kwargs['ids']
assert not ev.kwargs["ids"]
def test_runtests_all(self, worker):
worker.testdir.makepyfile("""
worker.testdir.makepyfile(
"""
def test_func(): pass
def test_func2(): pass
""")
"""
)
worker.setup()
ev = worker.popevent()
assert ev.name == "workerready"
@@ -320,57 +342,63 @@ class TestWorkerInteractor:
assert ev.name == "collectionstart"
assert not ev.kwargs
ev = worker.popevent("collectionfinish")
ids = ev.kwargs['ids']
ids = ev.kwargs["ids"]
assert len(ids) == 2
worker.sendcommand("runtests_all", )
worker.sendcommand("shutdown", )
worker.sendcommand("runtests_all")
worker.sendcommand("shutdown")
for func in "::test_func", "::test_func2":
for i in range(3): # setup/call/teardown
ev = worker.popevent("testreport")
assert ev.name == "testreport"
rep = unserialize_report(ev.name, ev.kwargs['data'])
rep = unserialize_report(ev.name, ev.kwargs["data"])
assert rep.nodeid.endswith(func)
ev = worker.popevent("workerfinished")
assert 'workeroutput' in ev.kwargs
assert "workeroutput" in ev.kwargs
def test_happy_run_events_converted(self, testdir, worker):
py.test.xfail("implement a simple test for event production")
assert not worker.use_callback
worker.testdir.makepyfile("""
worker.testdir.makepyfile(
"""
def test_func():
pass
""")
"""
)
worker.setup()
hookrec = testdir.getreportrecorder(worker.config)
for data in worker.slp.channel:
worker.slp.process_from_remote(data)
worker.slp.process_from_remote(worker.slp.ENDMARK)
pprint.pprint(hookrec.hookrecorder.calls)
hookrec.hookrecorder.contains([
("pytest_collectstart", "collector.fspath == aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.collector.fspath == aaa"),
("pytest_collectstart", "collector.fspath == bbb"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.collector.fspath == bbb"),
])
hookrec.hookrecorder.contains(
[
("pytest_collectstart", "collector.fspath == aaa"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.collector.fspath == aaa"),
("pytest_collectstart", "collector.fspath == bbb"),
("pytest_pycollect_makeitem", "name == 'test_func'"),
("pytest_collectreport", "report.collector.fspath == bbb"),
]
)
def test_process_from_remote_error_handling(self, worker, capsys):
worker.use_callback = True
worker.setup()
worker.slp.process_from_remote(('<nonono>', ()))
worker.slp.process_from_remote(("<nonono>", ()))
out, err = capsys.readouterr()
assert 'INTERNALERROR> ValueError: unknown event: <nonono>' in out
assert "INTERNALERROR> ValueError: unknown event: <nonono>" in out
ev = worker.popevent()
assert ev.name == "errordown"
def test_remote_env_vars(testdir):
testdir.makepyfile('''
testdir.makepyfile(
"""
import os
def test():
assert os.environ['PYTEST_XDIST_WORKER'] in ('gw0', 'gw1')
assert os.environ['PYTEST_XDIST_WORKER_COUNT'] == '2'
''')
result = testdir.runpytest('-n2', '--max-worker-restart=0')
"""
)
result = testdir.runpytest("-n2", "--max-worker-restart=0")
assert result.ret == 0

View File

@@ -42,7 +42,7 @@ def workercontroller(monkeypatch):
def setup(self):
pass
monkeypatch.setattr(workermanage, 'WorkerController', MockController)
monkeypatch.setattr(workermanage, "WorkerController", MockController)
return MockController
@@ -58,8 +58,7 @@ class TestNodeManagerPopen:
for spec in NodeManager(config, specs, defaultchdir="abc").specs:
assert spec.chdir == "abc"
def test_popen_makegateway_events(self, config, hookrecorder,
workercontroller):
def test_popen_makegateway_events(self, config, hookrecorder, workercontroller):
hm = NodeManager(config, ["popen"] * 2)
hm.setup_nodes(None)
call = hookrecorder.popcall("pytest_xdist_setupnodes")
@@ -108,15 +107,16 @@ class TestNodeManagerPopen:
for gw in hm.group:
hm.rsync(gw, source, notify=lambda *args: notifications.append(args))
assert len(notifications) == 1
assert notifications[0] == ("rsyncrootready", hm.group['gw0'].spec, source)
assert notifications[0] == ("rsyncrootready", hm.group["gw0"].spec, source)
hm.teardown_nodes()
dest = dest.join(source.basename)
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
assert dest.join("dir1", "dir2", "hello").check()
def test_rsync_same_popen_twice(self, config, mysetup, hookrecorder,
workercontroller):
def test_rsync_same_popen_twice(
self, config, mysetup, hookrecorder, workercontroller
):
source, dest = mysetup.source, mysetup.dest
hm = NodeManager(config, ["popen//chdir=%s" % dest] * 2)
hm.roots = []
@@ -142,9 +142,9 @@ class TestHRSync:
files = list(source.visit(rec=syncer.filter, fil=syncer.filter))
assert len(files) == 3
basenames = [x.basename for x in files]
assert 'dir' in basenames
assert 'file.txt' in basenames
assert 'somedir' in basenames
assert "dir" in basenames
assert "file.txt" in basenames
assert "somedir" in basenames
def test_hrsync_one_host(self, mysetup):
source, dest = mysetup.source, mysetup.dest
@@ -169,7 +169,8 @@ class TestNodeManager:
nodemanager.makegateways()
nodemanager.rsync_roots()
p, = nodemanager.gwmanager.multi_exec(
"import os ; channel.send(os.getcwd())").receive_each()
"import os ; channel.send(os.getcwd())"
).receive_each()
p = py.path.local(p)
print("remote curdir", p)
assert p == mysetup.dest.join(config.topdir.basename)
@@ -183,15 +184,17 @@ class TestNodeManager:
dir2.ensure("hello")
for rsyncroot in (dir1, source):
dest.remove()
nodemanager = NodeManager(testdir.parseconfig(
"--tx", "popen//chdir=%s" % dest, "--rsyncdir", rsyncroot,
source, ))
nodemanager = NodeManager(
testdir.parseconfig(
"--tx", "popen//chdir=%s" % dest, "--rsyncdir", rsyncroot, source
)
)
nodemanager.setup_nodes(None) # calls .rsync_roots()
if rsyncroot == source:
dest = dest.join("source")
assert dest.join("dir1").check()
assert dest.join("dir1", "dir2").check()
assert dest.join("dir1", "dir2", 'hello').check()
assert dest.join("dir1", "dir2", "hello").check()
nodemanager.teardown_nodes()
def test_init_rsync_roots(self, testdir, mysetup, workercontroller):
@@ -200,10 +203,14 @@ class TestNodeManager:
source.ensure("dir1", "somefile", dir=1)
dir2.ensure("hello")
source.ensure("bogusdir", "file")
source.join("tox.ini").write(textwrap.dedent("""
source.join("tox.ini").write(
textwrap.dedent(
"""
[pytest]
rsyncdirs=dir1/dir2
"""))
"""
)
)
config = testdir.parseconfig(source)
nodemanager = NodeManager(config, ["popen//chdir=%s" % dest])
nodemanager.setup_nodes(None) # calls .rsync_roots()
@@ -219,27 +226,31 @@ class TestNodeManager:
dir2.ensure("hello")
source.ensure("foo", "bar")
source.ensure("bar", "foo")
source.join("tox.ini").write(textwrap.dedent("""
source.join("tox.ini").write(
textwrap.dedent(
"""
[pytest]
rsyncdirs = dir1 dir5
rsyncignore = dir1/dir2 dir5/dir6 foo*
"""))
"""
)
)
config = testdir.parseconfig(source)
config.option.rsyncignore = ['bar']
config.option.rsyncignore = ["bar"]
nodemanager = NodeManager(config, ["popen//chdir=%s" % dest])
nodemanager.setup_nodes(None) # calls .rsync_roots()
assert dest.join("dir1").check()
assert not dest.join("dir1", "dir2").check()
assert dest.join("dir5", "file").check()
assert not dest.join("dir6").check()
assert not dest.join('foo').check()
assert not dest.join('bar').check()
assert not dest.join("foo").check()
assert not dest.join("bar").check()
def test_optimise_popen(self, testdir, mysetup, workercontroller):
source = mysetup.source
specs = ["popen"] * 3
source.join("conftest.py").write("rsyncdirs = ['a']")
source.ensure('a', dir=1)
source.ensure("a", dir=1)
config = testdir.parseconfig(source)
nodemanager = NodeManager(config, specs)
nodemanager.setup_nodes(None) # calls .rsync_roots()
@@ -248,12 +259,15 @@ class TestNodeManager:
assert not gwspec.chdir
def test_ssh_setup_nodes(self, specssh, testdir):
testdir.makepyfile(__init__="",
test_x="""
testdir.makepyfile(
__init__="",
test_x="""
def test_one():
pass
""")
reprec = testdir.inline_run("-d", "--rsyncdir=%s" % testdir.tmpdir,
"--tx", specssh, testdir.tmpdir)
""",
)
reprec = testdir.inline_run(
"-d", "--rsyncdir=%s" % testdir.tmpdir, "--tx", specssh, testdir.tmpdir
)
rep, = reprec.getreports("pytest_runtest_logreport")
assert rep.passed

View File

@@ -48,3 +48,8 @@ commands =
[pytest]
addopts = -rsfxX
[flake8]
max-line-length = 120
ignore = E203,W503

View File

@@ -1,3 +1,3 @@
from xdist._version import version as __version__
__all__ = ['__version__']
__all__ = ["__version__"]

View File

@@ -84,7 +84,7 @@ class DSession(object):
def pytest_sessionfinish(self, session):
"""Shutdown all nodes."""
nm = getattr(self, 'nodemanager', None) # if not fully initialized
nm = getattr(self, "nodemanager", None) # if not fully initialized
if nm is not None:
nm.teardown_nodes()
self._session = None
@@ -95,19 +95,18 @@ class DSession(object):
@pytest.mark.trylast
def pytest_xdist_make_scheduler(self, config, log):
dist = config.getvalue('dist')
dist = config.getvalue("dist")
schedulers = {
'each': EachScheduling,
'load': LoadScheduling,
'loadscope': LoadScopeScheduling,
'loadfile': LoadFileScheduling,
"each": EachScheduling,
"load": LoadScheduling,
"loadscope": LoadScopeScheduling,
"loadfile": LoadFileScheduling,
}
return schedulers[dist](config, log)
def pytest_runtestloop(self):
self.sched = self.config.hook.pytest_xdist_make_scheduler(
config=self.config,
log=self.log
config=self.config, log=self.log
)
assert self.sched is not None
@@ -151,8 +150,8 @@ class DSession(object):
collection without any further input.
"""
node.workerinfo = workerinfo
node.workerinfo['id'] = node.gateway.id
node.workerinfo['spec'] = node.gateway.spec
node.workerinfo["id"] = node.gateway.id
node.workerinfo["spec"] = node.gateway.spec
# TODO: (#234 task) needs this for pytest. Remove when refactor in pytest repo
node.slaveinfo = node.workerinfo
@@ -172,7 +171,7 @@ class DSession(object):
workerready before shutdown was triggered.
"""
self.config.hook.pytest_testnodedown(node=node, error=None)
if node.workeroutput['exitstatus'] == 2: # keyboard-interrupt
if node.workeroutput["exitstatus"] == 2: # keyboard-interrupt
self.shouldstop = "%s received keyboard-interrupt" % (node,)
self.worker_errordown(node, "keyboard-interrupt")
return
@@ -193,14 +192,15 @@ class DSession(object):
self.handle_crashitem(crashitem, node)
self._failed_nodes_count += 1
maximum_reached = (self._max_worker_restart is not None and
self._failed_nodes_count > self._max_worker_restart)
maximum_reached = (
self._max_worker_restart is not None
and self._failed_nodes_count > self._max_worker_restart
)
if maximum_reached:
if self._max_worker_restart == 0:
msg = 'Worker restarting disabled'
msg = "Worker restarting disabled"
else:
msg = "Maximum crashed workers reached: %d" % \
self._max_worker_restart
msg = "Maximum crashed workers reached: %d" % self._max_worker_restart
self.report_line(msg)
else:
self.report_line("Replacing crashed worker %s" % node.gateway.id)
@@ -218,8 +218,7 @@ class DSession(object):
"""
if self.shuttingdown:
return
self.config.hook.pytest_xdist_node_collection_finished(node=node,
ids=ids)
self.config.hook.pytest_xdist_node_collection_finished(node=node, ids=ids)
# tell session which items were effectively collected otherwise
# the master node will finish the session with EXIT_NOTESTSCOLLECTED
self._session.testscollected = len(ids)
@@ -230,19 +229,18 @@ class DSession(object):
if self.terminal and not self.sched.has_pending:
self.trdist.ensure_show_status()
self.terminal.write_line("")
self.terminal.write_line("scheduling tests via %s" % (
self.sched.__class__.__name__))
self.terminal.write_line(
"scheduling tests via %s" % (self.sched.__class__.__name__)
)
self.sched.schedule()
def worker_logstart(self, node, nodeid, location):
"""Emitted when a node calls the pytest_runtest_logstart hook."""
self.config.hook.pytest_runtest_logstart(
nodeid=nodeid, location=location)
self.config.hook.pytest_runtest_logstart(nodeid=nodeid, location=location)
def worker_logfinish(self, node, nodeid, location):
"""Emitted when a node calls the pytest_runtest_logfinish hook."""
self.config.hook.pytest_runtest_logfinish(
nodeid=nodeid, location=location)
self.config.hook.pytest_runtest_logfinish(nodeid=nodeid, location=location)
def worker_testreport(self, node, rep):
"""Emitted when a node calls the pytest_runtest_logreport hook."""
@@ -295,8 +293,7 @@ class DSession(object):
if rep.failed:
self.countfailures += 1
if self.maxfail and self.countfailures >= self.maxfail:
self.shouldstop = "stopping after %d failures" % (
self.countfailures)
self.shouldstop = "stopping after %d failures" % (self.countfailures)
def triggershutdown(self):
self.log("triggering shutdown")
@@ -310,8 +307,9 @@ class DSession(object):
runner = self.config.pluginmanager.getplugin("runner")
fspath = nodeid.split("::")[0]
msg = "Worker %r crashed while running %r" % (worker.gateway.id, nodeid)
rep = runner.TestReport(nodeid, (fspath, None, fspath),
(), "failed", msg, "???")
rep = runner.TestReport(
nodeid, (fspath, None, fspath), (), "failed", msg, "???"
)
rep.node = worker
self.config.hook.pytest_runtest_logreport(report=rep)
@@ -322,7 +320,7 @@ class TerminalDistReporter(object):
self.tr = config.pluginmanager.getplugin("terminalreporter")
self._status = {}
self._lastlen = 0
self._isatty = getattr(self.tr, 'isatty', self.tr.hasmarkup)
self._isatty = getattr(self.tr, "isatty", self.tr.hasmarkup)
def write_line(self, msg):
self.tr.write_line(msg)
@@ -337,8 +335,7 @@ class TerminalDistReporter(object):
self.rewrite(self.getstatus())
def getstatus(self):
parts = ["%s %s" % (spec.id, self._status[spec.id])
for spec in self._specs]
parts = ["%s %s" % (spec.id, self._status[spec.id]) for spec in self._specs]
return " / ".join(parts)
def rewrite(self, line, newline=False):
@@ -361,17 +358,17 @@ class TerminalDistReporter(object):
if self.config.option.verbose > 0:
rinfo = gateway._rinfo()
version = "%s.%s.%s" % rinfo.version_info[:3]
self.rewrite("[%s] %s Python %s cwd: %s" % (
gateway.id, rinfo.platform, version, rinfo.cwd),
newline=True)
self.rewrite(
"[%s] %s Python %s cwd: %s"
% (gateway.id, rinfo.platform, version, rinfo.cwd),
newline=True,
)
self.setstatus(gateway.spec, "C")
def pytest_testnodeready(self, node):
if self.config.option.verbose > 0:
d = node.workerinfo
infoline = "[%s] Python %s" % (
d['id'],
d['version'].replace('\n', ' -- '),)
infoline = "[%s] Python %s" % (d["id"], d["version"].replace("\n", " -- "))
self.rewrite(infoline, newline=True)
self.setstatus(node.gateway.spec, "ok")

View File

@@ -17,19 +17,22 @@ import execnet
def pytest_addoption(parser):
group = parser.getgroup("xdist", "distributed and subprocess testing")
group._addoption(
'-f', '--looponfail',
action="store_true", dest="looponfail", default=False,
"-f",
"--looponfail",
action="store_true",
dest="looponfail",
default=False,
help="run tests in subprocess, wait for modified files "
"and re-run failing test set until all pass.")
"and re-run failing test set until all pass.",
)
def pytest_cmdline_main(config):
if config.getoption("looponfail"):
usepdb = config.getoption('usepdb') # a core option
usepdb = config.getoption("usepdb") # a core option
if usepdb:
raise pytest.UsageError(
"--pdb incompatible with --looponfail.")
raise pytest.UsageError("--pdb incompatible with --looponfail.")
looponfail_main(config)
return 2 # looponfail only can get stop with ctrl-C anyway
@@ -45,8 +48,8 @@ def looponfail_main(config):
# the last failures passed, let's immediately rerun all
continue
repr_pytest_looponfailinfo(
failreports=remotecontrol.failures,
rootdirs=rootdirs)
failreports=remotecontrol.failures, rootdirs=rootdirs
)
statrecorder.waitonchange(checkinterval=2.0)
except KeyboardInterrupt:
print()
@@ -68,7 +71,7 @@ class RemoteControl(object):
def setup(self, out=None):
if out is None:
out = py.io.TerminalWriter()
if hasattr(self, 'gateway'):
if hasattr(self, "gateway"):
raise ValueError("already have gateway %r" % self.gateway)
self.trace("setting up worker session")
self.gateway = self.initgateway()
@@ -82,15 +85,16 @@ class RemoteControl(object):
def write(s):
out._file.write(s)
out._file.flush()
remote_outchannel.setcallback(write)
def ensure_teardown(self):
if hasattr(self, 'channel'):
if hasattr(self, "channel"):
if not self.channel.isclosed():
self.trace("closing", self.channel)
self.channel.close()
del self.channel
if hasattr(self, 'gateway'):
if hasattr(self, "gateway"):
self.trace("exiting", self.gateway)
self.gateway.exit()
del self.gateway
@@ -138,8 +142,9 @@ def repr_pytest_looponfailinfo(failreports, rootdirs):
def init_worker_session(channel, args, option_dict):
import os
import sys
outchannel = channel.gateway.newchannel()
sys.stdout = sys.stderr = outchannel.makefile('w')
sys.stdout = sys.stderr = outchannel.makefile("w")
channel.send(outchannel)
# prune sys.path to not contain relative paths
newpaths = []
@@ -152,9 +157,11 @@ def init_worker_session(channel, args, option_dict):
# fullwidth, hasmarkup = channel.receive()
from _pytest.config import Config
config = Config.fromdictargs(option_dict, list(args))
config.args = args
from xdist.looponfail import WorkerFailSession
WorkerFailSession(config, channel).main()
@@ -181,7 +188,8 @@ class WorkerFailSession(object):
except pytest.UsageError:
items = session.perform_collect(None)
hook.pytest_collection_modifyitems(
session=session, config=session.config, items=items)
session=session, config=session.config, items=items
)
hook.pytest_collection_finish(session=session)
return True
@@ -207,7 +215,7 @@ class WorkerFailSession(object):
for rep in self.recorded_failures:
trails.append(rep.nodeid)
loc = rep.longrepr
loc = str(getattr(loc, 'reprcrash', loc))
loc = str(getattr(loc, "reprcrash", loc))
failreports.append(loc)
self.channel.send((trails, failreports, self.collection_failed))
@@ -245,8 +253,10 @@ class StatRecorder(object):
changed = True
else:
if oldstat:
if oldstat.mtime != curstat.mtime or \
oldstat.size != curstat.size:
if (
oldstat.mtime != curstat.mtime
or oldstat.size != curstat.size
):
changed = True
print("# MODIFIED", path)
if removepycfiles and path.ext == ".py":

View File

@@ -8,7 +8,7 @@ def auto_detect_cpus():
try:
from os import sched_getaffinity
except ImportError:
if os.environ.get('TRAVIS') == 'true':
if os.environ.get("TRAVIS") == "true":
# workaround https://bitbucket.org/pypy/pypy/issues/2375
return 2
try:
@@ -16,6 +16,7 @@ def auto_detect_cpus():
except ImportError:
from multiprocessing import cpu_count
else:
def cpu_count():
return len(sched_getaffinity(0))
@@ -27,7 +28,7 @@ def auto_detect_cpus():
def parse_numprocesses(s):
if s == 'auto':
if s == "auto":
return auto_detect_cpus()
else:
return int(s)
@@ -36,60 +37,101 @@ def parse_numprocesses(s):
def pytest_addoption(parser):
group = parser.getgroup("xdist", "distributed and subprocess testing")
group._addoption(
'-n', '--numprocesses', dest="numprocesses", metavar="numprocesses",
"-n",
"--numprocesses",
dest="numprocesses",
metavar="numprocesses",
action="store",
type=parse_numprocesses,
help="shortcut for '--dist=load --tx=NUM*popen', "
"you can use 'auto' here for auto detection CPUs number on "
"host system")
group.addoption('--max-worker-restart', '--max-slave-restart', action="store", default=None,
dest="maxworkerrestart",
help="maximum number of workers that can be restarted "
"when crashed (set to zero to disable this feature)\n"
"'--max-slave-restart' option is deprecated and will be removed in "
"a future release")
"you can use 'auto' here for auto detection CPUs number on "
"host system",
)
group.addoption(
'--dist', metavar="distmode",
action="store", choices=['each', 'load', 'loadscope', 'loadfile', 'no'],
dest="dist", default="no",
help=("set mode for distributing tests to exec environments.\n\n"
"each: send each test to all available environments.\n\n"
"load: load balance by sending any pending test to any"
" available environment.\n\n"
"loadscope: load balance by sending pending groups of tests in"
" the same scope to any available environment.\n\n"
"loadfile: load balance by sending test grouped by file"
" to any available environment.\n\n"
"(default) no: run tests inprocess, don't distribute."))
"--max-worker-restart",
"--max-slave-restart",
action="store",
default=None,
dest="maxworkerrestart",
help="maximum number of workers that can be restarted "
"when crashed (set to zero to disable this feature)\n"
"'--max-slave-restart' option is deprecated and will be removed in "
"a future release",
)
group.addoption(
'--tx', dest="tx", action="append", default=[],
"--dist",
metavar="distmode",
action="store",
choices=["each", "load", "loadscope", "loadfile", "no"],
dest="dist",
default="no",
help=(
"set mode for distributing tests to exec environments.\n\n"
"each: send each test to all available environments.\n\n"
"load: load balance by sending any pending test to any"
" available environment.\n\n"
"loadscope: load balance by sending pending groups of tests in"
" the same scope to any available environment.\n\n"
"loadfile: load balance by sending test grouped by file"
" to any available environment.\n\n"
"(default) no: run tests inprocess, don't distribute."
),
)
group.addoption(
"--tx",
dest="tx",
action="append",
default=[],
metavar="xspec",
help=("add a test execution environment. some examples: "
"--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
"--tx ssh=user@codespeak.net//chdir=testcache"))
help=(
"add a test execution environment. some examples: "
"--tx popen//python=python2.5 --tx socket=192.168.1.102:8888 "
"--tx ssh=user@codespeak.net//chdir=testcache"
),
)
group._addoption(
'-d',
action="store_true", dest="distload", default=False,
help="load-balance tests. shortcut for '--dist=load'")
"-d",
action="store_true",
dest="distload",
default=False,
help="load-balance tests. shortcut for '--dist=load'",
)
group.addoption(
'--rsyncdir', action="append", default=[], metavar="DIR",
help="add directory for rsyncing to remote tx nodes.")
"--rsyncdir",
action="append",
default=[],
metavar="DIR",
help="add directory for rsyncing to remote tx nodes.",
)
group.addoption(
'--rsyncignore', action="append", default=[], metavar="GLOB",
help="add expression for ignores when rsyncing to remote tx nodes.")
"--rsyncignore",
action="append",
default=[],
metavar="GLOB",
help="add expression for ignores when rsyncing to remote tx nodes.",
)
group.addoption(
"--boxed", action="store_true",
help="backward compatibility alias for pytest-forked --forked")
"--boxed",
action="store_true",
help="backward compatibility alias for pytest-forked --forked",
)
parser.addini(
'rsyncdirs', 'list of (relative) paths to be rsynced for'
' remote distributed testing.', type="pathlist")
"rsyncdirs",
"list of (relative) paths to be rsynced for" " remote distributed testing.",
type="pathlist",
)
parser.addini(
'rsyncignore', 'list of (relative) glob-style paths to be ignored '
'for rsyncing.', type="pathlist")
"rsyncignore",
"list of (relative) glob-style paths to be ignored " "for rsyncing.",
type="pathlist",
)
parser.addini(
"looponfailroots", type="pathlist",
help="directories to check for changes", default=[py.path.local()])
"looponfailroots",
type="pathlist",
help="directories to check for changes",
default=[py.path.local()],
)
# -------------------------------------------------------------------------
@@ -99,12 +141,14 @@ def pytest_addoption(parser):
def pytest_addhooks(pluginmanager):
from xdist import newhooks
# avoid warnings with pytest-2.8
method = getattr(pluginmanager, "add_hookspecs", None)
if method is None:
method = pluginmanager.addhooks
method(newhooks)
# -------------------------------------------------------------------------
# distributed testing initialization
# -------------------------------------------------------------------------
@@ -114,6 +158,7 @@ def pytest_addhooks(pluginmanager):
def pytest_configure(config):
if config.getoption("dist") != "no" and not config.getvalue("collectonly"):
from xdist.dsession import DSession
session = DSession(config)
config.pluginmanager.register(session, "dsession")
tr = config.pluginmanager.getplugin("terminalreporter")
@@ -125,18 +170,20 @@ def pytest_configure(config):
@pytest.mark.tryfirst
def pytest_cmdline_main(config):
if config.option.numprocesses:
if config.option.dist == 'no':
if config.option.dist == "no":
config.option.dist = "load"
config.option.tx = ['popen'] * config.option.numprocesses
config.option.tx = ["popen"] * config.option.numprocesses
if config.option.distload:
config.option.dist = "load"
val = config.getvalue
if not val("collectonly"):
usepdb = config.getoption('usepdb') # a core option
usepdb = config.getoption("usepdb") # a core option
if val("dist") != "no":
if usepdb:
raise pytest.UsageError(
"--pdb is incompatible with distributing tests; try using -n0.") # noqa: E501
"--pdb is incompatible with distributing tests; try using -n0."
) # noqa: E501
# -------------------------------------------------------------------------
# fixtures
@@ -148,7 +195,7 @@ def worker_id(request):
"""Return the id of the current worker ('gw0', 'gw1', etc) or 'master'
if running on the master node.
"""
if hasattr(request.config, 'workerinput'):
return request.config.workerinput['workerid']
if hasattr(request.config, "workerinput"):
return request.config.workerinput["workerid"]
else:
return 'master'
return "master"

View File

@@ -17,7 +17,7 @@ import pytest
class WorkerInteractor(object):
def __init__(self, config, channel):
self.config = config
self.workerid = config.workerinput.get('workerid', "?")
self.workerid = config.workerinput.get("workerid", "?")
self.log = py.log.Producer("worker-%s" % self.workerid)
if not config.option.debug:
py.log.setconsumer(self.log._keywords, None)
@@ -39,7 +39,7 @@ class WorkerInteractor(object):
@pytest.hookimpl(hookwrapper=True)
def pytest_sessionfinish(self, exitstatus):
self.config.workeroutput['exitstatus'] = exitstatus
self.config.workeroutput["exitstatus"] = exitstatus
yield
self.sendevent("workerfinished", workeroutput=self.config.workeroutput)
@@ -56,7 +56,7 @@ class WorkerInteractor(object):
return True
self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs['indices'])
torun.extend(kwargs["indices"])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
self.log("items to run:", torun)
@@ -79,24 +79,25 @@ class WorkerInteractor(object):
nextitem = None
start = time.time()
self.config.hook.pytest_runtest_protocol(
item=item,
nextitem=nextitem)
self.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
duration = time.time() - start
self.sendevent("runtest_protocol_complete", item_index=self.item_index,
duration=duration)
self.sendevent(
"runtest_protocol_complete", item_index=self.item_index, duration=duration
)
def pytest_collection_finish(self, session):
self.sendevent(
"collectionfinish",
topdir=str(session.fspath),
ids=[item.nodeid for item in session.items])
ids=[item.nodeid for item in session.items],
)
def pytest_runtest_logstart(self, nodeid, location):
self.sendevent("logstart", nodeid=nodeid, location=location)
# the pytest_runtest_logfinish hook was introduced in pytest 3.4
if hasattr(_pytest.hookspec, 'pytest_runtest_logfinish'):
if hasattr(_pytest.hookspec, "pytest_runtest_logfinish"):
def pytest_runtest_logfinish(self, nodeid, location):
self.sendevent("logfinish", nodeid=nodeid, location=location)
@@ -112,8 +113,13 @@ class WorkerInteractor(object):
self.sendevent("collectreport", data=data)
def pytest_logwarning(self, message, code, nodeid, fslocation):
self.sendevent("logwarning", message=message, code=code, nodeid=nodeid,
fslocation=str(fslocation))
self.sendevent(
"logwarning",
message=message,
code=code,
nodeid=nodeid,
fslocation=str(fslocation),
)
def serialize_report(rep):
@@ -122,34 +128,33 @@ def serialize_report(rep):
reprcrash = rep.longrepr.reprcrash.__dict__.copy()
new_entries = []
for entry in reprtraceback['reprentries']:
entry_data = {
'type': type(entry).__name__,
'data': entry.__dict__.copy(),
}
for key, value in entry_data['data'].items():
if hasattr(value, '__dict__'):
entry_data['data'][key] = value.__dict__.copy()
for entry in reprtraceback["reprentries"]:
entry_data = {"type": type(entry).__name__, "data": entry.__dict__.copy()}
for key, value in entry_data["data"].items():
if hasattr(value, "__dict__"):
entry_data["data"][key] = value.__dict__.copy()
new_entries.append(entry_data)
reprtraceback['reprentries'] = new_entries
reprtraceback["reprentries"] = new_entries
return {
'reprcrash': reprcrash,
'reprtraceback': reprtraceback,
'sections': rep.longrepr.sections
"reprcrash": reprcrash,
"reprtraceback": reprtraceback,
"sections": rep.longrepr.sections,
}
import py
d = rep.__dict__.copy()
if hasattr(rep.longrepr, 'toterminal'):
if hasattr(rep.longrepr, 'reprtraceback') \
and hasattr(rep.longrepr, 'reprcrash'):
d['longrepr'] = disassembled_report(rep)
if hasattr(rep.longrepr, "toterminal"):
if hasattr(rep.longrepr, "reprtraceback") and hasattr(
rep.longrepr, "reprcrash"
):
d["longrepr"] = disassembled_report(rep)
else:
d['longrepr'] = str(rep.longrepr)
d["longrepr"] = str(rep.longrepr)
else:
d['longrepr'] = rep.longrepr
d["longrepr"] = rep.longrepr
for name in d:
if isinstance(d[name], py.path.local):
d[name] = str(d[name])
@@ -160,6 +165,7 @@ def serialize_report(rep):
def getinfodict():
import platform
return dict(
version=sys.version,
version_info=tuple(sys.version_info),
@@ -172,7 +178,8 @@ def getinfodict():
def remote_initconfig(option_dict, args):
from _pytest.config import Config
option_dict['plugins'].append("no:terminal")
option_dict["plugins"].append("no:terminal")
config = Config.fromdictargs(option_dict, args)
config.option.looponfail = False
config.option.usepdb = False
@@ -183,18 +190,19 @@ def remote_initconfig(option_dict, args):
return config
if __name__ == '__channelexec__':
if __name__ == "__channelexec__":
channel = channel # noqa
workerinput, args, option_dict = channel.receive()
importpath = os.getcwd()
sys.path.insert(0, importpath) # XXX only for remote situations
os.environ['PYTHONPATH'] = (
importpath + os.pathsep +
os.environ.get('PYTHONPATH', ''))
os.environ['PYTEST_XDIST_WORKER'] = workerinput['workerid']
os.environ['PYTEST_XDIST_WORKER_COUNT'] = str(workerinput['workercount'])
os.environ["PYTHONPATH"] = (
importpath + os.pathsep + os.environ.get("PYTHONPATH", "")
)
os.environ["PYTEST_XDIST_WORKER"] = workerinput["workerid"]
os.environ["PYTEST_XDIST_WORKER_COUNT"] = str(workerinput["workercount"])
# os.environ['PYTHONPATH'] = importpath
import py
config = remote_initconfig(option_dict, args)
config.workerinput = workerinput
config.workeroutput = {}

View File

@@ -11,16 +11,11 @@ def report_collection_diff(from_collection, to_collection, from_id, to_id):
if from_collection == to_collection:
return None
diff = unified_diff(
from_collection,
to_collection,
fromfile=from_id,
tofile=to_id,
)
diff = unified_diff(from_collection, to_collection, fromfile=from_id, tofile=to_id)
error_message = (
u'Different tests were collected between {from_id} and {to_id}. '
u'The difference is:\n'
u'{diff}'
).format(from_id=from_id, to_id=to_id, diff='\n'.join(diff))
u"Different tests were collected between {from_id} and {to_id}. "
u"The difference is:\n"
u"{diff}"
).format(from_id=from_id, to_id=to_id, diff="\n".join(diff))
msg = "\n".join([x.rstrip() for x in error_message.split("\n")])
return msg

View File

@@ -86,10 +86,12 @@ class EachScheduling(object):
if deadnode.gateway.spec == node.gateway.spec:
dead_collection = self.node2collection[deadnode]
if collection != dead_collection:
msg = report_collection_diff(dead_collection,
collection,
deadnode.gateway.id,
node.gateway.id)
msg = report_collection_diff(
dead_collection,
collection,
deadnode.gateway.id,
node.gateway.id,
)
self.log(msg)
return
pending = self._removed2pending.pop(deadnode)

View File

@@ -23,7 +23,7 @@ class LoadFileScheduling(LoadScopeScheduling):
def __init__(self, config, log=None):
super(LoadFileScheduling, self).__init__(config, log)
if log is None:
self.log = Producer('loadfilesched')
self.log = Producer("loadfilesched")
else:
self.log = log.loadfilesched
@@ -49,4 +49,4 @@ class LoadFileScheduling(LoadScopeScheduling):
example/loadsuite/test/test_delta.py
example/loadsuite/epsilon/__init__.py
"""
return nodeid.split('::', 1)[0]
return nodeid.split("::", 1)[0]

View File

@@ -133,10 +133,9 @@ class LoadScheduling(object):
assert self.collection
if collection != self.collection:
other_node = next(iter(self.node2collection.keys()))
msg = report_collection_diff(self.collection,
collection,
other_node.gateway.id,
node.gateway.id)
msg = report_collection_diff(
self.collection, collection, other_node.gateway.id, node.gateway.id
)
self.log(msg)
return
self.node2collection[node] = list(collection)
@@ -226,7 +225,7 @@ class LoadScheduling(object):
# XXX allow nodes to have different collections
if not self._check_nodes_have_same_collection():
self.log('**Different tests collected, aborting run**')
self.log("**Different tests collected, aborting run**")
return
# Collections are identical, create the index of pending items.
@@ -238,8 +237,7 @@ class LoadScheduling(object):
# Send a batch of tests to run. If we don't have at least two
# tests per node, we have to send them all so that we can send
# shutdown signals and get all nodes working.
initial_batch = max(len(self.pending) // 4,
2 * len(self.nodes))
initial_batch = max(len(self.pending) // 4, 2 * len(self.nodes))
# distribute tests round-robin up to the batch size
# (or until we run out)
@@ -271,18 +269,15 @@ class LoadScheduling(object):
same_collection = True
for node, collection in node_collection_items[1:]:
msg = report_collection_diff(
col,
collection,
first_node.gateway.id,
node.gateway.id,
col, collection, first_node.gateway.id, node.gateway.id
)
if msg:
same_collection = False
self.log(msg)
if self.config is not None:
rep = CollectReport(
node.gateway.id, 'failed',
longrepr=msg, result=[])
node.gateway.id, "failed", longrepr=msg, result=[]
)
self.config.hook.pytest_collectreport(report=rep)
return same_collection

View File

@@ -93,7 +93,7 @@ class LoadScopeScheduling(object):
self.registered_collections = OrderedDict()
if log is None:
self.log = Producer('loadscopesched')
self.log = Producer("loadscopesched")
else:
self.log = log.loadscopesched
@@ -187,8 +187,7 @@ class LoadScopeScheduling(object):
break
else:
raise RuntimeError(
'Unable to identify crashitem on a workload with '
'pending items'
"Unable to identify crashitem on a workload with " "pending items"
)
# Made uncompleted work unit available again
@@ -224,10 +223,7 @@ class LoadScopeScheduling(object):
other_node = next(iter(self.registered_collections.keys()))
msg = report_collection_diff(
self.collection,
collection,
other_node.gateway.id,
node.gateway.id
self.collection, collection, other_node.gateway.id, node.gateway.id
)
self.log(msg)
return
@@ -255,9 +251,7 @@ class LoadScopeScheduling(object):
scope, work_unit = self.workqueue.popitem(last=False)
# Keep track of the assigned work
assigned_to_node = self.assigned_work.setdefault(
node, default=OrderedDict()
)
assigned_to_node = self.assigned_work.setdefault(node, default=OrderedDict())
assigned_to_node[scope] = work_unit
# Ask the node to execute the workload
@@ -292,14 +286,11 @@ class LoadScopeScheduling(object):
example/loadsuite/test/test_delta.py::Delta1
example/loadsuite/epsilon/__init__.py
"""
return nodeid.rsplit('::', 1)[0]
return nodeid.rsplit("::", 1)[0]
def _pending_of(self, workload):
"""Return the number of pending tests in a workload."""
pending = sum(
list(scope.values()).count(False)
for scope in workload.values()
)
pending = sum(list(scope.values()).count(False) for scope in workload.values())
return pending
def _reschedule(self, node):
@@ -317,7 +308,7 @@ class LoadScopeScheduling(object):
if not self.workqueue:
return
self.log('Number of units waiting for node:', len(self.workqueue))
self.log("Number of units waiting for node:", len(self.workqueue))
# Check that the node is almost depleted of work
# 2: Heuristic of minimum tests to enqueue more work
@@ -348,13 +339,11 @@ class LoadScopeScheduling(object):
# Check that all nodes collected the same tests
if not self._check_nodes_have_same_collection():
self.log('**Different tests collected, aborting run**')
self.log("**Different tests collected, aborting run**")
return
# Collections are identical, create the final list of items
self.collection = list(
next(iter(self.registered_collections.values()))
)
self.collection = list(next(iter(self.registered_collections.values())))
if not self.collection:
return
@@ -368,12 +357,12 @@ class LoadScopeScheduling(object):
extra_nodes = len(self.nodes) - len(self.workqueue)
if extra_nodes > 0:
self.log('Shutting down {0} nodes'.format(extra_nodes))
self.log("Shutting down {0} nodes".format(extra_nodes))
for _ in range(extra_nodes):
unused_node, assigned = self.assigned_work.popitem(last=True)
self.log('Shutting down unused node {0}'.format(unused_node))
self.log("Shutting down unused node {0}".format(unused_node))
unused_node.shutdown()
# Assign initial workload
@@ -402,10 +391,7 @@ class LoadScopeScheduling(object):
for node, collection in node_collection_items[1:]:
msg = report_collection_diff(
col,
collection,
first_node.gateway.id,
node.gateway.id,
col, collection, first_node.gateway.id, node.gateway.id
)
if not msg:
continue
@@ -416,12 +402,7 @@ class LoadScopeScheduling(object):
if self.config is None:
continue
rep = CollectReport(
node.gateway.id,
'failed',
longrepr=msg,
result=[]
)
rep = CollectReport(node.gateway.id, "failed", longrepr=msg, result=[])
self.config.hook.pytest_collectreport(report=rep)
return same_collection

View File

@@ -22,16 +22,17 @@ def parse_spec_config(config):
except ValueError:
xspeclist.append(xspec)
else:
xspeclist.extend([xspec[i + 1:]] * num)
xspeclist.extend([xspec[i + 1 :]] * num)
if not xspeclist:
raise pytest.UsageError(
"MISSING test execution (tx) nodes: please specify --tx")
"MISSING test execution (tx) nodes: please specify --tx"
)
return xspeclist
class NodeManager(object):
EXIT_TIMEOUT = 10
DEFAULT_IGNORES = ['.*', '*.pyc', '*.pyo', '*~']
DEFAULT_IGNORES = [".*", "*.pyc", "*.pyo", "*~"]
def __init__(self, config, specs=None, defaultchdir="pyexecnetcache"):
self.config = config
@@ -59,8 +60,7 @@ class NodeManager(object):
self.rsync(gateway, root, **self.rsyncoptions)
def setup_nodes(self, putevent):
self.config.hook.pytest_xdist_setupnodes(config=self.config,
specs=self.specs)
self.config.hook.pytest_xdist_setupnodes(config=self.config, specs=self.specs)
self.trace("setting up nodes")
nodes = []
for spec in self.specs:
@@ -72,7 +72,7 @@ class NodeManager(object):
self.config.hook.pytest_xdist_newgateway(gateway=gw)
self.rsync_roots(gw)
node = WorkerController(self, gw, self.config, putevent)
gw.node = node # keep the node alive
gw.node = node # keep the node alive
node.setup()
self.trace("started node %r" % node)
return node
@@ -91,6 +91,7 @@ class NodeManager(object):
return []
import pytest
import _pytest
pytestpath = pytest.__file__.rstrip("co")
pytestdir = py.path.local(_pytest.__file__).dirpath()
config = self.config
@@ -114,10 +115,7 @@ class NodeManager(object):
ignores += self.config.option.rsyncignore
ignores += self.config.getini("rsyncignore")
return {
'ignores': ignores,
'verbose': self.config.option.verbose,
}
return {"ignores": ignores, "verbose": self.config.option.verbose}
def rsync(self, gateway, source, notify=None, verbose=False, ignores=None):
"""Perform rsync to remote hosts for node."""
@@ -129,9 +127,12 @@ class NodeManager(object):
if spec.popen and not spec.chdir:
# XXX This assumes that sources are python-packages
# and that adding the basedir does not hurt.
gateway.remote_exec("""
gateway.remote_exec(
"""
import sys ; sys.path.insert(0, %r)
""" % os.path.dirname(str(source))).waitclose()
"""
% os.path.dirname(str(source))
).waitclose()
return
if (spec, source) in self._rsynced_specs:
return
@@ -139,28 +140,24 @@ class NodeManager(object):
def finished():
if notify:
notify("rsyncrootready", spec, source)
rsync.add_target_host(gateway, finished=finished)
self._rsynced_specs.add((spec, source))
self.config.hook.pytest_xdist_rsyncstart(
source=source,
gateways=[gateway],
)
self.config.hook.pytest_xdist_rsyncstart(source=source, gateways=[gateway])
rsync.send()
self.config.hook.pytest_xdist_rsyncfinish(
source=source,
gateways=[gateway],
)
self.config.hook.pytest_xdist_rsyncfinish(source=source, gateways=[gateway])
class HostRSync(execnet.RSync):
""" RSyncer that filters out common files
"""
def __init__(self, sourcedir, *args, **kwargs):
self._synced = {}
self._ignores = []
ignores = kwargs.pop('ignores', None) or []
ignores = kwargs.pop("ignores", None) or []
for x in ignores:
x = getattr(x, 'strpath', x)
x = getattr(x, "strpath", x)
self._ignores.append(re.compile(fnmatch.translate(x)))
super(HostRSync, self).__init__(sourcedir=sourcedir, **kwargs)
@@ -174,15 +171,15 @@ class HostRSync(execnet.RSync):
def add_target_host(self, gateway, finished=None):
remotepath = os.path.basename(self._sourcedir)
super(HostRSync, self).add_target(gateway, remotepath,
finishedcallback=finished,
delete=True,)
super(HostRSync, self).add_target(
gateway, remotepath, finishedcallback=finished, delete=True
)
def _report_send_file(self, gateway, modified_rel_path):
if self._verbose:
path = os.path.basename(self._sourcedir) + "/" + modified_rel_path
remotepath = gateway.spec.chdir
print('%s:%s <= %s' % (gateway.spec, remotepath, path))
print("%s:%s <= %s" % (gateway.spec, remotepath, path))
def make_reltoroot(roots, args):
@@ -211,11 +208,12 @@ class WorkerController(object):
self.putevent = putevent
self.gateway = gateway
self.config = config
self.workerinput = {'workerid': gateway.id,
'workercount': len(nodemanager.specs),
'slaveid': gateway.id,
'slavecount': len(nodemanager.specs)
}
self.workerinput = {
"workerid": gateway.id,
"workercount": len(nodemanager.specs),
"slaveid": gateway.id,
"slavecount": len(nodemanager.specs),
}
# TODO: deprecated name, backward compatibility only. Remove it in future
self.slaveinput = self.workerinput
self._down = False
@@ -225,7 +223,7 @@ class WorkerController(object):
py.log.setconsumer(self.log._keywords, None)
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.gateway.id,)
return "<%s %s>" % (self.__class__.__name__, self.gateway.id)
@property
def shutting_down(self):
@@ -240,24 +238,22 @@ class WorkerController(object):
option_dict = vars(self.config.option)
if spec.popen:
name = "popen-%s" % self.gateway.id
if hasattr(self.config, '_tmpdirhandler'):
if hasattr(self.config, "_tmpdirhandler"):
basetemp = self.config._tmpdirhandler.getbasetemp()
option_dict['basetemp'] = str(basetemp.join(name))
option_dict["basetemp"] = str(basetemp.join(name))
self.config.hook.pytest_configure_node(node=self)
self.channel = self.gateway.remote_exec(xdist.remote)
self.channel.send((self.workerinput, args, option_dict))
if self.putevent:
self.channel.setcallback(
self.process_from_remote,
endmarker=self.ENDMARK)
self.channel.setcallback(self.process_from_remote, endmarker=self.ENDMARK)
def ensure_teardown(self):
if hasattr(self, 'channel'):
if hasattr(self, "channel"):
if not self.channel.isclosed():
self.log("closing", self.channel)
self.channel.close()
# del self.channel
if hasattr(self, 'gateway'):
if hasattr(self, "gateway"):
self.log("exiting", self.gateway)
self.gateway.exit()
# del self.gateway
@@ -266,7 +262,7 @@ class WorkerController(object):
self.sendcommand("runtests", indices=indices)
def send_runtest_all(self):
self.sendcommand("runtests_all",)
self.sendcommand("runtests_all")
def shutdown(self):
if not self._down:
@@ -309,25 +305,28 @@ class WorkerController(object):
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname == "workerfinished":
self._down = True
self.workeroutput = self.slaveoutput = kwargs['workeroutput']
self.workeroutput = self.slaveoutput = kwargs["workeroutput"]
self.notify_inproc("workerfinished", node=self)
elif eventname in ("logstart", "logfinish"):
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname in (
"testreport", "collectreport", "teardownreport"):
elif eventname in ("testreport", "collectreport", "teardownreport"):
item_index = kwargs.pop("item_index", None)
rep = unserialize_report(eventname, kwargs['data'])
rep = unserialize_report(eventname, kwargs["data"])
if item_index is not None:
rep.item_index = item_index
self.notify_inproc(eventname, node=self, rep=rep)
elif eventname == "collectionfinish":
self.notify_inproc(eventname, node=self, ids=kwargs['ids'])
self.notify_inproc(eventname, node=self, ids=kwargs["ids"])
elif eventname == "runtest_protocol_complete":
self.notify_inproc(eventname, node=self, **kwargs)
elif eventname == "logwarning":
self.notify_inproc(eventname, message=kwargs['message'],
code=kwargs['code'], nodeid=kwargs['nodeid'],
fslocation=kwargs['nodeid'])
self.notify_inproc(
eventname,
message=kwargs["message"],
code=kwargs["code"],
nodeid=kwargs["nodeid"],
fslocation=kwargs["nodeid"],
)
else:
raise ValueError("unknown event: %s" % (eventname,))
except KeyboardInterrupt:
@@ -335,6 +334,7 @@ class WorkerController(object):
raise
except: # noqa
from _pytest._code import ExceptionInfo
excinfo = ExceptionInfo()
print("!" * 20, excinfo)
self.config.notify_exception(excinfo)
@@ -351,56 +351,56 @@ def unserialize_report(name, reportdict):
ReprFileLocation,
ReprFuncArgs,
ReprLocals,
ReprTraceback
ReprTraceback,
)
if reportdict['longrepr']:
if 'reprcrash' in reportdict['longrepr'] and 'reprtraceback' in reportdict['longrepr']:
reprtraceback = reportdict['longrepr']['reprtraceback']
reprcrash = reportdict['longrepr']['reprcrash']
if reportdict["longrepr"]:
if (
"reprcrash" in reportdict["longrepr"]
and "reprtraceback" in reportdict["longrepr"]
):
reprtraceback = reportdict["longrepr"]["reprtraceback"]
reprcrash = reportdict["longrepr"]["reprcrash"]
unserialized_entries = []
reprentry = None
for entry_data in reprtraceback['reprentries']:
data = entry_data['data']
entry_type = entry_data['type']
if entry_type == 'ReprEntry':
for entry_data in reprtraceback["reprentries"]:
data = entry_data["data"]
entry_type = entry_data["type"]
if entry_type == "ReprEntry":
reprfuncargs = None
reprfileloc = None
reprlocals = None
if data['reprfuncargs']:
reprfuncargs = ReprFuncArgs(
**data['reprfuncargs'])
if data['reprfileloc']:
reprfileloc = ReprFileLocation(
**data['reprfileloc'])
if data['reprlocals']:
reprlocals = ReprLocals(
data['reprlocals']['lines'])
if data["reprfuncargs"]:
reprfuncargs = ReprFuncArgs(**data["reprfuncargs"])
if data["reprfileloc"]:
reprfileloc = ReprFileLocation(**data["reprfileloc"])
if data["reprlocals"]:
reprlocals = ReprLocals(data["reprlocals"]["lines"])
reprentry = ReprEntry(
lines=data['lines'],
lines=data["lines"],
reprfuncargs=reprfuncargs,
reprlocals=reprlocals,
filelocrepr=reprfileloc,
style=data['style']
style=data["style"],
)
elif entry_type == 'ReprEntryNative':
reprentry = ReprEntryNative(data['lines'])
elif entry_type == "ReprEntryNative":
reprentry = ReprEntryNative(data["lines"])
else:
report_unserialization_failure(
entry_type, name, reportdict)
report_unserialization_failure(entry_type, name, reportdict)
unserialized_entries.append(reprentry)
reprtraceback['reprentries'] = unserialized_entries
reprtraceback["reprentries"] = unserialized_entries
exception_info = ReprExceptionInfo(
reprtraceback=ReprTraceback(**reprtraceback),
reprcrash=ReprFileLocation(**reprcrash),
)
for section in reportdict['longrepr']['sections']:
for section in reportdict["longrepr"]["sections"]:
exception_info.addsection(*section)
reportdict['longrepr'] = exception_info
reportdict["longrepr"] = exception_info
return reportdict
if name == "testreport":
@@ -411,13 +411,13 @@ def unserialize_report(name, reportdict):
def report_unserialization_failure(type_name, report_name, reportdict):
from pprint import pprint
url = 'https://github.com/pytest-dev/pytest-xdist/issues'
url = "https://github.com/pytest-dev/pytest-xdist/issues"
stream = py.io.TextIO()
pprint('-' * 100, stream=stream)
pprint('INTERNALERROR: Unknown entry type returned: %s' % type_name,
stream=stream)
pprint('report_name: %s' % report_name, stream=stream)
pprint("-" * 100, stream=stream)
pprint("INTERNALERROR: Unknown entry type returned: %s" % type_name, stream=stream)
pprint("report_name: %s" % report_name, stream=stream)
pprint(reportdict, stream=stream)
pprint('Please report this bug at %s' % url, stream=stream)
pprint('-' * 100, stream=stream)
pprint("Please report this bug at %s" % url, stream=stream)
pprint("-" * 100, stream=stream)
assert 0, stream.getvalue()