Pool buffers for ivecs and storage refs in the event loop. (#2358)

* Pool buffers for ivecs and storage refs in the event loop.

* Introduce PoolElement for poolable objects and add some bounds checks for the pooled buffers.

* Some polishes.

* Fix build failure with Swift 5.5/5.6

* User raw pointers instead of typed.
This commit is contained in:
ser 2023-02-07 16:48:26 +03:00 committed by GitHub
parent dfd4bdad89
commit 1e7ad9a0db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 153 additions and 103 deletions

View File

@ -26,7 +26,7 @@ class BaseStreamSocketChannel<Socket: SocketProtocol>: BaseSocketChannel<Socket>
eventLoop: SelectableEventLoop,
recvAllocator: RecvByteBufferAllocator
) throws {
self.pendingWrites = PendingStreamWritesManager(iovecs: eventLoop.iovecs, storageRefs: eventLoop.storageRefs)
self.pendingWrites = PendingStreamWritesManager(bufferPool: eventLoop.bufferPool)
self.connectTimeoutScheduled = nil
try super.init(
socket: socket,

View File

@ -63,10 +63,9 @@ fileprivate extension Error {
/// Does the setup required to trigger a `sendmmsg`.
private func doPendingDatagramWriteVectorOperation(pending: PendingDatagramWritesState,
iovecs: UnsafeMutableBufferPointer<IOVector>,
bufferPool: Pool<PooledBuffer>,
msgs: UnsafeMutableBufferPointer<MMsgHdr>,
addresses: UnsafeMutableBufferPointer<sockaddr_storage>,
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
controlMessageStorage: UnsafeControlMessageStorage,
_ body: (UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>) throws -> IOResult<Int> {
assert(msgs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal sendmmsg")
@ -77,6 +76,10 @@ private func doPendingDatagramWriteVectorOperation(pending: PendingDatagramWrite
var c = 0
var toWrite: Int = 0
let buffer = bufferPool.get()
defer { bufferPool.put(buffer) }
let (iovecs, storageRefs) = buffer.get()
for p in pending.flushedWrites {
// Must not write more than Int32.max in one go.
// TODO(cory): I can't see this limit documented in a man page anywhere, but it seems
@ -376,18 +379,13 @@ extension PendingDatagramWritesState {
/// value. The most important purpose of this object is to call `sendto` or `sendmmsg` depending on the writes held and
/// the availability of the functions.
final class PendingDatagramWritesManager: PendingWritesManager {
private var bufferPool: Pool<PooledBuffer>
/// Storage for mmsghdr structures. Only present on Linux because Darwin does not support
/// gathering datagram writes.
private var msgs: UnsafeMutableBufferPointer<MMsgHdr>
/// Storage for the references to the buffers used when we perform gathering writes. Only present
/// on Linux because Darwin does not support gathering datagram writes.
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>
/// Storage for iovec structures. Only present on Linux because this is only needed when we call
/// sendmmsg: sendto doesn't require any iovecs.
private var iovecs: UnsafeMutableBufferPointer<IOVector>
/// Storage for sockaddr structures. Only present on Linux because Darwin does not support gathering
/// writes.
private var addresses: UnsafeMutableBufferPointer<sockaddr_storage>
@ -408,20 +406,17 @@ final class PendingDatagramWritesManager: PendingWritesManager {
/// one `Channel` on the same `EventLoop` at the same time.
///
/// - parameters:
/// - bufferPool: a pool of buffers to be used for IOVector and storage references
/// - msgs: A pre-allocated array of `MMsgHdr` elements
/// - iovecs: A pre-allocated array of `IOVector` elements
/// - addresses: A pre-allocated array of `sockaddr_storage` elements
/// - storageRefs: A pre-allocated array of storage management tokens used to keep storage elements alive during a vector write operation
/// - controlMessageStorage: Pre-allocated memory for storing cmsghdr data during a vector write operation.
init(msgs: UnsafeMutableBufferPointer<MMsgHdr>,
iovecs: UnsafeMutableBufferPointer<IOVector>,
init(bufferPool: Pool<PooledBuffer>,
msgs: UnsafeMutableBufferPointer<MMsgHdr>,
addresses: UnsafeMutableBufferPointer<sockaddr_storage>,
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
controlMessageStorage: UnsafeControlMessageStorage) {
self.bufferPool = bufferPool
self.msgs = msgs
self.iovecs = iovecs
self.addresses = addresses
self.storageRefs = storageRefs
self.controlMessageStorage = controlMessageStorage
}
@ -623,10 +618,9 @@ final class PendingDatagramWritesManager: PendingWritesManager {
assert(self.state.isFlushPending && self.isOpen && !self.state.isEmpty,
"illegal state for vector datagram write operation: flushPending: \(self.state.isFlushPending), isOpen: \(self.isOpen), empty: \(self.state.isEmpty)")
return self.didWrite(try doPendingDatagramWriteVectorOperation(pending: self.state,
iovecs: self.iovecs,
bufferPool: self.bufferPool,
msgs: self.msgs,
addresses: self.addresses,
storageRefs: self.storageRefs,
controlMessageStorage: self.controlMessageStorage,
{ try vectorWriteOperation($0) }),
messages: self.msgs)

View File

@ -23,18 +23,19 @@ private struct PendingStreamWrite {
///
/// - parameters:
/// - pending: The currently pending writes.
/// - iovecs: Pre-allocated storage (per `EventLoop`) for `iovecs`.
/// - storageRefs: Pre-allocated storage references (per `EventLoop`) to manage the lifetime of the buffers to be passed to `writev`.
/// - bufferPool: Pool of buffers to use for iovecs and storageRefs
/// - body: The function that actually does the vector write (usually `writev`).
/// - returns: A tuple of the number of items attempted to write and the result of the write operation.
private func doPendingWriteVectorOperation(pending: PendingStreamWritesState,
iovecs: UnsafeMutableBufferPointer<IOVector>,
storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>,
bufferPool: Pool<PooledBuffer>,
_ body: (UnsafeBufferPointer<IOVector>) throws -> IOResult<Int>) throws -> (itemCount: Int, writeResult: IOResult<Int>) {
assert(iovecs.count >= Socket.writevLimitIOVectors, "Insufficiently sized buffer for a maximal writev")
let buffer = bufferPool.get()
defer { bufferPool.put(buffer) }
let (iovecs, storageRefs) = buffer.get()
// Clamp the number of writes we're willing to issue to the limit for writev.
let count = min(pending.flushedChunks, Socket.writevLimitIOVectors)
var count = min(iovecs.count, storageRefs.count)
count = min(pending.flushedChunks, count)
// the numbers of storage refs that we need to decrease later.
var numberOfUsedStorageSlots = 0
@ -279,8 +280,7 @@ private struct PendingStreamWritesState {
/// currently pending writes.
final class PendingStreamWritesManager: PendingWritesManager {
private var state = PendingStreamWritesState()
private var iovecs: UnsafeMutableBufferPointer<IOVector>
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>
private let bufferPool: Pool<PooledBuffer>
internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag = ManagedAtomic(true)
@ -416,8 +416,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
assert(self.state.isFlushPending && !self.state.isEmpty && self.isOpen,
"vector write called in illegal state: flush pending: \(self.state.isFlushPending), empty: \(self.state.isEmpty), isOpen: \(self.isOpen)")
let result = try doPendingWriteVectorOperation(pending: self.state,
iovecs: self.iovecs,
storageRefs: self.storageRefs,
bufferPool: bufferPool,
{ try operation($0) })
return self.didWrite(itemCount: result.itemCount, result: result.writeResult)
}
@ -440,11 +439,9 @@ final class PendingStreamWritesManager: PendingWritesManager {
/// one `Channel` on the same `EventLoop` at the same time.
///
/// - parameters:
/// - iovecs: A pre-allocated array of `IOVector` elements
/// - storageRefs: A pre-allocated array of storage management tokens used to keep storage elements alive during a vector write operation
init(iovecs: UnsafeMutableBufferPointer<IOVector>, storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>) {
self.iovecs = iovecs
self.storageRefs = storageRefs
/// - bufferPool: Pool of buffers to be used for iovecs and storage references
init(bufferPool: Pool<PooledBuffer>) {
self.bufferPool = bufferPool
}
}

View File

@ -0,0 +1,52 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
protocol PoolElement {
init()
func evictedFromPool()
}
class Pool<Element: PoolElement> {
private let maxSize: Int
private var elements: [Element]
init(maxSize: Int) {
self.maxSize = maxSize
self.elements = [Element]()
}
deinit {
for e in elements {
e.evictedFromPool()
}
}
func get() -> Element {
if elements.isEmpty {
return Element()
}
else {
return elements.removeLast()
}
}
func put(_ e: Element) {
if (elements.count == maxSize) {
e.evictedFromPool()
}
else {
elements.append(e)
}
}
}

View File

@ -30,6 +30,44 @@ internal func withAutoReleasePool<T>(_ execute: () throws -> T) rethrows -> T {
#endif
}
struct PooledBuffer: PoolElement {
private let bufferSize: Int
private let buffer: UnsafeMutableRawPointer
init() {
precondition(MemoryLayout<IOVector>.alignment >= MemoryLayout<Unmanaged<AnyObject>>.alignment)
self.bufferSize = (MemoryLayout<IOVector>.stride + MemoryLayout<Unmanaged<AnyObject>>.stride) * Socket.writevLimitIOVectors
var byteCount = self.bufferSize
debugOnly {
byteCount += MemoryLayout<UInt32>.stride
}
self.buffer = UnsafeMutableRawPointer.allocate(byteCount: byteCount, alignment: MemoryLayout<IOVector>.alignment)
debugOnly {
self.buffer.storeBytes(of: 0xdeadbee, toByteOffset: self.bufferSize, as: UInt32.self)
}
}
func evictedFromPool() {
debugOnly {
assert(0xdeadbee == self.buffer.load(fromByteOffset: self.bufferSize, as: UInt32.self))
}
self.buffer.deallocate()
}
func get() -> (UnsafeMutableBufferPointer<IOVector>, UnsafeMutableBufferPointer<Unmanaged<AnyObject>>) {
let count = Socket.writevLimitIOVectors
let iovecs = self.buffer.bindMemory(to: IOVector.self, capacity: count)
let storageRefs = (self.buffer + (count * MemoryLayout<IOVector>.stride)).bindMemory(to: Unmanaged<AnyObject>.self, capacity: count)
assert(UnsafeMutableRawPointer(iovecs) >= self.buffer)
assert(UnsafeMutableRawPointer(iovecs) <= (self.buffer + self.bufferSize))
assert(UnsafeMutableRawPointer(storageRefs) >= self.buffer)
assert(UnsafeMutableRawPointer(storageRefs) <= (self.buffer + self.bufferSize))
assert(UnsafeMutableRawPointer(iovecs + count) == UnsafeMutableRawPointer(storageRefs))
assert(UnsafeMutableRawPointer(storageRefs + count) <= (self.buffer + bufferSize))
return (UnsafeMutableBufferPointer(start: iovecs, count: count), UnsafeMutableBufferPointer(start: storageRefs, count: count))
}
}
/// `EventLoop` implementation that uses a `Selector` to get notified once there is more I/O or tasks to process.
/// The whole processing of I/O and tasks is done by a `NIOThread` that is tied to the `SelectableEventLoop`. This `NIOThread`
/// is guaranteed to never change!
@ -102,8 +140,7 @@ internal final class SelectableEventLoop: EventLoop {
private var internalState: InternalState = .runningAndAcceptingNewRegistrations // protected by the EventLoop thread
private var externalState: ExternalState = .open // protected by externalStateLock
let iovecs: UnsafeMutableBufferPointer<IOVector>
let storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>
let bufferPool: Pool<PooledBuffer>
// Used for gathering UDP writes.
let msgs: UnsafeMutableBufferPointer<MMsgHdr>
@ -187,8 +224,7 @@ Further information:
self._parentGroup = parentGroup
self._selector = selector
self.thread = thread
self.iovecs = UnsafeMutableBufferPointer<IOVector>.allocate(capacity: Socket.writevLimitIOVectors)
self.storageRefs = UnsafeMutableBufferPointer<Unmanaged<AnyObject>>.allocate(capacity: Socket.writevLimitIOVectors)
self.bufferPool = Pool<PooledBuffer>(maxSize: 16)
self.msgs = UnsafeMutableBufferPointer<MMsgHdr>.allocate(capacity: Socket.writevLimitIOVectors)
self.addresses = UnsafeMutableBufferPointer<sockaddr_storage>.allocate(capacity: Socket.writevLimitIOVectors)
self.controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
@ -208,8 +244,6 @@ Further information:
"illegal internal state on deinit: \(self.internalState)")
assert(self.externalState == .resourcesReclaimed,
"illegal external state on shutdown: \(self.externalState)")
self.iovecs.deallocate()
self.storageRefs.deallocate()
self.msgs.deallocate()
self.addresses.deallocate()
self.controlMessageStorage.deallocate()

View File

@ -423,10 +423,9 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
throw err
}
self.pendingWrites = PendingDatagramWritesManager(msgs: eventLoop.msgs,
iovecs: eventLoop.iovecs,
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgs: eventLoop.msgs,
addresses: eventLoop.addresses,
storageRefs: eventLoop.storageRefs,
controlMessageStorage: eventLoop.controlMessageStorage)
try super.init(
@ -441,10 +440,9 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
init(socket: Socket, parent: Channel? = nil, eventLoop: SelectableEventLoop) throws {
self.vectorReadManager = nil
try socket.setNonBlocking()
self.pendingWrites = PendingDatagramWritesManager(msgs: eventLoop.msgs,
iovecs: eventLoop.iovecs,
self.pendingWrites = PendingDatagramWritesManager(bufferPool: eventLoop.bufferPool,
msgs: eventLoop.msgs,
addresses: eventLoop.addresses,
storageRefs: eventLoop.storageRefs,
controlMessageStorage: eventLoop.controlMessageStorage)
try super.init(
socket: socket,

View File

@ -209,30 +209,18 @@ public final class ChannelTests: XCTestCase {
}
private func withPendingStreamWritesManager(_ body: (PendingStreamWritesManager) throws -> Void) rethrows {
try withExtendedLifetime(NSObject()) { o in
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
/* put a canary value at the end */
iovecs[iovecs.count - 1] = iovec(iov_base: UnsafeMutableRawPointer(bitPattern: 0xdeadbee)!, iov_len: 0xdeadbee)
try iovecs.withUnsafeMutableBufferPointer { iovecs in
try managed.withUnsafeMutableBufferPointer { managed in
let pwm = NIOPosix.PendingStreamWritesManager(iovecs: iovecs, storageRefs: managed)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
let bufferPool = Pool<PooledBuffer>(maxSize: 16)
let pwm = NIOPosix.PendingStreamWritesManager(bufferPool: bufferPool)
try body(pwm)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
}
/* assert that the canary values are still okay, we should definitely have never written those */
XCTAssertEqual(managed.last!.toOpaque(), Unmanaged.passUnretained(o).toOpaque())
XCTAssertEqual(0xdeadbee, Int(bitPattern: iovecs.last!.iov_base))
XCTAssertEqual(0xdeadbee, iovecs.last!.iov_len)
}
try body(pwm)
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
/// A frankenstein testing monster. It asserts that for `PendingStreamWritesManager` `pwm` and `EventLoopPromises` `promises`

View File

@ -47,43 +47,30 @@ private extension SocketAddress {
class PendingDatagramWritesManagerTests: XCTestCase {
private func withPendingDatagramWritesManager(_ body: (PendingDatagramWritesManager) throws -> Void) rethrows {
try withExtendedLifetime(NSObject()) { o in
var iovecs: [IOVector] = Array(repeating: iovec(), count: Socket.writevLimitIOVectors + 1)
var managed: [Unmanaged<AnyObject>] = Array(repeating: Unmanaged.passUnretained(o), count: Socket.writevLimitIOVectors + 1)
var msgs: [MMsgHdr] = Array(repeating: MMsgHdr(), count: Socket.writevLimitIOVectors + 1)
var addresses: [sockaddr_storage] = Array(repeating: sockaddr_storage(), count: Socket.writevLimitIOVectors + 1)
var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
defer {
controlMessageStorage.deallocate()
}
/* put a canary value at the end */
iovecs[iovecs.count - 1] = iovec(iov_base: UnsafeMutableRawPointer(bitPattern: 0xdeadbee)!, iov_len: 0xdeadbee)
try iovecs.withUnsafeMutableBufferPointer { iovecs in
try managed.withUnsafeMutableBufferPointer { managed in
try msgs.withUnsafeMutableBufferPointer { msgs in
try addresses.withUnsafeMutableBufferPointer { addresses in
let pwm = NIOPosix.PendingDatagramWritesManager(msgs: msgs,
iovecs: iovecs,
addresses: addresses,
storageRefs: managed,
controlMessageStorage: controlMessageStorage)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
let bufferPool = Pool<PooledBuffer>(maxSize: 16)
var msgs: [MMsgHdr] = Array(repeating: MMsgHdr(), count: Socket.writevLimitIOVectors + 1)
var addresses: [sockaddr_storage] = Array(repeating: sockaddr_storage(), count: Socket.writevLimitIOVectors + 1)
var controlMessageStorage = UnsafeControlMessageStorage.allocate(msghdrCount: Socket.writevLimitIOVectors)
defer {
controlMessageStorage.deallocate()
}
try body(pwm)
try msgs.withUnsafeMutableBufferPointer { msgs in
try addresses.withUnsafeMutableBufferPointer { addresses in
let pwm = NIOPosix.PendingDatagramWritesManager(bufferPool: bufferPool,
msgs: msgs,
addresses: addresses,
controlMessageStorage: controlMessageStorage)
XCTAssertTrue(pwm.isEmpty)
XCTAssertTrue(pwm.isOpen)
XCTAssertFalse(pwm.isFlushPending)
XCTAssertTrue(pwm.isWritable)
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
}
}
try body(pwm)
XCTAssertTrue(pwm.isEmpty)
XCTAssertFalse(pwm.isFlushPending)
}
/* assert that the canary values are still okay, we should definitely have never written those */
XCTAssertEqual(managed.last!.toOpaque(), Unmanaged.passUnretained(o).toOpaque())
XCTAssertEqual(0xdeadbee, Int(bitPattern: iovecs.last!.iov_base))
XCTAssertEqual(0xdeadbee, iovecs.last!.iov_len)
}
}