NIOAsyncWriter: Provide a fast path for single element writes (#2365)
# Motivation We are currently always allocating a new `Deque` when we get a single element write in the `NIOAsyncWriter` # Modification Provide a fast path method on the `NIOAsyncWriterSinkDelegate` protocol which will be called when we receive a single element write and are currently writable. # Result Performance win for the single write cases.
This commit is contained in:
parent
39047aec7c
commit
b6b62665e9
|
@ -37,6 +37,18 @@ public protocol NIOAsyncWriterSinkDelegate: Sendable {
|
|||
/// - Important: You **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` from within this method.
|
||||
func didYield(contentsOf sequence: Deque<Element>)
|
||||
|
||||
/// This method is called once a single element was yielded to the ``NIOAsyncWriter``.
|
||||
///
|
||||
/// If the ``NIOAsyncWriter`` was writable when the sequence was yielded, the sequence will be forwarded
|
||||
/// right away to the delegate. If the ``NIOAsyncWriter`` was _NOT_ writable then the sequence will be buffered
|
||||
/// until the ``NIOAsyncWriter`` becomes writable again. All buffered writes, while the ``NIOAsyncWriter`` is not writable,
|
||||
/// will be coalesced into a single sequence.
|
||||
///
|
||||
/// - Note: This a fast path that you can optionally implement. By default this will just call ``NIOAsyncWriterSinkDelegate/didYield(contentsOf:)``.
|
||||
///
|
||||
/// - Important: You **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` from within this method.
|
||||
func didYield(_ element: Element)
|
||||
|
||||
/// This method is called once the ``NIOAsyncWriter`` is terminated.
|
||||
///
|
||||
/// Termination happens if:
|
||||
|
@ -55,6 +67,14 @@ public protocol NIOAsyncWriterSinkDelegate: Sendable {
|
|||
func didTerminate(error: Error?)
|
||||
}
|
||||
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
extension NIOAsyncWriterSinkDelegate {
|
||||
@inlinable
|
||||
public func didYield(_ element: Element) {
|
||||
self.didYield(contentsOf: .init(CollectionOfOne(element)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Errors thrown by the ``NIOAsyncWriter``.
|
||||
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
|
||||
public struct NIOAsyncWriterError: Error, Hashable, CustomStringConvertible {
|
||||
|
@ -246,7 +266,7 @@ public struct NIOAsyncWriter<
|
|||
/// - Parameter element: The element to yield.
|
||||
@inlinable
|
||||
public func yield(_ element: Element) async throws {
|
||||
try await self.yield(contentsOf: CollectionOfOne(element))
|
||||
try await self._storage.yield(element)
|
||||
}
|
||||
|
||||
/// Finishes the writer.
|
||||
|
@ -513,6 +533,61 @@ extension NIOAsyncWriter {
|
|||
}
|
||||
}
|
||||
|
||||
@inlinable
|
||||
/* fileprivate */ internal func yield(_ element: Element) async throws {
|
||||
let yieldID = self._yieldIDGenerator.generateUniqueYieldID()
|
||||
|
||||
try await withTaskCancellationHandler {
|
||||
// We are manually locking here to hold the lock across the withCheckedContinuation call
|
||||
self._lock.lock()
|
||||
|
||||
let action = self._stateMachine.yield(contentsOf: CollectionOfOne(element), yieldID: yieldID)
|
||||
|
||||
switch action {
|
||||
case .callDidYield(let delegate):
|
||||
// We are calling the delegate while holding lock. This can lead to potential crashes
|
||||
// if the delegate calls `setWritability` reentrantly. However, we call this
|
||||
// out in the docs of the delegate
|
||||
|
||||
delegate.didYield(element)
|
||||
self._lock.unlock()
|
||||
|
||||
case .returnNormally:
|
||||
self._lock.unlock()
|
||||
return
|
||||
|
||||
case .throwError(let error):
|
||||
self._lock.unlock()
|
||||
throw error
|
||||
|
||||
case .suspendTask:
|
||||
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
|
||||
self._stateMachine.yield(
|
||||
contentsOf: CollectionOfOne(element),
|
||||
continuation: continuation,
|
||||
yieldID: yieldID
|
||||
)
|
||||
|
||||
self._lock.unlock()
|
||||
}
|
||||
}
|
||||
} onCancel: {
|
||||
self._lock.withLock {
|
||||
let action = self._stateMachine.cancel(yieldID: yieldID)
|
||||
|
||||
switch action {
|
||||
case .resumeContinuation(let continuation):
|
||||
// It is safe to resume the continuations while holding the lock since resume
|
||||
// is immediately returning and just enqueues the Job on the executor
|
||||
continuation.resume()
|
||||
|
||||
case .none:
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@inlinable
|
||||
/* fileprivate */ internal func writerFinish(error: Error?) {
|
||||
self._lock.withLock {
|
||||
|
|
Loading…
Reference in New Issue