Retry on EPROTOTYPE on socket writes. (#1706)

Motivation:

When writing to a network socket on Apple platforms it is possible to
see EPROTOTYPE returned as an error. This is an undocumented and
special-case error code that appears to be associated with socket
shutdown, and so can fire when writing to a socket that is being shut
down by the other side. This should not be fired into the pipeline but
instead should be retried.

Modifications:

- Retry EPROTOTYPE errors on socket write methods.
- Add an (unfortunately) probabilistic test bed.

Result:

Should avoid weird error cases.
This commit is contained in:
Cory Benfield 2020-12-17 17:45:23 +00:00 committed by GitHub
parent 43931b7a7d
commit 4853e910e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 6 deletions

View File

@ -150,6 +150,7 @@ private func preconditionIsNotUnacceptableErrno(err: CInt, where function: Strin
@inline(__always)
@discardableResult
internal func syscall<T: FixedWidthInteger>(blocking: Bool,
eprototypeWorkaround: Bool = false,
where function: String = #function,
_ body: () throws -> T)
throws -> IOResult<T> {
@ -157,11 +158,18 @@ internal func syscall<T: FixedWidthInteger>(blocking: Bool,
let res = try body()
if res == -1 {
let err = errno
switch (err, blocking) {
case (EINTR, _):
switch (err, blocking, eprototypeWorkaround) {
case (EINTR, _, _):
continue
case (EWOULDBLOCK, true):
case (EWOULDBLOCK, true, _):
return .wouldBlock(0)
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
case (EPROTOTYPE, _, true):
// EPROTOTYPE can, on Darwin platforms, sometimes fire due to a race in the XNU kernel.
// The socket in question is about to shut down, so we can just retry the syscall and get
// the actual error (usually, but not necessarily, EPIPE).
continue
#endif
default:
preconditionIsNotUnacceptableErrno(err: err, where: function)
throw IOError(errnoCode: err, reason: function)
@ -356,7 +364,7 @@ internal enum Posix {
@inline(never)
public static func write(descriptor: CInt, pointer: UnsafeRawPointer, size: Int) throws -> IOResult<Int> {
return try syscall(blocking: true) {
return try syscall(blocking: true, eprototypeWorkaround: true) {
sysWrite(descriptor, pointer, size)
}
}
@ -371,7 +379,7 @@ internal enum Posix {
#if !os(Windows)
@inline(never)
public static func writev(descriptor: CInt, iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
return try syscall(blocking: true) {
return try syscall(blocking: true, eprototypeWorkaround: true) {
sysWritev(descriptor, iovecs.baseAddress!, CInt(iovecs.count))
}
}
@ -445,7 +453,7 @@ internal enum Posix {
public static func sendfile(descriptor: CInt, fd: CInt, offset: off_t, count: size_t) throws -> IOResult<Int> {
var written: off_t = 0
do {
_ = try syscall(blocking: false) { () -> ssize_t in
_ = try syscall(blocking: false, eprototypeWorkaround: true) { () -> ssize_t in
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
var w: off_t = off_t(count)
let result: CInt = Darwin.sendfile(fd, descriptor, offset, &w, nil, 0)

View File

@ -84,6 +84,7 @@ extension ChannelTests {
("testFixedSizeRecvByteBufferAllocatorSizeIsConstant", testFixedSizeRecvByteBufferAllocatorSizeIsConstant),
("testCloseInConnectPromise", testCloseInConnectPromise),
("testWritabilityChangeDuringReentrantFlushNow", testWritabilityChangeDuringReentrantFlushNow),
("testTriggerEPROTOTYPE", testTriggerEPROTOTYPE),
]
}
}

View File

@ -2814,6 +2814,40 @@ public final class ChannelTests: XCTestCase {
XCTAssertNoThrow(try handler.becameUnwritable.futureResult.wait())
XCTAssertNoThrow(try handler.becameWritable.futureResult.wait())
}
func testTriggerEPROTOTYPE() throws {
// This is a probabilistic test for https://github.com/swift-server/async-http-client/issues/322.
// We believe we'll see EPROTOTYPE on write syscalls if we write while the connections are being torn down.
// To check this we create 500 connections and close them, while the server attempts to write AS FAST AS IT CAN.
// As this test is probabilistic, we must not ignore transient failures in it.
let group = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let serverLoop = group.next()
let clientLoop = group.next()
XCTAssertFalse(serverLoop === clientLoop)
let serverFuture = ServerBootstrap(group: serverLoop)
.childChannelInitializer { channel in
return channel.pipeline.addHandler(AlwaysBeWritingHandler(vectorWrites: [true, false].randomElement()!))
}
.bind(host: "localhost", port: 0)
let server: Channel = try assertNoThrowWithValue(try serverFuture.wait())
defer {
XCTAssertNoThrow(try server.close().wait())
}
let clientFactory = ClientBootstrap(group: clientLoop)
let serverAddress = server.localAddress!
for _ in 0..<500 {
let client = try assertNoThrowWithValue(clientFactory.connect(to: serverAddress).wait())
XCTAssertNoThrow(try client.close().wait())
}
}
}
fileprivate final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler {
@ -2926,3 +2960,49 @@ final class ReentrantWritabilityChangingHandler: ChannelInboundHandler {
}
}
}
final class AlwaysBeWritingHandler: ChannelInboundHandler {
typealias InboundIn = ByteBuffer
typealias OutboundOut = ByteBuffer
static let buffer = ByteBuffer(string: "This is some data that I'm sending right now")
private let doVectorWrite: Bool
init(vectorWrites: Bool) {
self.doVectorWrite = vectorWrites
}
func channelActive(context: ChannelHandlerContext) {
self.keepWriting(context: context)
}
func errorCaught(context: ChannelHandlerContext, error: Error) {
if let error = error as? IOError, error.errnoCode == EPROTOTYPE {
XCTFail("Received EPROTOTYPE error")
}
}
private func keepWriting(context: ChannelHandlerContext) {
if self.doVectorWrite {
context.write(self.wrapOutboundOut(AlwaysBeWritingHandler.buffer)).whenFailure { error in
if let error = error as? IOError, error.errnoCode == EPROTOTYPE {
XCTFail("Received EPROTOTYPE error")
}
}
}
context.writeAndFlush(self.wrapOutboundOut(AlwaysBeWritingHandler.buffer)).whenComplete { result in
switch result {
case .success:
// We unroll the stack here to avoid blowing it apart.
context.eventLoop.execute {
self.keepWriting(context: context)
}
case .failure(let error):
if let error = error as? IOError, error.errnoCode == EPROTOTYPE {
XCTFail("Received EPROTOTYPE error")
}
}
}
}
}