Rework the `NIOAsyncSequenceProducer` tests to rely less on timings (#2386)

This commit is contained in:
Franz Busch 2023-03-07 17:28:34 +00:00 committed by GitHub
parent d1fa3e29bf
commit ef7dc666e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 135 additions and 169 deletions

View File

@ -16,20 +16,31 @@ import NIOCore
import XCTest import XCTest
final class MockNIOElementStreamBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategy, @unchecked Sendable { final class MockNIOElementStreamBackPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategy, @unchecked Sendable {
var didYieldCallCount = 0 enum Event {
case didYield
case didNext
}
let events: AsyncStream<Event>
private let eventsContinuation: AsyncStream<Event>.Continuation
init() {
var eventsContinuation: AsyncStream<Event>.Continuation!
self.events = .init() { eventsContinuation = $0 }
self.eventsContinuation = eventsContinuation!
}
var didYieldHandler: ((Int) -> Bool)? var didYieldHandler: ((Int) -> Bool)?
func didYield(bufferDepth: Int) -> Bool { func didYield(bufferDepth: Int) -> Bool {
self.didYieldCallCount += 1 self.eventsContinuation.yield(.didYield)
if let didYieldHandler = self.didYieldHandler { if let didYieldHandler = self.didYieldHandler {
return didYieldHandler(bufferDepth) return didYieldHandler(bufferDepth)
} }
return false return false
} }
var didNextCallCount = 0
var didNextHandler: ((Int) -> Bool)? var didNextHandler: ((Int) -> Bool)?
func didConsume(bufferDepth: Int) -> Bool { func didConsume(bufferDepth: Int) -> Bool {
self.didNextCallCount += 1 self.eventsContinuation.yield(.didNext)
if let didNextHandler = self.didNextHandler { if let didNextHandler = self.didNextHandler {
return didNextHandler(bufferDepth) return didNextHandler(bufferDepth)
} }
@ -38,19 +49,30 @@ final class MockNIOElementStreamBackPressureStrategy: NIOAsyncSequenceProducerBa
} }
final class MockNIOBackPressuredStreamSourceDelegate: NIOAsyncSequenceProducerDelegate, @unchecked Sendable { final class MockNIOBackPressuredStreamSourceDelegate: NIOAsyncSequenceProducerDelegate, @unchecked Sendable {
var produceMoreCallCount = 0 enum Event {
case produceMore
case didTerminate
}
let events: AsyncStream<Event>
private let eventsContinuation: AsyncStream<Event>.Continuation
init() {
var eventsContinuation: AsyncStream<Event>.Continuation!
self.events = .init() { eventsContinuation = $0 }
self.eventsContinuation = eventsContinuation!
}
var produceMoreHandler: (() -> Void)? var produceMoreHandler: (() -> Void)?
func produceMore() { func produceMore() {
self.produceMoreCallCount += 1 self.eventsContinuation.yield(.produceMore)
if let produceMoreHandler = self.produceMoreHandler { if let produceMoreHandler = self.produceMoreHandler {
return produceMoreHandler() return produceMoreHandler()
} }
} }
var didTerminateCallCount = 0
var didTerminateHandler: (() -> Void)? var didTerminateHandler: (() -> Void)?
func didTerminate() { func didTerminate() {
self.didTerminateCallCount += 1 self.eventsContinuation.yield(.didTerminate)
if let didTerminateHandler = self.didTerminateHandler { if let didTerminateHandler = self.didTerminateHandler {
return didTerminateHandler() return didTerminateHandler()
} }
@ -109,14 +131,12 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(self.source.yield(contentsOf: [1, 2, 3]), .produceMore) XCTAssertEqual(self.source.yield(contentsOf: [1, 2, 3]), .produceMore)
XCTAssertEqual(self.source.yield(contentsOf: [4, 5, 6]), .stopProducing) XCTAssertEqual(self.source.yield(contentsOf: [4, 5, 6]), .stopProducing)
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
XCTAssertEqualWithoutAutoclosure(await iterator.next(), 1) XCTAssertEqualWithoutAutoclosure(await iterator.next(), 1)
XCTAssertEqualWithoutAutoclosure(await iterator.next(), 2) XCTAssertEqualWithoutAutoclosure(await iterator.next(), 2)
XCTAssertEqualWithoutAutoclosure(await iterator.next(), 3) XCTAssertEqualWithoutAutoclosure(await iterator.next(), 3)
XCTAssertEqualWithoutAutoclosure(await iterator.next(), 4) XCTAssertEqualWithoutAutoclosure(await iterator.next(), 4)
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
XCTAssertEqualWithoutAutoclosure(await iterator.next(), 5) XCTAssertEqualWithoutAutoclosure(await iterator.next(), 5)
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
XCTAssertEqual(self.source.yield(contentsOf: [7, 8, 9, 10, 11]), .stopProducing) XCTAssertEqual(self.source.yield(contentsOf: [7, 8, 9, 10, 11]), .stopProducing)
} }
@ -126,7 +146,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in false } self.backPressureStrategy.didYieldHandler = { _ in false }
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
} }
@ -134,7 +154,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in true } self.backPressureStrategy.didYieldHandler = { _ in true }
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
XCTAssertEqual(result, .produceMore) XCTAssertEqual(result, .produceMore)
} }
@ -145,14 +165,13 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// sure the other child task runs when the demand is registered // sure the other child task runs when the demand is registered
let sequence = try XCTUnwrap(self.sequence) let sequence = try XCTUnwrap(self.sequence)
async let element = sequence.first { _ in true } async let element = sequence.first { _ in true }
try await Task.sleep(nanoseconds: 1_000_000) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1)
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqualWithoutAutoclosure(await element, 1) XCTAssertEqualWithoutAutoclosure(await element, 1)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
} }
func testYield_whenStreaming_andSuspended_andDemandMore() async throws { func testYield_whenStreaming_andSuspended_andDemandMore() async throws {
@ -162,14 +181,13 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// sure the other child task runs when the demand is registered // sure the other child task runs when the demand is registered
let sequence = try XCTUnwrap(self.sequence) let sequence = try XCTUnwrap(self.sequence)
async let element = sequence.first { _ in true } async let element = sequence.first { _ in true }
try await Task.sleep(nanoseconds: 1_000_000) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1)
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .produceMore) XCTAssertEqual(result, .produceMore)
XCTAssertEqualWithoutAutoclosure(await element, 1) XCTAssertEqualWithoutAutoclosure(await element, 1)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
} }
func testYieldEmptySequence_whenStreaming_andSuspended_andStopDemanding() async throws { func testYieldEmptySequence_whenStreaming_andSuspended_andStopDemanding() async throws {
@ -182,13 +200,11 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1)
let result = self.source.yield(contentsOf: []) let result = self.source.yield(contentsOf: [])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
} }
func testYieldEmptySequence_whenStreaming_andSuspended_andDemandMore() async throws { func testYieldEmptySequence_whenStreaming_andSuspended_andDemandMore() async throws {
@ -201,13 +217,11 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1)
let result = self.source.yield(contentsOf: []) let result = self.source.yield(contentsOf: [])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
} }
func testYield_whenStreaming_andNotSuspended_andStopDemanding() async throws { func testYield_whenStreaming_andNotSuspended_andStopDemanding() async throws {
@ -218,7 +232,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 2) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didYield])
} }
func testYield_whenStreaming_andNotSuspended_andDemandMore() async throws { func testYield_whenStreaming_andNotSuspended_andDemandMore() async throws {
@ -229,7 +243,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .produceMore) XCTAssertEqual(result, .produceMore)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 2) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didYield])
} }
func testYield_whenSourceFinished() async throws { func testYield_whenSourceFinished() async throws {
@ -238,15 +252,12 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .dropped) XCTAssertEqual(result, .dropped)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
} }
// MARK: - Finish // MARK: - Finish
func testFinish_whenInitial() async { func testFinish_whenInitial() async {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
} }
func testFinish_whenStreaming_andSuspended() async throws { func testFinish_whenStreaming_andSuspended() async throws {
@ -259,7 +270,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.source.finish() self.source.finish()
XCTAssertEqualWithoutAutoclosure(await element, nil) XCTAssertEqualWithoutAutoclosure(await element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws { func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
@ -269,7 +280,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let element = await self.sequence.first { _ in true } let element = await self.sequence.first { _ in true }
XCTAssertNil(element) XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
@ -277,34 +288,26 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let element = await self.sequence.first { _ in true } let element = await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenFinished() async throws { func testFinish_whenFinished() async throws {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
_ = await self.sequence.first { _ in true } _ = await self.sequence.first { _ in true }
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
} }
// MARK: - Source Deinited // MARK: - Source Deinited
func testSourceDeinited_whenInitial() async { func testSourceDeinited_whenInitial() async {
self.source = nil self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
} }
func testSourceDeinited_whenStreaming_andSuspended() async throws { func testSourceDeinited_whenStreaming_andSuspended() async throws {
@ -325,7 +328,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(element, nil) XCTAssertEqual(element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
@ -343,7 +346,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertNil(element) XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
@ -351,8 +354,6 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.source = nil self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let sequence = try XCTUnwrap(self.sequence) let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask { group.addTask {
@ -364,7 +365,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
// MARK: - Task cancel // MARK: - Task cancel
@ -379,10 +380,9 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
task.cancel() task.cancel()
let value = await task.value let value = await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertNil(value) XCTAssertNil(value)
} }
@ -405,10 +405,9 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
task.cancel() task.cancel()
let value = await task.value let value = await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertEqual(value, 1) XCTAssertEqual(value, 1)
} }
@ -422,12 +421,10 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
task.cancel() task.cancel()
let value = await task.value let value = await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
XCTAssertNil(value) XCTAssertNil(value)
} }
@ -459,10 +456,9 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
} }
func testNext_whenInitial_whenNoDemand() async throws { func testNext_whenInitial_whenNoDemand() async throws {
@ -474,10 +470,8 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenEmptyBuffer_whenDemand() async throws { func testNext_whenStreaming_whenEmptyBuffer_whenDemand() async throws {
@ -491,10 +485,9 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
} }
func testNext_whenStreaming_whenEmptyBuffer_whenNoDemand() async throws { func testNext_whenStreaming_whenEmptyBuffer_whenNoDemand() async throws {
@ -508,10 +501,8 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
// Would prefer to use async let _ here but that is not allowed yet // Would prefer to use async let _ here but that is not allowed yet
_ = await sequence.first { _ in true } _ = await sequence.first { _ in true }
} }
try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNoDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNoDemand() async throws {
@ -521,8 +512,7 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let element = await self.sequence.first { _ in true } let element = await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNewDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNewDemand() async throws {
@ -532,8 +522,8 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
let element = await self.sequence.first { _ in true } let element = await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNewAndOutstandingDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNewAndOutstandingDemand() async throws {
@ -541,13 +531,11 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in true } self.backPressureStrategy.didYieldHandler = { _ in true }
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
let element = await self.sequence.first { _ in true } let element = await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenSourceFinished() async throws { func testNext_whenSourceFinished() async throws {
@ -568,18 +556,17 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
func testSequenceDeinitialized() async { func testSequenceDeinitialized() async {
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSequenceDeinitialized_whenIteratorReferenced() async { func testSequenceDeinitialized_whenIteratorReferenced() async {
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
// MARK: - IteratorDeinitialized // MARK: - IteratorDeinitialized
@ -589,32 +576,30 @@ final class NIOAsyncSequenceProducerTests: XCTestCase {
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
} }
func testIteratorDeinitialized_whenSequenceFinished() { func testIteratorDeinitialized_whenSequenceFinished() async {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testIteratorDeinitialized_whenStreaming() { func testIteratorDeinitialized_whenStreaming() async {
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
} }
@ -626,6 +611,14 @@ fileprivate func XCTAssertEqualWithoutAutoclosure<T>(
file: StaticString = #filePath, file: StaticString = #filePath,
line: UInt = #line line: UInt = #line
) where T: Equatable { ) where T: Equatable {
let result = expression1 == expression2 XCTAssertEqual(expression1, expression2, message(), file: file, line: line)
XCTAssertTrue(result, message(), file: file, line: line) }
extension AsyncSequence {
/// Collect all elements in the sequence into an array.
fileprivate func collect() async rethrows -> [Element] {
try await self.reduce(into: []) { accumulated, next in
accumulated.append(next)
}
}
} }

View File

@ -70,14 +70,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(self.source.yield(contentsOf: [1, 2, 3]), .produceMore) XCTAssertEqual(self.source.yield(contentsOf: [1, 2, 3]), .produceMore)
XCTAssertEqual(self.source.yield(contentsOf: [4, 5, 6]), .stopProducing) XCTAssertEqual(self.source.yield(contentsOf: [4, 5, 6]), .stopProducing)
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 1) XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 1)
XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 2) XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 2)
XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 3) XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 3)
XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 4) XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 4)
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 5) XCTAssertEqualWithoutAutoclosure(try await iterator.next(), 5)
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
XCTAssertEqual(self.source.yield(contentsOf: [7, 8, 9, 10, 11]), .stopProducing) XCTAssertEqual(self.source.yield(contentsOf: [7, 8, 9, 10, 11]), .stopProducing)
} }
@ -87,7 +85,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in false } self.backPressureStrategy.didYieldHandler = { _ in false }
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
} }
@ -95,7 +93,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in true } self.backPressureStrategy.didYieldHandler = { _ in true }
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
XCTAssertEqual(result, .produceMore) XCTAssertEqual(result, .produceMore)
} }
@ -111,7 +109,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
@ -121,7 +119,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
} }
func testYield_whenStreaming_andSuspended_andProduceMore() async throws { func testYield_whenStreaming_andSuspended_andProduceMore() async throws {
@ -136,7 +134,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
@ -146,7 +144,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
} }
func testYieldEmptySequence_whenStreaming_andSuspended_andStopProducing() async throws { func testYieldEmptySequence_whenStreaming_andSuspended_andStopProducing() async throws {
@ -161,14 +159,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
let result = self.source.yield(contentsOf: []) let result = self.source.yield(contentsOf: [])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
group.cancelAll() group.cancelAll()
} }
} }
@ -185,14 +181,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
let result = self.source.yield(contentsOf: []) let result = self.source.yield(contentsOf: [])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
group.cancelAll() group.cancelAll()
} }
} }
@ -205,7 +199,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .stopProducing) XCTAssertEqual(result, .stopProducing)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 2) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didYield])
} }
func testYield_whenStreaming_andNotSuspended_andProduceMore() async throws { func testYield_whenStreaming_andNotSuspended_andProduceMore() async throws {
@ -216,7 +210,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .produceMore) XCTAssertEqual(result, .produceMore)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 2) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(2).collect(), [.didYield, .didYield])
} }
func testYield_whenSourceFinished() async throws { func testYield_whenSourceFinished() async throws {
@ -225,15 +219,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
let result = self.source.yield(contentsOf: [1]) let result = self.source.yield(contentsOf: [1])
XCTAssertEqual(result, .dropped) XCTAssertEqual(result, .dropped)
XCTAssertEqual(self.backPressureStrategy.didYieldCallCount, 0)
} }
// MARK: - Finish // MARK: - Finish
func testFinish_whenInitial() async { func testFinish_whenInitial() async {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
} }
func testFinish_whenStreaming_andSuspended() async throws { func testFinish_whenStreaming_andSuspended() async throws {
@ -254,7 +245,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(element, nil) XCTAssertEqual(element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws { func testFinish_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
@ -272,7 +263,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertNil(element) XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { func testFinish_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
@ -280,8 +271,6 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let sequence = try XCTUnwrap(self.sequence) let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask { group.addTask {
@ -293,21 +282,18 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinish_whenFinished() async throws { func testFinish_whenFinished() async throws {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
_ = try await self.sequence.first { _ in true } _ = try await self.sequence.first { _ in true }
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
} }
// MARK: - Finish with Error // MARK: - Finish with Error
@ -315,13 +301,11 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
func testFinishError_whenInitial() async { func testFinishError_whenInitial() async {
self.source.finish(ChannelError.alreadyClosed) self.source.finish(ChannelError.alreadyClosed)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
await XCTAssertThrowsError(try await self.sequence.first { _ in true }) { error in await XCTAssertThrowsError(try await self.sequence.first { _ in true }) { error in
XCTAssertEqual(error as? ChannelError, .alreadyClosed) XCTAssertEqual(error as? ChannelError, .alreadyClosed)
} }
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinishError_whenStreaming_andSuspended() async throws { func testFinishError_whenStreaming_andSuspended() async throws {
@ -342,7 +326,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(error as? ChannelError, .alreadyClosed) XCTAssertEqual(error as? ChannelError, .alreadyClosed)
} }
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinishError_whenStreaming_andNotSuspended_andBufferEmpty() async throws { func testFinishError_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
@ -361,7 +345,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(error as? ChannelError, .alreadyClosed) XCTAssertEqual(error as? ChannelError, .alreadyClosed)
} }
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinishError_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { func testFinishError_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
@ -369,8 +353,6 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.source.finish(ChannelError.alreadyClosed) self.source.finish(ChannelError.alreadyClosed)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
var elements = [Int]() var elements = [Int]()
await XCTAssertThrowsError(try await { await XCTAssertThrowsError(try await {
@ -382,33 +364,26 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(elements, [1]) XCTAssertEqual(elements, [1])
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testFinishError_whenFinished() async throws { func testFinishError_whenFinished() async throws {
self.source.finish() self.source.finish()
let iterator = self.sequence.makeAsyncIterator() let iterator = self.sequence.makeAsyncIterator()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
_ = try await iterator.next() _ = try await iterator.next()
XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
self.source.finish(ChannelError.alreadyClosed) self.source.finish(ChannelError.alreadyClosed)
// This call should just return nil // This call should just return nil
_ = try await iterator.next() _ = try await iterator.next()
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
} }
// MARK: - Source Deinited // MARK: - Source Deinited
func testSourceDeinited_whenInitial() async { func testSourceDeinited_whenInitial() async {
self.source = nil self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
} }
func testSourceDeinited_whenStreaming_andSuspended() async throws { func testSourceDeinited_whenStreaming_andSuspended() async throws {
@ -429,7 +404,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertEqual(element, nil) XCTAssertEqual(element, nil)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws { func testSourceDeinited_whenStreaming_andNotSuspended_andBufferEmpty() async throws {
@ -447,7 +422,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
XCTAssertNil(element) XCTAssertNil(element)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws { func testSourceDeinited_whenStreaming_andNotSuspended_andBufferNotEmpty() async throws {
@ -455,8 +430,6 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.source = nil self.source = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
let sequence = try XCTUnwrap(self.sequence) let sequence = try XCTUnwrap(self.sequence)
let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in let element: Int? = try await withThrowingTaskGroup(of: Int?.self) { group in
group.addTask { group.addTask {
@ -468,7 +441,7 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
// MARK: - Task cancel // MARK: - Task cancel
@ -483,10 +456,9 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
task.cancel() task.cancel()
let value = try await task.value let value = try await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertNil(value) XCTAssertNil(value)
} }
@ -506,10 +478,9 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
task.cancel() task.cancel()
let value = try await task.value let value = try await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertEqual(value, 1) XCTAssertEqual(value, 1)
} }
@ -523,12 +494,10 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
task.cancel() task.cancel()
let value = try await task.value let value = try await task.value
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
XCTAssertNil(value) XCTAssertNil(value)
} }
@ -562,8 +531,8 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
} }
func testNext_whenInitial_whenNoDemand() async throws { func testNext_whenInitial_whenNoDemand() async throws {
@ -577,13 +546,13 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenEmptyBuffer_whenDemand() async throws { func testNext_whenStreaming_whenEmptyBuffer_whenDemand() async throws {
self.backPressureStrategy.didNextHandler = { _ in true } self.backPressureStrategy.didNextHandler = { _ in true }
_ = self.source.yield(contentsOf: []) _ = self.source.yield(contentsOf: [])
XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
// We are registering our demand and sleeping a bit to make // We are registering our demand and sleeping a bit to make
// sure the other child task runs when the demand is registered // sure the other child task runs when the demand is registered
@ -594,13 +563,13 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1)
} }
func testNext_whenStreaming_whenEmptyBuffer_whenNoDemand() async throws { func testNext_whenStreaming_whenEmptyBuffer_whenNoDemand() async throws {
self.backPressureStrategy.didNextHandler = { _ in false } self.backPressureStrategy.didNextHandler = { _ in false }
_ = self.source.yield(contentsOf: []) _ = self.source.yield(contentsOf: [])
XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
// We are registering our demand and sleeping a bit to make // We are registering our demand and sleeping a bit to make
// sure the other child task runs when the demand is registered // sure the other child task runs when the demand is registered
@ -611,30 +580,30 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
} }
try await Task.sleep(nanoseconds: 1_000_000) try await Task.sleep(nanoseconds: 1_000_000)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNoDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNoDemand() async throws {
self.backPressureStrategy.didNextHandler = { _ in false } self.backPressureStrategy.didNextHandler = { _ in false }
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
let element = try await self.sequence.first { _ in true } let element = try await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNewDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNewDemand() async throws {
self.backPressureStrategy.didNextHandler = { _ in true } self.backPressureStrategy.didNextHandler = { _ in true }
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
let element = try await self.sequence.first { _ in true } let element = try await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.produceMore])
} }
func testNext_whenStreaming_whenNotEmptyBuffer_whenNewAndOutstandingDemand() async throws { func testNext_whenStreaming_whenNotEmptyBuffer_whenNewAndOutstandingDemand() async throws {
@ -642,13 +611,12 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
self.backPressureStrategy.didYieldHandler = { _ in true } self.backPressureStrategy.didYieldHandler = { _ in true }
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didYield])
let element = try await self.sequence.first { _ in true } let element = try await self.sequence.first { _ in true }
XCTAssertEqual(element, 1) XCTAssertEqual(element, 1)
XCTAssertEqual(self.backPressureStrategy.didNextCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.backPressureStrategy.events.prefix(1).collect(), [.didNext])
XCTAssertEqual(self.delegate.produceMoreCallCount, 0)
} }
func testNext_whenSourceFinished() async throws { func testNext_whenSourceFinished() async throws {
@ -669,18 +637,17 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
func testSequenceDeinitialized() async { func testSequenceDeinitialized() async {
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testSequenceDeinitialized_whenIteratorReferenced() async { func testSequenceDeinitialized_whenIteratorReferenced() async {
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
// MARK: - IteratorDeinitialized // MARK: - IteratorDeinitialized
@ -690,32 +657,30 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
self.sequence = nil self.sequence = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1)
} }
func testIteratorDeinitialized_whenSequenceFinished() { func testIteratorDeinitialized_whenSequenceFinished() async {
self.source.finish() self.source.finish()
XCTAssertEqual(self.delegate.didTerminateCallCount, 0)
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
func testIteratorDeinitialized_whenStreaming() { func testIteratorDeinitialized_whenStreaming() async {
_ = self.source.yield(contentsOf: [1]) _ = self.source.yield(contentsOf: [1])
var iterator = self.sequence?.makeAsyncIterator() var iterator = self.sequence?.makeAsyncIterator()
XCTAssertNotNil(iterator) XCTAssertNotNil(iterator)
iterator = nil iterator = nil
XCTAssertEqual(self.delegate.didTerminateCallCount, 1) XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
} }
} }
@ -727,6 +692,14 @@ fileprivate func XCTAssertEqualWithoutAutoclosure<T>(
file: StaticString = #filePath, file: StaticString = #filePath,
line: UInt = #line line: UInt = #line
) where T: Equatable { ) where T: Equatable {
let result = expression1 == expression2 XCTAssertEqual(expression1, expression2, message(), file: file, line: line)
XCTAssertTrue(result, message(), file: file, line: line) }
extension AsyncSequence {
/// Collect all elements in the sequence into an array.
fileprivate func collect() async rethrows -> [Element] {
try await self.reduce(into: []) { accumulated, next in
accumulated.append(next)
}
}
} }