wip: fix capfd

This commit is contained in:
Anthony Sottile 2021-04-26 19:41:02 -07:00
parent 2049ae271e
commit da61bb2a24
2 changed files with 41 additions and 23 deletions

View File

@ -347,6 +347,17 @@ class SysCapture(SysCaptureBinary):
self._old.flush() self._old.flush()
def flush(func):
def flush_inner(*args, **kwargs):
sys.stdout.flush()
sys.stderr.flush()
ret = func(*args, **kwargs)
sys.stdout.flush()
sys.stderr.flush()
return ret
return flush_inner
class FDCaptureBinary: class FDCaptureBinary:
"""Capture IO to/from a given OS-level file descriptor. """Capture IO to/from a given OS-level file descriptor.
@ -380,7 +391,7 @@ class FDCaptureBinary:
if targetfd == 0: if targetfd == 0:
self.tmpfile = open(os.devnull) self.tmpfile = open(os.devnull)
self.syscapture = SysCapture(targetfd) #self.syscapture = SysCapture(targetfd)
else: else:
self.tmpfile = EncodedFile( self.tmpfile = EncodedFile(
TemporaryFile(buffering=0), TemporaryFile(buffering=0),
@ -389,10 +400,10 @@ class FDCaptureBinary:
newline="", newline="",
write_through=True, write_through=True,
) )
if targetfd in patchsysdict: #if targetfd in patchsysdict:
self.syscapture = SysCapture(targetfd, self.tmpfile) #self.syscapture = SysCapture(targetfd, self.tmpfile)
else: #else:
self.syscapture = NoCapture() #self.syscapture = NoCapture()
self._state = "initialized" self._state = "initialized"
@ -412,13 +423,17 @@ class FDCaptureBinary:
op, self._state, ", ".join(states) op, self._state, ", ".join(states)
) )
@flush
def start(self) -> None: def start(self) -> None:
"""Start capturing on targetfd using memorized tmpfile.""" """Start capturing on targetfd using memorized tmpfile."""
self._assert_state("start", ("initialized",)) self._assert_state("start", ("initialized",))
sys.stdout.flush()
sys.stderr.flush()
os.dup2(self.tmpfile.fileno(), self.targetfd) os.dup2(self.tmpfile.fileno(), self.targetfd)
self.syscapture.start() #self.syscapture.start()
self._state = "started" self._state = "started"
@flush
def snap(self): def snap(self):
self._assert_state("snap", ("started", "suspended")) self._assert_state("snap", ("started", "suspended"))
self.tmpfile.seek(0) self.tmpfile.seek(0)
@ -427,35 +442,42 @@ class FDCaptureBinary:
self.tmpfile.truncate() self.tmpfile.truncate()
return res return res
@flush
def done(self) -> None: def done(self) -> None:
"""Stop capturing, restore streams, return original capture file, """Stop capturing, restore streams, return original capture file,
seeked to position zero.""" seeked to position zero."""
self._assert_state("done", ("initialized", "started", "suspended", "done")) self._assert_state("done", ("initialized", "started", "suspended", "done"))
if self._state == "done": if self._state == "done":
return return
sys.stdout.flush()
sys.stderr.flush()
os.dup2(self.targetfd_save, self.targetfd) os.dup2(self.targetfd_save, self.targetfd)
os.close(self.targetfd_save) os.close(self.targetfd_save)
if self.targetfd_invalid is not None: if self.targetfd_invalid is not None:
if self.targetfd_invalid != self.targetfd: if self.targetfd_invalid != self.targetfd:
os.close(self.targetfd) os.close(self.targetfd)
os.close(self.targetfd_invalid) os.close(self.targetfd_invalid)
self.syscapture.done() #self.syscapture.done()
self.tmpfile.close() self.tmpfile.close()
self._state = "done" self._state = "done"
@flush
def suspend(self) -> None: def suspend(self) -> None:
self._assert_state("suspend", ("started", "suspended")) self._assert_state("suspend", ("started", "suspended"))
if self._state == "suspended": if self._state == "suspended":
return return
self.syscapture.suspend() #self.syscapture.suspend()
sys.stdout.flush()
sys.stderr.flush()
os.dup2(self.targetfd_save, self.targetfd) os.dup2(self.targetfd_save, self.targetfd)
self._state = "suspended" self._state = "suspended"
@flush
def resume(self) -> None: def resume(self) -> None:
self._assert_state("resume", ("started", "suspended")) self._assert_state("resume", ("started", "suspended"))
if self._state == "started": if self._state == "started":
return return
self.syscapture.resume() #self.syscapture.resume()
os.dup2(self.tmpfile.fileno(), self.targetfd) os.dup2(self.tmpfile.fileno(), self.targetfd)
self._state = "started" self._state = "started"
@ -475,6 +497,8 @@ class FDCapture(FDCaptureBinary):
EMPTY_BUFFER = "" # type: ignore EMPTY_BUFFER = "" # type: ignore
def snap(self): def snap(self):
sys.stdout.flush()
sys.stderr.flush()
self._assert_state("snap", ("started", "suspended")) self._assert_state("snap", ("started", "suspended"))
self.tmpfile.seek(0) self.tmpfile.seek(0)
res = self.tmpfile.read() res = self.tmpfile.read()

View File

@ -275,7 +275,7 @@ class TestPerTestCapturing:
raise ValueError raise ValueError
""" """
) )
result = pytester.runpytest(p1) result = pytester.runpytest_subprocess(p1)
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
[ [
"*test_capturing_outerr.py .F*", "*test_capturing_outerr.py .F*",
@ -510,7 +510,7 @@ class TestCaptureFixture:
method method
) )
) )
result = pytester.runpytest(p) result = pytester.runpytest_subprocess(p)
result.stdout.fnmatch_lines(["xxx42xxx"]) result.stdout.fnmatch_lines(["xxx42xxx"])
def test_stdfd_functional(self, pytester: Pytester) -> None: def test_stdfd_functional(self, pytester: Pytester) -> None:
@ -571,7 +571,7 @@ class TestCaptureFixture:
print("stderr after", file=sys.stderr) print("stderr after", file=sys.stderr)
""" """
) )
result = pytester.runpytest(str(p1), "-rA") result = pytester.runpytest_subprocess(str(p1), "-rA")
result.stdout.fnmatch_lines( result.stdout.fnmatch_lines(
[ [
"*- Captured stdout call -*", "*- Captured stdout call -*",
@ -738,8 +738,8 @@ class TestCaptureFixture:
cap=cap cap=cap
) )
) )
reprec = pytester.inline_run() result = pytester.runpytest_subprocess()
reprec.assertoutcome(passed=1) result.stdout.fnmatch_lines(["*1 passed*"])
def test_setup_failure_does_not_kill_capturing(pytester: Pytester) -> None: def test_setup_failure_does_not_kill_capturing(pytester: Pytester) -> None:
@ -765,7 +765,7 @@ def test_capture_conftest_runtest_setup(pytester: Pytester) -> None:
""" """
) )
pytester.makepyfile("def test_func(): pass") pytester.makepyfile("def test_func(): pass")
result = pytester.runpytest() result = pytester.runpytest_subprocess()
assert result.ret == 0 assert result.ret == 0
result.stdout.no_fnmatch_line("*hello19*") result.stdout.no_fnmatch_line("*hello19*")
@ -1039,12 +1039,6 @@ class TestFDCapture:
cap.targetfd_save, cap.tmpfile cap.targetfd_save, cap.tmpfile
) )
) )
# Should not crash with missing "_old".
assert repr(cap.syscapture) == (
"<SysCapture stdout _old=<UNSET> _state='done' tmpfile={!r}>".format(
cap.syscapture.tmpfile
)
)
def test_capfd_sys_stdout_mode(self, capfd) -> None: def test_capfd_sys_stdout_mode(self, capfd) -> None:
assert "b" not in sys.stdout.mode assert "b" not in sys.stdout.mode
@ -1422,8 +1416,8 @@ def test_error_attribute_issue555(pytester: Pytester) -> None:
""" """
import sys import sys
def test_capattr(): def test_capattr():
assert sys.stdout.errors == "replace" assert sys.stdout.errors == "strict"
assert sys.stderr.errors == "replace" assert sys.stderr.errors == "strict"
""" """
) )
reprec = pytester.inline_run() reprec = pytester.inline_run()