Use CircularBuffer in EmbeddedChannel. (#1700)

Motivation:

When running load through EmbeddedChannel we spend an enormous amount of
time screwing around with removing things from Arrays. Arrays are not a
natural data type for `removeFirst()`, and in fact that method is
linear-time on Array due to the need for Array to be zero-indexed. Let's
stop using (and indeed misusing) Array on EmbeddedChannel.

While we're here, if we add some judicious @inlinable annotations we can
also save additional work generating results that users don't need.

Modifications:

- Replace arrays with circular buffers (including marked versions).
- Avoid CoWs and extra allocations on flush.
- Make some API methods inlinable to make them cheaper.

Result:

- Much cheaper EmbeddedChannel for benchmark purposes.
This commit is contained in:
Cory Benfield 2020-12-02 16:01:21 +00:00 committed by GitHub
parent 2bae395344
commit 076fda1521
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 61 additions and 38 deletions

View File

@ -193,6 +193,7 @@ public final class EmbeddedEventLoop: EventLoop {
}
}
@usableFromInline
class EmbeddedChannelCore: ChannelCore {
var isOpen: Bool = true
var isActive: Bool = false
@ -217,23 +218,29 @@ class EmbeddedChannelCore: ChannelCore {
}
/// Contains the flushed items that went into the `Channel` (and on a regular channel would have hit the network).
var outboundBuffer: [NIOAny] = []
@usableFromInline
var outboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
/// Contains the unflushed items that went into the `Channel`
var pendingOutboundBuffer: [(NIOAny, EventLoopPromise<Void>?)] = []
@usableFromInline
var pendingOutboundBuffer: MarkedCircularBuffer<(NIOAny, EventLoopPromise<Void>?)> = MarkedCircularBuffer(initialCapacity: 16)
/// Contains the items that travelled the `ChannelPipeline` all the way and hit the tail channel handler. On a
/// regular `Channel` these items would be lost.
var inboundBuffer: [NIOAny] = []
@usableFromInline
var inboundBuffer: CircularBuffer<NIOAny> = CircularBuffer()
@usableFromInline
func localAddress0() throws -> SocketAddress {
throw ChannelError.operationUnsupported
}
@usableFromInline
func remoteAddress0() throws -> SocketAddress {
throw ChannelError.operationUnsupported
}
@usableFromInline
func close0(error: Error, mode: CloseMode, promise: EventLoopPromise<Void>?) {
guard self.isOpen else {
promise?.fail(ChannelError.alreadyClosed)
@ -254,43 +261,47 @@ class EmbeddedChannelCore: ChannelCore {
}
}
@usableFromInline
func bind0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
promise?.succeed(())
}
@usableFromInline
func connect0(to address: SocketAddress, promise: EventLoopPromise<Void>?) {
isActive = true
promise?.succeed(())
pipeline.fireChannelActive0()
}
@usableFromInline
func register0(promise: EventLoopPromise<Void>?) {
promise?.succeed(())
pipeline.fireChannelRegistered0()
}
@usableFromInline
func registerAlreadyConfigured0(promise: EventLoopPromise<Void>?) {
isActive = true
register0(promise: promise)
pipeline.fireChannelActive0()
}
@usableFromInline
func write0(_ data: NIOAny, promise: EventLoopPromise<Void>?) {
self.pendingOutboundBuffer.append((data, promise))
}
@usableFromInline
func flush0() {
let pendings = self.pendingOutboundBuffer
// removeAll(keepingCapacity:) is strictly more expensive than doing this, see
// https://bugs.swift.org/browse/SR-13923.
self.pendingOutboundBuffer = []
self.pendingOutboundBuffer.reserveCapacity(pendings.capacity)
for dataAndPromise in pendings {
self.pendingOutboundBuffer.mark()
while self.pendingOutboundBuffer.hasMark, let dataAndPromise = self.pendingOutboundBuffer.popFirst() {
self.addToBuffer(buffer: &self.outboundBuffer, data: dataAndPromise.0)
dataAndPromise.1?.succeed(())
}
}
@usableFromInline
func read0() {
// NOOP
}
@ -299,6 +310,7 @@ class EmbeddedChannelCore: ChannelCore {
promise?.fail(ChannelError.operationUnsupported)
}
@usableFromInline
func channelRead0(_ data: NIOAny) {
addToBuffer(buffer: &inboundBuffer, data: data)
}
@ -309,7 +321,7 @@ class EmbeddedChannelCore: ChannelCore {
}
}
private func addToBuffer<T>(buffer: inout [T], data: T) {
private func addToBuffer<T>(buffer: inout CircularBuffer<T>, data: T) {
buffer.append(data)
}
}
@ -409,6 +421,11 @@ public final class EmbeddedChannel: Channel {
/// The type of the actual first element.
public let actual: Any.Type
public init(expected: Any.Type, actual: Any.Type) {
self.expected = expected
self.actual = actual
}
public static func == (lhs: WrongTypeError, rhs: WrongTypeError) -> Bool {
return lhs.expected == rhs.expected && lhs.actual == rhs.actual
}
@ -424,7 +441,8 @@ public final class EmbeddedChannel: Channel {
/// - see: `Channel.closeFuture`
public var closeFuture: EventLoopFuture<Void> { return channelcore.closePromise.futureResult }
private lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)
@usableFromInline
/*private but usableFromInline */ lazy var channelcore: EmbeddedChannelCore = EmbeddedChannelCore(pipeline: self._pipeline, eventLoop: self.eventLoop)
/// - see: `Channel._channelCore`
public var _channelCore: ChannelCore {
@ -464,8 +482,8 @@ public final class EmbeddedChannel: Channel {
if c.outboundBuffer.isEmpty && c.inboundBuffer.isEmpty && c.pendingOutboundBuffer.isEmpty {
return .clean
} else {
return .leftOvers(inbound: c.inboundBuffer,
outbound: c.outboundBuffer,
return .leftOvers(inbound: Array(c.inboundBuffer),
outbound: Array(c.outboundBuffer),
pendingOutbound: c.pendingOutboundBuffer.map { $0.0 })
}
}
@ -518,8 +536,9 @@ public final class EmbeddedChannel: Channel {
/// - note: Outbound events travel the `ChannelPipeline` _back to front_.
/// - note: `EmbeddedChannel.writeOutbound` will `write` data through the `ChannelPipeline`, starting with last
/// `ChannelHandler`.
@inlinable
public func readOutbound<T>(as type: T.Type = T.self) throws -> T? {
return try readFromBuffer(buffer: &channelcore.outboundBuffer)
return try _readFromBuffer(buffer: &channelcore.outboundBuffer)
}
/// If available, this method reads one element of type `T` out of the `EmbeddedChannel`'s inbound buffer. If the
@ -532,8 +551,9 @@ public final class EmbeddedChannel: Channel {
/// `ChannelHandlerContext.fireChannelRead`) or implicitly by not implementing `channelRead`.
///
/// - note: `EmbeddedChannel.writeInbound` will fire data through the `ChannelPipeline` using `fireChannelRead`.
@inlinable
public func readInbound<T>(as type: T.Type = T.self) throws -> T? {
return try readFromBuffer(buffer: &channelcore.inboundBuffer)
return try _readFromBuffer(buffer: &channelcore.inboundBuffer)
}
/// Sends an inbound `channelRead` event followed by a `channelReadComplete` event through the `ChannelPipeline`.
@ -545,11 +565,12 @@ public final class EmbeddedChannel: Channel {
/// - data: The data to fire through the pipeline.
/// - returns: The state of the inbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeInbound<T>(_ data: T) throws -> BufferState {
pipeline.fireChannelRead(NIOAny(data))
pipeline.fireChannelReadComplete()
try throwIfErrorCaught()
return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(self.channelcore.inboundBuffer)
return self.channelcore.inboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.inboundBuffer))
}
/// Sends an outbound `writeAndFlush` event through the `ChannelPipeline`.
@ -562,9 +583,10 @@ public final class EmbeddedChannel: Channel {
/// - data: The data to fire through the pipeline.
/// - returns: The state of the outbound buffer which contains all the events that travelled the `ChannelPipeline`
// all the way.
@inlinable
@discardableResult public func writeOutbound<T>(_ data: T) throws -> BufferState {
try writeAndFlush(NIOAny(data)).wait()
return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(self.channelcore.outboundBuffer)
return self.channelcore.outboundBuffer.isEmpty ? .empty : .full(Array(self.channelcore.outboundBuffer))
}
/// This method will throw the error that is stored in the `EmbeddedChannel` if any.
@ -577,7 +599,8 @@ public final class EmbeddedChannel: Channel {
}
}
private func readFromBuffer<T>(buffer: inout [NIOAny]) throws -> T? {
@inlinable
func _readFromBuffer<T>(buffer: inout CircularBuffer<NIOAny>) throws -> T? {
if buffer.isEmpty {
return nil
}

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_creating_10000_headers=100 # 5.2 improvement 10000
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_modifying_1000_circular_buffer_elements=50
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=4010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=4010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=5010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=5010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
- MAX_ALLOCS_ALLOWED_modifying_byte_buffer_view=2010 # 5.2 improvement 4000
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_read_10000_chunks_from_file=210050

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_creating_10000_headers=100
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_modifying_1000_circular_buffer_elements=50
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=4010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=4010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=5010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=5010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
- MAX_ALLOCS_ALLOWED_modifying_byte_buffer_view=2010
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_read_10000_chunks_from_file=200500 #5.3 improvement 210050

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_creating_10000_headers=10100
- MAX_ALLOCS_ALLOWED_modifying_1000_circular_buffer_elements=50
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=4010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=4010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=5010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=5010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
- MAX_ALLOCS_ALLOWED_modifying_byte_buffer_view=6010
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_read_10000_chunks_from_file=230050

View File

@ -26,11 +26,11 @@ services:
- MAX_ALLOCS_ALLOWED_creating_10000_headers=10100
- MAX_ALLOCS_ALLOWED_scheduling_10000_executions=20150
- MAX_ALLOCS_ALLOWED_modifying_1000_circular_buffer_elements=50
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=4010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=4010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_holding_buffer_with_space=1000
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer=5010
- MAX_ALLOCS_ALLOWED_encode_1000_ws_frames_new_buffer_with_space=5010
- MAX_ALLOCS_ALLOWED_decode_1000_ws_frames=2000
- MAX_ALLOCS_ALLOWED_modifying_byte_buffer_view=6010
- MAX_ALLOCS_ALLOWED_schedule_10000_tasks=90050
- MAX_ALLOCS_ALLOWED_read_10000_chunks_from_file=210050