stop using AtomicBox (#1289)
Motivation: AtomicBox doesn't properly work (#1286) and despite fixing it in #1287, it's really not worth using AtomicBox (which as of #1287 instroduces a CAS loop) anymore. Modifications: Remove the two uses of AtomicBox. Result: No usage of broken/slow data types.
This commit is contained in:
parent
e3290d891a
commit
88c7994869
|
@ -205,19 +205,85 @@ private struct SocketChannelLifecycleManager {
|
|||
class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, ChannelCore {
|
||||
typealias SelectableType = SocketType.SelectableType
|
||||
|
||||
// MARK: Stored Properties
|
||||
// Visible to access from EventLoop directly
|
||||
struct AddressCache {
|
||||
// deliberately lets because they must always be updated together (so forcing `init` is useful).
|
||||
let local: Optional<SocketAddress>
|
||||
let remote: Optional<SocketAddress>
|
||||
|
||||
init(local: SocketAddress?, remote: SocketAddress?) {
|
||||
self.local = local
|
||||
self.remote = remote
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: - Stored Properties
|
||||
// MARK: Constants & atomics (accessible everywhere)
|
||||
public let parent: Channel?
|
||||
internal let socket: SocketType
|
||||
private let closePromise: EventLoopPromise<Void>
|
||||
internal let selectableEventLoop: SelectableEventLoop
|
||||
private let addressesCached: AtomicBox<Box<(local:SocketAddress?, remote:SocketAddress?)>> = AtomicBox(value: Box((local: nil, remote: nil)))
|
||||
private let bufferAllocatorCached: AtomicBox<Box<ByteBufferAllocator>>
|
||||
private let _offEventLoopLock = Lock()
|
||||
private let isActiveAtomic: NIOAtomic<Bool> = .makeAtomic(value: false)
|
||||
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads
|
||||
// just a thread-safe way of having something to print about the socket from any thread
|
||||
internal let socketDescription: String
|
||||
|
||||
// MARK: Variables, on EventLoop thread only
|
||||
var readPending = false
|
||||
var pendingConnect: Optional<EventLoopPromise<Void>>
|
||||
var recvAllocator: RecvByteBufferAllocator
|
||||
var maxMessagesPerRead: UInt = 4
|
||||
private var inFlushNow: Bool = false // Guard against re-entrance of flushNow() method.
|
||||
private var autoRead: Bool = true
|
||||
|
||||
// MARK: Variables that are really constants
|
||||
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads
|
||||
|
||||
// MARK: Special variables, please read comments.
|
||||
// For reads guarded by _either_ `self._offEventLoopLock` or the EL thread
|
||||
// Writes are guarded by _offEventLoopLock _and_ the EL thread.
|
||||
// PLEASE don't use these directly and use the non-underscored computed properties instead.
|
||||
private var _addressCache = AddressCache(local: nil, remote: nil) // please use `self.addressesCached` instead
|
||||
private var _bufferAllocatorCache: ByteBufferAllocator // please use `self.bufferAllocatorCached` instead.
|
||||
|
||||
// MARK: - Computed properties
|
||||
// This is called from arbitrary threads.
|
||||
internal var addressesCached: AddressCache {
|
||||
get {
|
||||
if self.eventLoop.inEventLoop {
|
||||
return self._addressCache
|
||||
} else {
|
||||
return self._offEventLoopLock.withLock {
|
||||
return self._addressCache
|
||||
}
|
||||
}
|
||||
}
|
||||
set {
|
||||
self.eventLoop.preconditionInEventLoop()
|
||||
self._offEventLoopLock.withLock {
|
||||
self._addressCache = newValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// This is called from arbitrary threads.
|
||||
private var bufferAllocatorCached: ByteBufferAllocator {
|
||||
get {
|
||||
if self.eventLoop.inEventLoop {
|
||||
return self._bufferAllocatorCache
|
||||
} else {
|
||||
return self._offEventLoopLock.withLock {
|
||||
return self._bufferAllocatorCache
|
||||
}
|
||||
}
|
||||
}
|
||||
set {
|
||||
self.eventLoop.preconditionInEventLoop()
|
||||
self._offEventLoopLock.withLock {
|
||||
self._bufferAllocatorCache = newValue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We start with the invalid empty set of selector events we're interested in. This is to make sure we later on
|
||||
// (in `becomeFullyRegistered0`) seed the initial event correctly.
|
||||
internal var interestedEvent: SelectorEventSet = [] {
|
||||
|
@ -226,13 +292,6 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
}
|
||||
}
|
||||
|
||||
var readPending = false
|
||||
var pendingConnect: Optional<EventLoopPromise<Void>>
|
||||
var recvAllocator: RecvByteBufferAllocator
|
||||
var maxMessagesPerRead: UInt = 4
|
||||
|
||||
private var inFlushNow: Bool = false // Guard against re-entrance of flushNow() method.
|
||||
private var autoRead: Bool = true
|
||||
private var lifecycleManager: SocketChannelLifecycleManager {
|
||||
didSet {
|
||||
self.eventLoop.assertInEventLoop()
|
||||
|
@ -242,53 +301,20 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
private var bufferAllocator: ByteBufferAllocator = ByteBufferAllocator() {
|
||||
didSet {
|
||||
self.eventLoop.assertInEventLoop()
|
||||
self.bufferAllocatorCached.store(Box(self.bufferAllocator))
|
||||
self.bufferAllocatorCached = self.bufferAllocator
|
||||
}
|
||||
}
|
||||
|
||||
// MARK: Datatypes
|
||||
|
||||
/// Indicates if a selectable should registered or not for IO notifications.
|
||||
enum IONotificationState {
|
||||
/// We should be registered for IO notifications.
|
||||
case register
|
||||
|
||||
/// We should not be registered for IO notifications.
|
||||
case unregister
|
||||
}
|
||||
|
||||
enum ReadResult {
|
||||
/// Nothing was read by the read operation.
|
||||
case none
|
||||
|
||||
/// Some data was read by the read operation.
|
||||
case some
|
||||
}
|
||||
|
||||
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
|
||||
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
|
||||
private enum ReadStreamState: Equatable {
|
||||
/// Everything seems normal
|
||||
case normal(ReadResult)
|
||||
|
||||
/// We saw EOF.
|
||||
case eof
|
||||
|
||||
/// A read error was received.
|
||||
case error
|
||||
}
|
||||
|
||||
// MARK: Computed Properties
|
||||
public final var _channelCore: ChannelCore { return self }
|
||||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
public final var localAddress: SocketAddress? {
|
||||
return self.addressesCached.load().value.local
|
||||
return self.addressesCached.local
|
||||
}
|
||||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
public final var remoteAddress: SocketAddress? {
|
||||
return self.addressesCached.load().value.remote
|
||||
return self.addressesCached.remote
|
||||
}
|
||||
|
||||
/// `false` if the whole `Channel` is closed and so no more IO operation can be done.
|
||||
|
@ -323,11 +349,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
public final var allocator: ByteBufferAllocator {
|
||||
if eventLoop.inEventLoop {
|
||||
return bufferAllocator
|
||||
} else {
|
||||
return self.bufferAllocatorCached.load().value
|
||||
}
|
||||
return self.bufferAllocatorCached
|
||||
}
|
||||
|
||||
// This is `Channel` API so must be thread-safe.
|
||||
|
@ -347,6 +369,38 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
fatalError("this must be overridden by sub class")
|
||||
}
|
||||
|
||||
// MARK: - Datatypes
|
||||
|
||||
/// Indicates if a selectable should registered or not for IO notifications.
|
||||
enum IONotificationState {
|
||||
/// We should be registered for IO notifications.
|
||||
case register
|
||||
|
||||
/// We should not be registered for IO notifications.
|
||||
case unregister
|
||||
}
|
||||
|
||||
enum ReadResult {
|
||||
/// Nothing was read by the read operation.
|
||||
case none
|
||||
|
||||
/// Some data was read by the read operation.
|
||||
case some
|
||||
}
|
||||
|
||||
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
|
||||
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
|
||||
private enum ReadStreamState: Equatable {
|
||||
/// Everything seems normal
|
||||
case normal(ReadResult)
|
||||
|
||||
/// We saw EOF.
|
||||
case eof
|
||||
|
||||
/// A read error was received.
|
||||
case error
|
||||
}
|
||||
|
||||
/// Begin connection of the underlying socket.
|
||||
///
|
||||
/// - parameters:
|
||||
|
@ -380,15 +434,15 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
|
||||
// MARK: Common base socket logic.
|
||||
init(socket: SocketType, parent: Channel?, eventLoop: SelectableEventLoop, recvAllocator: RecvByteBufferAllocator) throws {
|
||||
self.bufferAllocatorCached = AtomicBox(value: Box(self.bufferAllocator))
|
||||
self._bufferAllocatorCache = self.bufferAllocator
|
||||
self.socket = socket
|
||||
self.selectableEventLoop = eventLoop
|
||||
self.closePromise = eventLoop.makePromise()
|
||||
self.parent = parent
|
||||
self.recvAllocator = recvAllocator
|
||||
self.lifecycleManager = SocketChannelLifecycleManager(eventLoop: eventLoop, isActiveAtomic: self.isActiveAtomic)
|
||||
// As the socket may already be connected we should ensure we start with the correct addresses cached.
|
||||
self.addressesCached.store(Box((local: try? socket.localAddress(), remote: try? socket.remoteAddress())))
|
||||
self._addressCache = .init(local: try? socket.localAddress(), remote: try? socket.remoteAddress())
|
||||
self.lifecycleManager = SocketChannelLifecycleManager(eventLoop: eventLoop, isActiveAtomic: self.isActiveAtomic)
|
||||
self.socketDescription = socket.description
|
||||
self.pendingConnect = nil
|
||||
self._pipeline = ChannelPipeline(channel: self)
|
||||
|
@ -1034,15 +1088,15 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
|
|||
internal final func updateCachedAddressesFromSocket(updateLocal: Bool = true, updateRemote: Bool = true) {
|
||||
self.eventLoop.assertInEventLoop()
|
||||
assert(updateLocal || updateRemote)
|
||||
let cached = addressesCached.load().value
|
||||
let cached = self.addressesCached
|
||||
let local = updateLocal ? try? self.localAddress0() : cached.local
|
||||
let remote = updateRemote ? try? self.remoteAddress0() : cached.remote
|
||||
self.addressesCached.store(Box((local: local, remote: remote)))
|
||||
self.addressesCached = AddressCache(local: local, remote: remote)
|
||||
}
|
||||
|
||||
internal final func unsetCachedAddressesFromSocket() {
|
||||
self.eventLoop.assertInEventLoop()
|
||||
self.addressesCached.store(Box((local: nil, remote: nil)))
|
||||
self.addressesCached = AddressCache(local: nil, remote: nil)
|
||||
}
|
||||
|
||||
public final func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
|
||||
|
|
|
@ -19,7 +19,7 @@ services:
|
|||
image: swift-nio:16.04-5.1
|
||||
environment:
|
||||
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30540
|
||||
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=522050
|
||||
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=501050
|
||||
- MAX_ALLOCS_ALLOWED_ping_pong_1000_reqs_1_conn=4440
|
||||
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
|
||||
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=75010
|
||||
|
|
|
@ -19,7 +19,7 @@ services:
|
|||
image: swift-nio:18.04-5.0
|
||||
environment:
|
||||
- MAX_ALLOCS_ALLOWED_1000_reqs_1_conn=30990
|
||||
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=1005050
|
||||
- MAX_ALLOCS_ALLOWED_1_reqs_1000_conn=973050
|
||||
- MAX_ALLOCS_ALLOWED_ping_pong_1000_reqs_1_conn=4500
|
||||
- MAX_ALLOCS_ALLOWED_bytebuffer_lots_of_rw=2100
|
||||
- MAX_ALLOCS_ALLOWED_future_lots_of_callbacks=75010
|
||||
|
|
Loading…
Reference in New Issue