Handle reentranct reads in ALPNHandler (#2402)

# Motivation
I spotted a bug in the ALPNHandler where it doesn't properly unbuffer reentrant reads. This can lead to dropped reads.

# Modification
Instead of buffering into an array we are now buffering into a Deque and unbuffer as long as there are reads in the Deque.

# Result
No more dropped reads.
This commit is contained in:
Franz Busch 2023-04-13 13:06:15 +01:00 committed by GitHub
parent b4ebd5a64a
commit a2fd8ad077
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 11 deletions

View File

@ -68,7 +68,7 @@ var targets: [PackageDescription.Target] = [
name: "CNIOLLHTTP", name: "CNIOLLHTTP",
cSettings: [.define("LLHTTP_STRICT_MODE")] cSettings: [.define("LLHTTP_STRICT_MODE")]
), ),
.target(name: "NIOTLS", dependencies: ["NIO", "NIOCore"]), .target(name: "NIOTLS", dependencies: ["NIO", "NIOCore", swiftCollections]),
.executableTarget(name: "NIOChatServer", .executableTarget(name: "NIOChatServer",
dependencies: ["NIOPosix", "NIOCore", "NIOConcurrencyHelpers"], dependencies: ["NIOPosix", "NIOCore", "NIOConcurrencyHelpers"],
exclude: ["README.md"]), exclude: ["README.md"]),
@ -112,7 +112,7 @@ var targets: [PackageDescription.Target] = [
.testTarget(name: "NIOHTTP1Tests", .testTarget(name: "NIOHTTP1Tests",
dependencies: ["NIOCore", "NIOEmbedded", "NIOPosix", "NIOHTTP1", "NIOFoundationCompat", "NIOTestUtils"]), dependencies: ["NIOCore", "NIOEmbedded", "NIOPosix", "NIOHTTP1", "NIOFoundationCompat", "NIOTestUtils"]),
.testTarget(name: "NIOTLSTests", .testTarget(name: "NIOTLSTests",
dependencies: ["NIOCore", "NIOEmbedded", "NIOTLS", "NIOFoundationCompat"]), dependencies: ["NIOCore", "NIOEmbedded", "NIOTLS", "NIOFoundationCompat", "NIOTestUtils"]),
.testTarget(name: "NIOWebSocketTests", .testTarget(name: "NIOWebSocketTests",
dependencies: ["NIOCore", "NIOEmbedded", "NIOWebSocket"]), dependencies: ["NIOCore", "NIOEmbedded", "NIOWebSocket"]),
.testTarget(name: "NIOTestUtilsTests", .testTarget(name: "NIOTestUtilsTests",

View File

@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===// //===----------------------------------------------------------------------===//
import NIOCore import NIOCore
import DequeModule
/// The result of an ALPN negotiation. /// The result of an ALPN negotiation.
/// ///
@ -62,7 +63,7 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler,
private let completionHandler: (ALPNResult, Channel) -> EventLoopFuture<Void> private let completionHandler: (ALPNResult, Channel) -> EventLoopFuture<Void>
private var waitingForUser: Bool private var waitingForUser: Bool
private var eventBuffer: [NIOAny] private var eventBuffer: Deque<NIOAny>
/// Create an `ApplicationProtocolNegotiationHandler` with the given completion /// Create an `ApplicationProtocolNegotiationHandler` with the given completion
/// callback. /// callback.
@ -125,15 +126,18 @@ public final class ApplicationProtocolNegotiationHandler: ChannelInboundHandler,
} }
private func unbuffer(context: ChannelHandlerContext) { private func unbuffer(context: ChannelHandlerContext) {
for datum in eventBuffer { // First we check if we have anything to unbuffer
guard !self.eventBuffer.isEmpty else {
return
}
// Now we unbuffer until there is nothing left.
// Importantly firing a channel read can lead to new reads being buffered due to reentrancy!
while let datum = self.eventBuffer.popFirst() {
context.fireChannelRead(datum) context.fireChannelRead(datum)
} }
let buffer = eventBuffer
eventBuffer = [] context.fireChannelReadComplete()
waitingForUser = false
if buffer.count > 0 {
context.fireChannelReadComplete()
}
} }
} }

View File

@ -2,7 +2,7 @@
// //
// This source file is part of the SwiftNIO open source project // This source file is part of the SwiftNIO open source project
// //
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors // Copyright (c) 2017-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0 // Licensed under Apache License v2.0
// //
// See LICENSE.txt for license information // See LICENSE.txt for license information
@ -35,6 +35,7 @@ extension ApplicationProtocolNegotiationHandlerTests {
("testBufferingWhileWaitingForFuture", testBufferingWhileWaitingForFuture), ("testBufferingWhileWaitingForFuture", testBufferingWhileWaitingForFuture),
("testNothingBufferedDoesNotFireReadCompleted", testNothingBufferedDoesNotFireReadCompleted), ("testNothingBufferedDoesNotFireReadCompleted", testNothingBufferedDoesNotFireReadCompleted),
("testUnbufferingFiresReadCompleted", testUnbufferingFiresReadCompleted), ("testUnbufferingFiresReadCompleted", testUnbufferingFiresReadCompleted),
("testUnbufferingHandlesReentrantReads", testUnbufferingHandlesReentrantReads),
] ]
} }
} }

View File

@ -16,6 +16,7 @@ import XCTest
import NIOCore import NIOCore
import NIOEmbedded import NIOEmbedded
import NIOTLS import NIOTLS
import NIOTestUtils
private class ReadCompletedHandler: ChannelInboundHandler { private class ReadCompletedHandler: ChannelInboundHandler {
public typealias InboundIn = Any public typealias InboundIn = Any
@ -30,6 +31,26 @@ private class ReadCompletedHandler: ChannelInboundHandler {
} }
} }
final class DuplicatingReadHandler: ChannelInboundHandler {
typealias InboundIn = String
private let channel: EmbeddedChannel
private var hasDuplicatedRead = false
init(embeddedChannel: EmbeddedChannel) {
self.channel = embeddedChannel
}
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
if !self.hasDuplicatedRead {
self.hasDuplicatedRead = true
try! self.channel.writeInbound(self.unwrapInboundIn(data))
}
context.fireChannelRead(data)
}
}
class ApplicationProtocolNegotiationHandlerTests: XCTestCase { class ApplicationProtocolNegotiationHandlerTests: XCTestCase {
private enum EventType { private enum EventType {
case basic case basic
@ -222,4 +243,36 @@ class ApplicationProtocolNegotiationHandlerTests: XCTestCase {
XCTAssertTrue(try channel.finish().isClean) XCTAssertTrue(try channel.finish().isClean)
} }
func testUnbufferingHandlesReentrantReads() throws {
let channel = EmbeddedChannel()
let continuePromise = channel.eventLoop.makePromise(of: Void.self)
let handler = ApplicationProtocolNegotiationHandler { result in
continuePromise.futureResult
}
let readCompleteHandler = ReadCompletedHandler()
try channel.pipeline.addHandler(handler).wait()
try channel.pipeline.addHandler(DuplicatingReadHandler(embeddedChannel: channel)).wait()
try channel.pipeline.addHandler(readCompleteHandler).wait()
// Fire in the event.
channel.pipeline.fireUserInboundEventTriggered(negotiatedEvent)
// Send a write, which is buffered.
try channel.writeInbound("a write")
// At this time, readComplete hasn't fired.
XCTAssertEqual(readCompleteHandler.readCompleteCount, 1)
// Now satisfy the future, which forces data unbuffering. This should fire readComplete.
continuePromise.succeed(())
XCTAssertNoThrow(XCTAssertEqual(try channel.readInbound()!, "a write"))
XCTAssertNoThrow(XCTAssertEqual(try channel.readInbound()!, "a write"))
XCTAssertEqual(readCompleteHandler.readCompleteCount, 3)
XCTAssertTrue(try channel.finish().isClean)
}
} }