Ensure localAddress / remoteAddress are still accessible in channelInactive / handlerRemoved (#346)
Motivation: Often its useful to be still be able to access the local / remote address during channelInactive / handlerRemoved callbacks to for example log it. We should ensure its still accessible during it. Modifications: - Fallback to slow-path in ChannelHandlerContext.localAddress0 / remoteAddress0 if fast-path fails to try accessing the address via the cache. - Clear cached addresses after all callbacks are run. - Add unit test. Result: Be able to access addresses while handlers are notified.
This commit is contained in:
parent
f06a9ccb79
commit
9f01374169
|
@ -681,9 +681,6 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
p = nil
|
||||
}
|
||||
|
||||
// Fail all pending writes and so ensure all pending promises are notified
|
||||
self.unsetCachedAddressesFromSocket()
|
||||
|
||||
// Transition our internal state.
|
||||
let callouts = self.lifecycleManager.close(promise: p)
|
||||
|
||||
|
@ -701,6 +698,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
self.pipeline.removeHandlers()
|
||||
|
||||
self.closePromise.succeed(result: ())
|
||||
|
||||
// Now reset the addresses as we notified all handlers / futures.
|
||||
self.unsetCachedAddressesFromSocket()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1017,11 +1017,29 @@ public final class ChannelHandlerContext: ChannelInvoker {
|
|||
}
|
||||
|
||||
public var remoteAddress: SocketAddress? {
|
||||
return try? self.channel._unsafe.remoteAddress0()
|
||||
do {
|
||||
// Fast-path access to the remoteAddress.
|
||||
return try self.channel._unsafe.remoteAddress0()
|
||||
} catch ChannelError.ioOnClosedChannel {
|
||||
// Channel was closed already but we may still have the address cached so try to access it via the Channel
|
||||
// so we are able to use it in channelInactive(...) / handlerRemoved(...) methods.
|
||||
return self.channel.remoteAddress
|
||||
} catch {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
public var localAddress: SocketAddress? {
|
||||
return try? self.channel._unsafe.localAddress0()
|
||||
do {
|
||||
// Fast-path access to the localAddress.
|
||||
return try self.channel._unsafe.localAddress0()
|
||||
} catch ChannelError.ioOnClosedChannel {
|
||||
// Channel was closed already but we may still have the address cached so try to access it via the Channel
|
||||
// so we are able to use it in channelInactive(...) / handlerRemoved(...) methods.
|
||||
return self.channel.localAddress
|
||||
} catch {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
public var eventLoop: EventLoop {
|
||||
|
|
|
@ -1437,9 +1437,16 @@ public class ChannelTests: XCTestCase {
|
|||
try serverChannel.syncCloseAcceptingAlreadyClosed()
|
||||
try clientChannel.syncCloseAcceptingAlreadyClosed()
|
||||
|
||||
for f in [ serverChannel.remoteAddress, serverChannel.localAddress, clientChannel.remoteAddress, clientChannel.localAddress ] {
|
||||
XCTAssertNil(f)
|
||||
}
|
||||
XCTAssertNoThrow(try serverChannel.closeFuture.wait())
|
||||
XCTAssertNoThrow(try clientChannel.closeFuture.wait())
|
||||
|
||||
// Schedule on the EventLoop to ensure we scheduled the cleanup of the cached addresses before.
|
||||
XCTAssertNoThrow(try group.next().submit {
|
||||
for f in [ serverChannel.remoteAddress, serverChannel.localAddress, clientChannel.remoteAddress, clientChannel.localAddress ] {
|
||||
XCTAssertNil(f)
|
||||
}
|
||||
}.wait())
|
||||
|
||||
}
|
||||
|
||||
func testReceiveAddressAfterAccept() throws {
|
||||
|
|
|
@ -43,6 +43,7 @@ extension SocketChannelTest {
|
|||
("testWithConfiguredStreamSocket", testWithConfiguredStreamSocket),
|
||||
("testWithConfiguredDatagramSocket", testWithConfiguredDatagramSocket),
|
||||
("testPendingConnectNotificationOrder", testPendingConnectNotificationOrder),
|
||||
("testLocalAndRemoteAddressNotNilInChannelInactiveAndHandlerRemoved", testLocalAndRemoteAddressNotNilInChannelInactiveAndHandlerRemoved),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -446,4 +446,58 @@ public class SocketChannelTest : XCTestCase {
|
|||
XCTAssertNoThrow(try channel.closeFuture.wait())
|
||||
XCTAssertNoThrow(try promise.futureResult.wait())
|
||||
}
|
||||
|
||||
public func testLocalAndRemoteAddressNotNilInChannelInactiveAndHandlerRemoved() throws {
|
||||
|
||||
class AddressVerificationHandler: ChannelInboundHandler {
|
||||
typealias InboundIn = Never
|
||||
typealias OutboundIn = Never
|
||||
|
||||
enum HandlerState {
|
||||
case created
|
||||
case inactive
|
||||
case removed
|
||||
}
|
||||
|
||||
let promise: EventLoopPromise<Void>
|
||||
var state = HandlerState.created
|
||||
|
||||
init(promise: EventLoopPromise<Void>) {
|
||||
self.promise = promise
|
||||
}
|
||||
|
||||
func channelInactive(ctx: ChannelHandlerContext) {
|
||||
XCTAssertNotNil(ctx.localAddress)
|
||||
XCTAssertNotNil(ctx.remoteAddress)
|
||||
XCTAssertEqual(.created, state)
|
||||
state = .inactive
|
||||
}
|
||||
|
||||
func handlerRemoved(ctx: ChannelHandlerContext) {
|
||||
XCTAssertNotNil(ctx.localAddress)
|
||||
XCTAssertNotNil(ctx.remoteAddress)
|
||||
XCTAssertEqual(.inactive, state)
|
||||
state = .removed
|
||||
|
||||
ctx.channel.closeFuture.whenComplete {
|
||||
XCTAssertNil(ctx.localAddress)
|
||||
XCTAssertNil(ctx.remoteAddress)
|
||||
|
||||
self.promise.succeed(result: ())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numThreads: 1)
|
||||
defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) }
|
||||
|
||||
let handler = AddressVerificationHandler(promise: group.next().newPromise())
|
||||
let serverChannel = try ServerBootstrap(group: group).childChannelInitializer { $0.pipeline.add(handler: handler) }.bind(host: "127.0.0.1", port: 0).wait()
|
||||
defer { XCTAssertNoThrow(try serverChannel.close().wait()) }
|
||||
|
||||
let clientChannel = try ClientBootstrap(group: group).connect(to: serverChannel.localAddress!).wait()
|
||||
|
||||
XCTAssertNoThrow(try clientChannel.close().wait())
|
||||
XCTAssertNoThrow(try handler.promise.futureResult.wait())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue