fix issues found by TSan (#294)
Motivation: NIO 1.3.1 was TSan clean but `master` regressed. In total I could see two breakages both proudly presented by `SocketChannelLifecycleManager`. Sorry :| : 1. accessing the `isActiveAtomic` from arbitrary threads: this was clearly a bug because `SocketChannelLifecycleManager` is usually held mutabe. So `isActiveAtomic` needs to be injected into `SocketChannelLifecycleManager` rather than contstructed there 2. accessing the `ChannelPipeline` through `SocketChannelLifecycleManager`: same problem as above really. Modifications: inject both the ChannelPipeline and the isActiveAtomic in the appropriate places. Result: NIO much more correct, TSan happy
This commit is contained in:
parent
7e70bf5a87
commit
748e08e62a
|
@ -30,10 +30,10 @@ private struct SocketChannelLifecycleManager {
|
|||
}
|
||||
|
||||
// MARK: properties
|
||||
private let eventLoop: EventLoop
|
||||
// this is queried from the Channel, ie. must be thread-safe
|
||||
internal let isActiveAtomic = Atomic(value: false)
|
||||
internal let isActiveAtomic: Atomic<Bool>
|
||||
// these are only to be accessed on the EventLoop
|
||||
internal let channelPipeline: ChannelPipeline
|
||||
private var currentState: State = .fresh {
|
||||
didSet {
|
||||
assert(self.eventLoop.inEventLoop)
|
||||
|
@ -48,13 +48,11 @@ private struct SocketChannelLifecycleManager {
|
|||
}
|
||||
}
|
||||
|
||||
private var eventLoop: EventLoop {
|
||||
return self.channelPipeline.eventLoop
|
||||
}
|
||||
|
||||
// MARK: API
|
||||
internal init(channelPipeline: ChannelPipeline) {
|
||||
self.channelPipeline = channelPipeline
|
||||
// isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
|
||||
internal init(eventLoop: EventLoop, isActiveAtomic: Atomic<Bool>) {
|
||||
self.eventLoop = eventLoop
|
||||
self.isActiveAtomic = isActiveAtomic
|
||||
}
|
||||
|
||||
// this is called from Channel's deinit, so don't assert we're on the EventLoop!
|
||||
|
@ -63,23 +61,23 @@ private struct SocketChannelLifecycleManager {
|
|||
}
|
||||
|
||||
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
|
||||
internal mutating func register(promise: EventLoopPromise<Void>?) -> (() -> Void) {
|
||||
internal mutating func register(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
|
||||
return self.moveState(event: .register, promise: promise)
|
||||
}
|
||||
|
||||
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
|
||||
internal mutating func close(promise: EventLoopPromise<Void>?) -> (() -> Void) {
|
||||
internal mutating func close(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
|
||||
return self.moveState(event: .close, promise: promise)
|
||||
}
|
||||
|
||||
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
|
||||
internal mutating func activate(promise: EventLoopPromise<Void>?) -> (() -> Void) {
|
||||
internal mutating func activate(promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
|
||||
return self.moveState(event: .activate, promise: promise)
|
||||
}
|
||||
|
||||
// MARK: private API
|
||||
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
|
||||
private mutating func moveState(event: Event, promise: EventLoopPromise<Void>?) -> (() -> Void) {
|
||||
private mutating func moveState(event: Event, promise: EventLoopPromise<Void>?) -> ((ChannelPipeline) -> Void) {
|
||||
assert(self.eventLoop.inEventLoop)
|
||||
|
||||
switch (self.currentState, event) {
|
||||
|
@ -125,11 +123,10 @@ private struct SocketChannelLifecycleManager {
|
|||
}
|
||||
|
||||
@inline(__always) // we need to return a closure here and to not suffer from a potential allocation for that this must be inlined
|
||||
private mutating func doStateTransfer(newState: State, promise: EventLoopPromise<Void>?, _ callouts: @escaping (ChannelPipeline) -> Void) -> (() -> Void) {
|
||||
private mutating func doStateTransfer(newState: State, promise: EventLoopPromise<Void>?, _ callouts: @escaping (ChannelPipeline) -> Void) -> ((ChannelPipeline) -> Void) {
|
||||
self.currentState = newState
|
||||
|
||||
let pipeline = self.channelPipeline
|
||||
return {
|
||||
return { pipeline in
|
||||
promise?.succeed(result: ())
|
||||
callouts(pipeline)
|
||||
}
|
||||
|
@ -180,6 +177,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
private let selectableEventLoop: SelectableEventLoop
|
||||
private let addressesCached: AtomicBox<Box<(local:SocketAddress?, remote:SocketAddress?)>> = AtomicBox(value: Box((local: nil, remote: nil)))
|
||||
private let bufferAllocatorCached: AtomicBox<Box<ByteBufferAllocator>>
|
||||
private let isActiveAtomic: Atomic<Bool> = Atomic(value: false)
|
||||
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads
|
||||
|
||||
internal var interestedEvent: IOEvent = .none
|
||||
|
||||
|
@ -190,7 +189,11 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
|
||||
private var inFlushNow: Bool = false // Guard against re-entrance of flushNow() method.
|
||||
private var autoRead: Bool = true
|
||||
private var lifecycleManager: SocketChannelLifecycleManager!
|
||||
private var lifecycleManager: SocketChannelLifecycleManager {
|
||||
didSet {
|
||||
assert(self.eventLoop.inEventLoop)
|
||||
}
|
||||
}
|
||||
|
||||
private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() {
|
||||
didSet {
|
||||
|
@ -248,7 +251,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
public var isActive: Bool {
|
||||
return self.lifecycleManager.isActiveAtomic.load()
|
||||
return self.isActiveAtomic.load()
|
||||
}
|
||||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
|
@ -276,7 +279,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
public final var pipeline: ChannelPipeline {
|
||||
return self.lifecycleManager.channelPipeline
|
||||
return self._pipeline
|
||||
}
|
||||
|
||||
// MARK: Methods to override in subclasses.
|
||||
|
@ -335,8 +338,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
self.closePromise = eventLoop.newPromise()
|
||||
self.parent = parent
|
||||
self.recvAllocator = recvAllocator
|
||||
self.lifecycleManager = SocketChannelLifecycleManager(channelPipeline: ChannelPipeline(channel: self))
|
||||
self.lifecycleManager = SocketChannelLifecycleManager(eventLoop: eventLoop, isActiveAtomic: self.isActiveAtomic)
|
||||
// As the socket may already be connected we should ensure we start with the correct addresses cached.
|
||||
self._pipeline = ChannelPipeline(channel: self)
|
||||
self.addressesCached.store(Box((local: try? socket.localAddress(), remote: try? socket.remoteAddress())))
|
||||
}
|
||||
|
||||
|
@ -364,6 +368,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
///
|
||||
/// - returns: If this socket should be registered for write notifications. Ie. `IONotificationState.register` if _not_ all data could be written, so notifications are necessary; and `IONotificationState.unregister` if everything was written and we don't need to be notified about writability at the moment.
|
||||
func flushNow() -> IONotificationState {
|
||||
assert(self.eventLoop.inEventLoop)
|
||||
// Guard against re-entry as data that will be put into `pendingWrites` will just be picked up by
|
||||
// `writeToSocket`.
|
||||
guard !self.inFlushNow && self.isOpen else {
|
||||
|
@ -664,7 +669,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
self.unsetCachedAddressesFromSocket()
|
||||
self.cancelWritesOnClose(error: error)
|
||||
|
||||
self.lifecycleManager.close(promise: p)()
|
||||
self.lifecycleManager.close(promise: p)(self.pipeline)
|
||||
|
||||
eventLoop.execute {
|
||||
// ensure this is executed in a delayed fashion as the users code may still traverse the pipeline
|
||||
|
@ -697,7 +702,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
do {
|
||||
// We always register with interested .none and will just trigger readIfNeeded0() later to re-register if needed.
|
||||
try self.safeRegister(interested: .none)
|
||||
self.lifecycleManager.register(promise: promise)()
|
||||
self.lifecycleManager.register(promise: promise)(self.pipeline)
|
||||
} catch {
|
||||
promise?.fail(error: error)
|
||||
}
|
||||
|
@ -914,7 +919,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
|
|||
|
||||
final func becomeActive0(promise: EventLoopPromise<Void>?) {
|
||||
assert(eventLoop.inEventLoop)
|
||||
self.lifecycleManager.activate(promise: promise)()
|
||||
self.lifecycleManager.activate(promise: promise)(self.pipeline)
|
||||
self.readIfNeeded0()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue