don't crash when Channel goes inactive in read triggered by write error (#594)
Motivation: Previously we asserted that a Channel cannot go inactive after calling out for a read that was triggered by (draining the receive buffer after) a write error. This assertion was put in place to guard against `readComplete` events sent on inactive channels. It did that job just fine but crashes aren't great so we now conditionally fire the `readComplete` event if the Channel stays active. Modifications: make the readComplete event firing conditional Result: fewer crashes, more happy faces
This commit is contained in:
parent
ce0c6d92e9
commit
df764fedf0
|
@ -443,17 +443,20 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
} catch let err {
|
||||
// If there is a write error we should try drain the inbound before closing the socket as there may be some data pending.
|
||||
// We ignore any error that is thrown as we will use the original err to close the channel and notify the user.
|
||||
if readIfNeeded0() {
|
||||
if self.readIfNeeded0() {
|
||||
assert(self.lifecycleManager.isActive)
|
||||
|
||||
// We need to continue reading until there is nothing more to be read from the socket as we will not have another chance to drain it.
|
||||
var readAtLeastOnce = false
|
||||
while let read = try? readFromSocket(), read == .some {
|
||||
assert(self.lifecycleManager.isActive)
|
||||
readAtLeastOnce = true
|
||||
}
|
||||
if readAtLeastOnce && self.lifecycleManager.isActive {
|
||||
pipeline.fireChannelReadComplete()
|
||||
}
|
||||
}
|
||||
|
||||
close0(error: err, mode: .all, promise: nil)
|
||||
self.close0(error: err, mode: .all, promise: nil)
|
||||
|
||||
// we handled all writes
|
||||
return .unregister
|
||||
|
|
|
@ -931,10 +931,11 @@ extension ChannelPipeline {
|
|||
}
|
||||
|
||||
private extension CloseMode {
|
||||
/// Returns the error to fail outstanding operations writes with.
|
||||
var error: ChannelError {
|
||||
switch self {
|
||||
case .all:
|
||||
return .alreadyClosed
|
||||
return .ioOnClosedChannel
|
||||
case .output:
|
||||
return .outputClosed
|
||||
case .input:
|
||||
|
|
|
@ -75,6 +75,7 @@ extension ChannelTests {
|
|||
("testFailedRegistrationOfAcceptedSocket", testFailedRegistrationOfAcceptedSocket),
|
||||
("testFailedRegistrationOfServerSocket", testFailedRegistrationOfServerSocket),
|
||||
("testTryingToBindOnPortThatIsAlreadyBoundFailsButDoesNotCrash", testTryingToBindOnPortThatIsAlreadyBoundFailsButDoesNotCrash),
|
||||
("testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError", testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2449,6 +2449,122 @@ public class ChannelTests: XCTestCase {
|
|||
XCTFail("unexpected error: \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
func testCloseInReadTriggeredByDrainingTheReceiveBufferBecauseOfWriteError() throws {
|
||||
final class WriteWhenActiveHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
let channelAvailablePromise: EventLoopPromise<Channel>
|
||||
|
||||
init(channelAvailablePromise: EventLoopPromise<Channel>) {
|
||||
self.channelAvailablePromise = channelAvailablePromise
|
||||
}
|
||||
|
||||
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||
let buffer = self.unwrapInboundIn(data)
|
||||
XCTFail("unexpected read: \(String(decoding: buffer.readableBytesView, as: UTF8.self))")
|
||||
}
|
||||
|
||||
func channelActive(ctx: ChannelHandlerContext) {
|
||||
var buffer = ctx.channel.allocator.buffer(capacity: 1)
|
||||
buffer.write(staticString: "X")
|
||||
ctx.channel.writeAndFlush(self.wrapOutboundOut(buffer)).map { ctx.channel }.cascade(promise: self.channelAvailablePromise)
|
||||
}
|
||||
}
|
||||
|
||||
final class WriteAlwaysFailingSocket: Socket {
|
||||
init() throws {
|
||||
try super.init(protocolFamily: AF_INET, type: Posix.SOCK_STREAM, setNonBlocking: true)
|
||||
}
|
||||
|
||||
override func write(pointer: UnsafeRawBufferPointer) throws -> IOResult<Int> {
|
||||
throw IOError(errnoCode: ETXTBSY, function: "WriteAlwaysFailingSocket.write fake error")
|
||||
}
|
||||
|
||||
override func writev(iovecs: UnsafeBufferPointer<IOVector>) throws -> IOResult<Int> {
|
||||
throw IOError(errnoCode: ETXTBSY, function: "WriteAlwaysFailingSocket.writev fake error")
|
||||
}
|
||||
}
|
||||
|
||||
final class MakeChannelInactiveInReadCausedByWriteErrorHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = ByteBuffer
|
||||
typealias OutboundOut = ByteBuffer
|
||||
|
||||
let serverChannel: EventLoopFuture<Channel>
|
||||
let allDonePromise: EventLoopPromise<Void>
|
||||
|
||||
init(serverChannel: EventLoopFuture<Channel>,
|
||||
allDonePromise: EventLoopPromise<Void>) {
|
||||
self.serverChannel = serverChannel
|
||||
self.allDonePromise = allDonePromise
|
||||
}
|
||||
|
||||
func channelActive(ctx: ChannelHandlerContext) {
|
||||
XCTAssert(serverChannel.eventLoop === ctx.eventLoop)
|
||||
self.serverChannel.whenSuccess { serverChannel in
|
||||
// all of the following futures need to complete synchronously for this test to test the correct
|
||||
// thing. Therefore we keep track if we're still on the same stack frame.
|
||||
var inSameStackFrame = true
|
||||
defer {
|
||||
inSameStackFrame = false
|
||||
}
|
||||
|
||||
XCTAssertTrue(serverChannel.isActive)
|
||||
// we allow auto-read again to make sure that the socket buffer is drained on write error
|
||||
// (cf. https://github.com/apple/swift-nio/issues/593)
|
||||
ctx.channel.setOption(option: ChannelOptions.autoRead, value: true).then {
|
||||
// let's trigger the write error
|
||||
var buffer = ctx.channel.allocator.buffer(capacity: 16)
|
||||
buffer.write(staticString: "THIS WILL FAIL ANYWAY")
|
||||
return ctx.writeAndFlush(self.wrapOutboundOut(buffer))
|
||||
}.map {
|
||||
XCTFail("this should have failed")
|
||||
}.whenFailure { error in
|
||||
XCTAssertEqual(ChannelError.ioOnClosedChannel, error as? ChannelError)
|
||||
XCTAssertTrue(inSameStackFrame)
|
||||
self.allDonePromise.succeed(result: ())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
|
||||
let buffer = self.unwrapInboundIn(data)
|
||||
XCTAssertEqual("X", String(decoding: buffer.readableBytesView, as: UTF8.self))
|
||||
ctx.close(promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
let singleThreadedELG = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
defer {
|
||||
XCTAssertNoThrow(try singleThreadedELG.syncShutdownGracefully())
|
||||
}
|
||||
let serverChannelAvailablePromise: EventLoopPromise<Channel> = singleThreadedELG.next().newPromise()
|
||||
let allDonePromise: EventLoopPromise<Void> = singleThreadedELG.next().newPromise()
|
||||
let server = try assertNoThrowWithValue(ServerBootstrap(group: singleThreadedELG)
|
||||
.childChannelOption(ChannelOptions.allowRemoteHalfClosure, value: true)
|
||||
.childChannelInitializer { channel in
|
||||
channel.pipeline.add(handler: WriteWhenActiveHandler(channelAvailablePromise: serverChannelAvailablePromise))
|
||||
}
|
||||
.bind(host: "127.0.0.1", port: 0)
|
||||
.wait())
|
||||
defer {
|
||||
XCTAssertNoThrow(try server.close().wait())
|
||||
}
|
||||
|
||||
let c = try assertNoThrowWithValue(SocketChannel(socket: WriteAlwaysFailingSocket(),
|
||||
parent: nil,
|
||||
eventLoop: singleThreadedELG.next() as! SelectableEventLoop))
|
||||
XCTAssertNoThrow(try c.setOption(option: ChannelOptions.autoRead, value: false).wait())
|
||||
XCTAssertNoThrow(try c.setOption(option: ChannelOptions.allowRemoteHalfClosure, value: true).wait())
|
||||
XCTAssertNoThrow(try c.pipeline.add(handler: MakeChannelInactiveInReadCausedByWriteErrorHandler(serverChannel: serverChannelAvailablePromise.futureResult,
|
||||
allDonePromise: allDonePromise)).wait())
|
||||
XCTAssertNoThrow(try c.register().wait())
|
||||
XCTAssertNoThrow(try c.connect(to: server.localAddress!).wait())
|
||||
|
||||
XCTAssertNoThrow(try allDonePromise.futureResult.wait())
|
||||
XCTAssertFalse(c.isActive)
|
||||
}
|
||||
}
|
||||
|
||||
fileprivate final class FailRegistrationAndDelayCloseHandler: ChannelOutboundHandler {
|
||||
|
|
|
@ -200,7 +200,7 @@ final class DatagramChannelTests: XCTestCase {
|
|||
do {
|
||||
try $0.wait()
|
||||
XCTFail("Did not error")
|
||||
} catch ChannelError.alreadyClosed {
|
||||
} catch ChannelError.ioOnClosedChannel {
|
||||
// All good
|
||||
} catch {
|
||||
XCTFail("Unexpected error: \(error)")
|
||||
|
|
|
@ -164,7 +164,7 @@ class FileRegionTest : XCTestCase {
|
|||
}.wait()
|
||||
XCTFail("no error happened even though we closed before flush")
|
||||
} catch let e as ChannelError {
|
||||
XCTAssertEqual(ChannelError.alreadyClosed, e)
|
||||
XCTAssertEqual(ChannelError.ioOnClosedChannel, e)
|
||||
} catch let e {
|
||||
XCTFail("unexpected error \(e)")
|
||||
}
|
||||
|
|
|
@ -321,7 +321,7 @@ public class SocketChannelTest : XCTestCase {
|
|||
let writeFut = clientChannel.write(buffer).map {
|
||||
XCTFail("Must not succeed")
|
||||
}.thenIfError { error in
|
||||
XCTAssertEqual(error as? ChannelError, ChannelError.alreadyClosed)
|
||||
XCTAssertEqual(error as? ChannelError, ChannelError.ioOnClosedChannel)
|
||||
return clientChannel.close()
|
||||
}
|
||||
XCTAssertNoThrow(try clientChannel.close().wait())
|
||||
|
@ -470,7 +470,7 @@ public class SocketChannelTest : XCTestCase {
|
|||
do {
|
||||
try connectPromise.futureResult.wait()
|
||||
XCTFail("Did not throw")
|
||||
} catch let err as ChannelError where err == .alreadyClosed {
|
||||
} catch let err as ChannelError where err == .ioOnClosedChannel {
|
||||
// expected
|
||||
}
|
||||
XCTAssertNoThrow(try closePromise.futureResult.wait())
|
||||
|
|
Loading…
Reference in New Issue