Throw `CancellationError` if `NIOThrowingAsyncSequenceProducer.AsyncIterator.next()` is cancelled instead of returning `nil` (#2399)

* Throw `CancellationError` if `NIOThrowingAsyncSequenceProducer.AsyncIterator.next()` is cancelled instead of returning `nil`

* Update doc comment

* Fix typo
This commit is contained in:
David Nadoba 2023-04-05 17:37:43 +01:00 committed by GitHub
parent 4f7b78202d
commit 75cea45e61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 11 additions and 8 deletions

View File

@ -198,7 +198,8 @@ extension NIOAsyncSequenceProducer {
@inlinable
public func next() async -> Element? {
return try! await self._throwingIterator.next()
// this call will only throw if cancelled and we want to just return nil in that case
return try? await self._throwingIterator.next()
}
}
}

View File

@ -498,8 +498,8 @@ extension NIOThrowingAsyncSequenceProducer {
return delegate
case .resumeContinuationWithNilAndCallDidTerminate(let continuation):
continuation.resume(returning: nil)
case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation):
continuation.resume(throwing: CancellationError())
let delegate = self._delegate
self._delegate = nil
@ -868,9 +868,9 @@ extension NIOThrowingAsyncSequenceProducer {
enum CancelledAction {
/// Indicates that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case callDidTerminate
/// Indicates that the continuation should be resumed with `nil` and
/// Indicates that the continuation should be resumed with a `CancellationError` and
/// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called.
case resumeContinuationWithNilAndCallDidTerminate(CheckedContinuation<Element?, Error>)
case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation<Element?, Error>)
/// Indicates that nothing should be done.
case none
}
@ -889,7 +889,7 @@ extension NIOThrowingAsyncSequenceProducer {
// and we can transition to finished here and inform the delegate
self._state = .finished(iteratorInitialized: iteratorInitialized)
return .resumeContinuationWithNilAndCallDidTerminate(continuation)
return .resumeContinuationWithCancellationErrorAndCallDidTerminate(continuation)
case .streaming(_, _, continuation: .none, _, let iteratorInitialized):
self._state = .finished(iteratorInitialized: iteratorInitialized)

View File

@ -457,9 +457,11 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
try await Task.sleep(nanoseconds: 1_000_000)
task.cancel()
let value = try await task.value
let result = await task.result
XCTAssertEqualWithoutAutoclosure(await self.delegate.events.prefix(1).collect(), [.didTerminate])
XCTAssertNil(value)
await XCTAssertThrowsError(try result.get()) { error in
XCTAssertTrue(error is CancellationError)
}
}
func testTaskCancel_whenStreaming_andNotSuspended() async throws {