Synchronous connection failures must close channels. (#329)

Motivation:

When a connect() call returns an error other than EINPROGRESS, we
currently leave the FD registered and don't close the channel. This is
wrong, we should take this as a signal to close the socket immediately,
rathern than letting the selector tell us this FD is dead: after all,
we know it's dead.

Modifications:

Call `close0()` in synchronous connect failures.

Result:

Faster failures!
This commit is contained in:
Cory Benfield 2018-04-18 16:07:05 +01:00 committed by Johannes Weiß
parent 17af3c7c67
commit 707b413ec6
4 changed files with 98 additions and 39 deletions

View File

@ -902,7 +902,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
do {
assert(self.lifecycleManager.isRegistered)
if try !connectSocket(to: address) {
// We aren't connected, we'll get the remote address later.
self.updateCachedAddressesFromSocket(updateLocal: true, updateRemote: false)
@ -917,7 +916,13 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
becomeActive0(promise: promise)
}
} catch let error {
promise?.fail(error: error)
assert(self.lifecycleManager.isRegistered)
// We would like to have this assertion here, but we want to be able to go through this
// code path in cases where connect() is being called on channels that are already active.
//assert(!self.lifecycleManager.isActive)
// We're going to set the promise as the pending connect promise, and let close0 fail it for us.
self.pendingConnect = promise
self.close0(error: error, mode: .all, promise: nil)
}
}

View File

@ -66,6 +66,7 @@ extension ChannelTests {
("testAppropriateAndInappropriateOperationsForUnregisteredSockets", testAppropriateAndInappropriateOperationsForUnregisteredSockets),
("testCloseSocketWhenReadErrorWasReceivedAndMakeSureNoReadCompleteArrives", testCloseSocketWhenReadErrorWasReceivedAndMakeSureNoReadCompleteArrives),
("testSocketFailingAsyncCorrectlyTearsTheChannelDownAndDoesntCrash", testSocketFailingAsyncCorrectlyTearsTheChannelDownAndDoesntCrash),
("testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown", testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown),
]
}
}

View File

@ -2142,40 +2142,6 @@ public class ChannelTests: XCTestCase {
}
}
class VerifyThingsAreRightHandler: ChannelInboundHandler {
typealias InboundIn = Never
private let allDone: EventLoopPromise<Void>
enum State {
case fresh
case registered
case unregistered
}
private var state: State = .fresh
init(allDone: EventLoopPromise<Void>) {
self.allDone = allDone
}
deinit { XCTAssertEqual(.unregistered, self.state) }
func channelActive(ctx: ChannelHandlerContext) { XCTFail("should never become active") }
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { XCTFail("should never read") }
func channelReadComplete(ctx: ChannelHandlerContext) { XCTFail("should never readComplete") }
func errorCaught(ctx: ChannelHandlerContext, error: Error) { XCTFail("pipeline shouldn't be told about connect error") }
func channelRegistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.fresh, self.state)
self.state = .registered
}
func channelUnregistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.registered, self.state)
self.state = .unregistered
self.allDone.succeed(result: ())
}
}
let group = MultiThreadedEventLoopGroup(numThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
@ -2191,7 +2157,7 @@ public class ChannelTests: XCTestCase {
let allDone: EventLoopPromise<Void> = group.next().newPromise()
XCTAssertNoThrow(try sc.eventLoop.submit {
sc.pipeline.add(handler: VerifyThingsAreRightHandler(allDone: allDone)).then {
sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
sc.connect(to: serverChannel.localAddress!)
}
@ -2200,4 +2166,87 @@ public class ChannelTests: XCTestCase {
XCTAssertNoThrow(try allDone.futureResult.wait())
XCTAssertNoThrow(try sc.syncCloseAcceptingAlreadyClosed())
}
func testSocketErroringSynchronouslyCorrectlyTearsTheChannelDown() throws {
// regression test for #322
enum DummyError: Error { case dummy }
class SocketFailingConnect: Socket {
init() throws {
try super.init(protocolFamily: PF_INET, type: Posix.SOCK_STREAM, setNonBlocking: true)
}
override func connect(to address: SocketAddress) throws -> Bool {
throw DummyError.dummy
}
}
let group = MultiThreadedEventLoopGroup(numThreads: 2)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let sc = try SocketChannel(socket: SocketFailingConnect(), eventLoop: group.next() as! SelectableEventLoop)
let serverChannel = try ServerBootstrap(group: group.next())
.bind(host: "127.0.0.1", port: 0)
.wait()
defer {
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}
let allDone: EventLoopPromise<Void> = group.next().newPromise()
XCTAssertNoThrow(try sc.eventLoop.submit {
let f = sc.pipeline.add(handler: VerifyConnectionFailureHandler(allDone: allDone)).then {
sc.register().then {
sc.connect(to: serverChannel.localAddress!)
}
}
f.whenSuccess {
XCTFail("Must not succeed")
}
f.whenFailure { err in
XCTAssertEqual(err as? DummyError, .dummy)
}
// We can block here because connect must have failed synchronously.
XCTAssertTrue(f.isFulfilled)
}.wait())
XCTAssertNoThrow(try allDone.futureResult.wait())
XCTAssertNoThrow(try sc.closeFuture.wait())
XCTAssertNoThrow(try sc.syncCloseAcceptingAlreadyClosed())
}
}
fileprivate class VerifyConnectionFailureHandler: ChannelInboundHandler {
typealias InboundIn = Never
private let allDone: EventLoopPromise<Void>
enum State {
case fresh
case registered
case unregistered
}
private var state: State = .fresh
init(allDone: EventLoopPromise<Void>) {
self.allDone = allDone
}
deinit { XCTAssertEqual(.unregistered, self.state) }
func channelActive(ctx: ChannelHandlerContext) { XCTFail("should never become active") }
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) { XCTFail("should never read") }
func channelReadComplete(ctx: ChannelHandlerContext) { XCTFail("should never readComplete") }
func errorCaught(ctx: ChannelHandlerContext, error: Error) { XCTFail("pipeline shouldn't be told about connect error") }
func channelRegistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.fresh, self.state)
self.state = .registered
}
func channelUnregistered(ctx: ChannelHandlerContext) {
XCTAssertEqual(.registered, self.state)
self.state = .unregistered
self.allDone.succeed(result: ())
}
}

View File

@ -262,10 +262,14 @@ public class SocketChannelTest : XCTestCase {
let serverChannel = try ServerBootstrap(group: group).bind(host: "127.0.0.1", port: 0).wait()
do {
try serverChannel.connect(to: serverChannel.localAddress!).wait()
XCTFail("Did not throw")
XCTAssertNoThrow(try serverChannel.close().wait())
} catch let err as ChannelError where err == .operationUnsupported {
// expected
// expected, no close here as the channel is already closed.
} catch {
XCTFail("Unexpected error \(error)")
XCTAssertNoThrow(try serverChannel.close().wait())
}
try serverChannel.close().wait()
}
public func testCloseDuringWriteFailure() throws {