clean up the (proto)types for EventLoopFuture

* clean up the (proto)types for EventLoopFuture

* Remove more unnecessary parens
This commit is contained in:
Johannes Weiss 2018-02-09 16:03:28 +00:00
parent 17b5fd58e4
commit 734904d0f4
13 changed files with 181 additions and 76 deletions

View File

@ -148,25 +148,25 @@ public final class ServerBootstrap {
func addAcceptHandlerAndFinishServerSetup() {
let f = serverChannel.pipeline.add(handler: AcceptHandler(childChannelInitializer: childChannelInit,
childChannelOptions: childChannelOptions))
f.whenComplete(callback: { v in
f.whenComplete { v in
switch v {
case .failure(let err):
promise.fail(error: err)
case .success(_):
finishServerSetup()
}
})
}
}
if let serverChannelInit = serverChannelInit {
serverChannelInit(serverChannel).whenComplete(callback: { v in
serverChannelInit(serverChannel).whenComplete { v in
switch v {
case .failure(let err):
promise.fail(error: err)
case .success(_):
addAcceptHandlerAndFinishServerSetup()
}
})
}
} else {
addAcceptHandlerAndFinishServerSetup()
}
@ -190,7 +190,7 @@ public final class ServerBootstrap {
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let accepted = self.unwrapInboundIn(data)
self.childChannelOptions.applyAll(channel: accepted).whenComplete(callback: { v in
self.childChannelOptions.applyAll(channel: accepted).whenComplete { v in
// We must return to the server channel.
ctx.channel.eventLoop.execute {
switch v {
@ -199,7 +199,7 @@ public final class ServerBootstrap {
case .success(_):
if let childChannelInit = self.childChannelInit {
childChannelInit(accepted).whenComplete(callback: { v in
childChannelInit(accepted).whenComplete { v in
switch v {
case .failure(let err):
self.closeAndFire(ctx: ctx, accepted: accepted, err: err)
@ -212,13 +212,13 @@ public final class ServerBootstrap {
}
}
}
})
}
} else {
ctx.fireChannelRead(data: data)
}
}
}
})
}
}
private func closeAndFire(ctx: ChannelHandlerContext, accepted: SocketChannel, err: Error) {
@ -351,14 +351,14 @@ public final class ClientBootstrap {
}
if let channelInitializer = channelInitializer {
channelInitializer(channel).whenComplete(callback: { v in
channelInitializer(channel).whenComplete { v in
switch v {
case .failure(let err):
promise.fail(error: err)
case .success(_):
finishClientSetup()
}
})
}
} else {
finishClientSetup()
}
@ -406,14 +406,14 @@ fileprivate struct ChannelOptionStorage {
return
}
applier(channel)(key, value).whenComplete(callback: { v in
applier(channel)(key, value).whenComplete { v in
switch v {
case .failure(let err):
applyPromise.fail(error: err)
case .success(_):
applyNext()
}
})
}
}
applyNext()

View File

@ -86,13 +86,13 @@
///
/// ```
/// ChannelPipeline p = ...;
/// let future = p.add(name: "1", handler: InboundHandlerA()).then(callback: { _ in
/// let future = p.add(name: "1", handler: InboundHandlerA()).then { _ in
/// return p.add(name: "2", handler: InboundHandlerB())
/// }).then(callback: { _ in
/// }.then { _ in
/// return p.add(name: "3", handler: OutboundHandlerA())
/// }).then(callback: { _ in
/// }.then { _ in
/// p.add(name: "4", handler: OutboundHandlerB())
/// }).then(callback: { _ in
/// }.then { _ in
/// p.add(name: "5", handler: InboundOutboundHandlerX())
/// }
/// // Handle the future as well ....
@ -1214,8 +1214,8 @@ public final class ChannelHandlerContext : ChannelInvoker {
outboundHandler.write(ctx: self, data: data, promise: writePromise)
outboundHandler.flush(ctx: self, promise: flushPromise)
writePromise.futureResult.whenComplete(callback: callback)
flushPromise.futureResult.whenComplete(callback: callback)
writePromise.futureResult.whenComplete(callback)
flushPromise.futureResult.whenComplete(callback)
} else {
outboundHandler.write(ctx: self, data: data, promise: nil)
outboundHandler.flush(ctx: self, promise: nil)

View File

@ -26,14 +26,14 @@ public struct Scheduled<T> {
init(promise: EventLoopPromise<T>, cancellationTask: @escaping () -> ()) {
self.promise = promise
promise.futureResult.whenFailure(callback: { error in
promise.futureResult.whenFailure { error in
guard let err = error as? EventLoopError else {
return
}
if err == .cancelled {
cancellationTask()
}
})
}
self.cancellationTask = cancellationTask
}

View File

@ -44,7 +44,8 @@ private struct CallbackList: ExpressibleByArrayLiteral {
}
}
}
mutating func append(callback: @escaping () -> CallbackList) {
mutating func append(_ callback: @escaping () -> CallbackList) {
if self.firstCallback == nil {
self.firstCallback = callback
} else {
@ -342,19 +343,14 @@ extension EventLoopFuture {
- parameter callback: Function that will receive the value of this EventLoopFuture and return a new EventLoopFuture
- returns: A future that will receive the eventual value
*/
public func then<U>(file: StaticString = #file, line: UInt = #line, callback: @escaping (T) throws -> EventLoopFuture<U>) -> EventLoopFuture<U> {
public func then<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> EventLoopFuture<U>) -> EventLoopFuture<U> {
let next = EventLoopPromise<U>(eventLoop: eventLoop, file: file, line: line)
_whenComplete {
switch self.value! {
case .success(let t):
do {
let futureU = try callback(t)
return futureU._addCallback {
return next._setValue(value: futureU.value!)
}
} catch let error {
return next._setValue(value: .failure(error))
let futureU = callback(t)
return futureU._addCallback {
return next._setValue(value: futureU.value!)
}
case .failure(let error):
return next._setValue(value: .failure(error))
@ -363,6 +359,27 @@ extension EventLoopFuture {
return next.futureResult
}
public func thenThrowing<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) throws -> U) -> EventLoopFuture<U> {
return self.then(file: file, line: line) { (value: T) -> EventLoopFuture<U> in
do {
return EventLoopFuture<U>(eventLoop: self.eventLoop, result: try callback(value), file: file, line: line)
} catch {
return EventLoopFuture<U>(eventLoop: self.eventLoop, error: error, file: file, line: line)
}
}
}
public func thenIfErrorThrowing(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) throws -> T) -> EventLoopFuture<T> {
return self.thenIfError(file: file, line: line) { value in
do {
return EventLoopFuture(eventLoop: self.eventLoop, result: try callback(value), file: file, line: line)
} catch {
return EventLoopFuture(eventLoop: self.eventLoop, error: error, file: file, line: line)
}
}
}
/** Chainable transformation.
```
@ -386,8 +403,8 @@ extension EventLoopFuture {
Generally, a simple closure provided to `then()` should never block. If you need to do something time-consuming, your closure can schedule the operation on another queue and return another `EventLoopFuture<>` object instead. See `then(queue:callback:)` for a convenient way to do this.
*/
public func map<U>(file: StaticString = #file, line: UInt = #line, callback: @escaping (T) throws -> (U)) -> EventLoopFuture<U> {
return then { return EventLoopFuture<U>(eventLoop: self.eventLoop, result: try callback($0), file: file, line: line) }
public func map<U>(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (T) -> (U)) -> EventLoopFuture<U> {
return then { return EventLoopFuture<U>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
}
@ -408,53 +425,49 @@ extension EventLoopFuture {
This supports the same overloads as `then()`, including allowing the callback to return a `EventLoopFuture<T>`.
*/
public func thenIfError(file: StaticString = #file, line: UInt = #line, callback: @escaping (Error) throws -> EventLoopFuture<T>) -> EventLoopFuture<T> {
public func thenIfError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
let next = EventLoopPromise<T>(eventLoop: eventLoop, file: file, line: line)
_whenComplete {
switch self.value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
do {
let t = try callback(e)
return t._addCallback {
return next._setValue(value: t.value!)
}
} catch let error {
return next._setValue(value: .failure(error))
let t = callback(e)
return t._addCallback {
return next._setValue(value: t.value!)
}
}
}
return next.futureResult
}
public func mapIfError(file: StaticString = #file, line: UInt = #line, callback: @escaping (Error) throws -> T) -> EventLoopFuture<T> {
return thenIfError { return EventLoopFuture<T>(eventLoop: self.eventLoop, result: try callback($0), file: file, line: line) }
public func mapIfError(file: StaticString = #file, line: UInt = #line, _ callback: @escaping (Error) -> T) -> EventLoopFuture<T> {
return thenIfError { return EventLoopFuture<T>(eventLoop: self.eventLoop, result: callback($0), file: file, line: line) }
}
/// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions.
fileprivate func _addCallback(callback: @escaping () -> CallbackList) -> CallbackList {
fileprivate func _addCallback(_ callback: @escaping () -> CallbackList) -> CallbackList {
assert(eventLoop.inEventLoop)
if value == nil {
callbacks.append(callback: callback)
callbacks.append(callback)
return CallbackList()
}
return callback()
}
/// Add a callback. If there's already a value, run as much of the chain as we can.
fileprivate func _whenComplete(callback: @escaping () -> CallbackList) {
fileprivate func _whenComplete(_ callback: @escaping () -> CallbackList) {
if eventLoop.inEventLoop {
_addCallback(callback: callback)._run()
_addCallback(callback)._run()
} else {
eventLoop.execute {
self._addCallback(callback: callback)._run()
self._addCallback(callback)._run()
}
}
}
public func whenSuccess(callback: @escaping (T) -> ()) {
public func whenSuccess(_ callback: @escaping (T) -> ()) {
_whenComplete {
if case .success(let t) = self.value! {
callback(t)
@ -463,7 +476,7 @@ extension EventLoopFuture {
}
}
public func whenFailure(callback: @escaping (Error) -> ()) {
public func whenFailure(_ callback: @escaping (Error) -> ()) {
_whenComplete {
if case .failure(let e) = self.value! {
callback(e)
@ -472,7 +485,7 @@ extension EventLoopFuture {
}
}
public func whenComplete(callback: @escaping (EventLoopFutureValue<T>) -> ()) {
public func whenComplete(_ callback: @escaping (EventLoopFutureValue<T>) -> ()) {
_whenComplete {
callback(self.value!)
return CallbackList()
@ -556,20 +569,20 @@ extension EventLoopFuture {
extension EventLoopFuture {
public func cascade(promise: EventLoopPromise<T>) {
whenComplete(callback: { v in
whenComplete { v in
switch v {
case .failure(let err):
promise.fail(error: err)
case .success(let value):
promise.succeed(result: value)
}
})
}
}
public func cascadeFailure<U>(promise: EventLoopPromise<U>) {
self.whenFailure(callback: { err in
self.whenFailure { err in
promise.fail(error: err)
})
}
}
}
@ -607,7 +620,7 @@ extension EventLoopFuture {
return p0.futureResult
}
let fn: EventLoopFuture<Void> = futures.reduce(p0.futureResult, { (f1: EventLoopFuture<Void>, f2: EventLoopFuture<Void>) in f1.and(f2).map(callback: { _ in return () }) })
let fn: EventLoopFuture<Void> = futures.reduce(p0.futureResult, { (f1: EventLoopFuture<Void>, f2: EventLoopFuture<Void>) in f1.and(f2).map({ _ in return () }) })
p0.succeed(result: ())
return fn
}

View File

@ -459,8 +459,8 @@ internal class HappyEyeballsConnector {
// The two queries SHOULD be made as soon after one another as possible,
// with the AAAA query made first and immediately followed by the A
// query.
resolver.initiateAAAAQuery(host: host, port: port).whenComplete(callback: aaaaLookupComplete)
resolver.initiateAQuery(host: host, port: port).whenComplete(callback: aLookupComplete)
resolver.initiateAAAAQuery(host: host, port: port).whenComplete(aaaaLookupComplete)
resolver.initiateAQuery(host: host, port: port).whenComplete(aLookupComplete)
}
/// Called when the A query has completed before the AAAA query.

View File

@ -1059,7 +1059,7 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
let ch = data.forceAsOther() as SocketChannel
let f = ch.register()
f.whenComplete(callback: { v in
f.whenComplete { v in
switch v {
case .failure(_):
ch.close(promise: nil)
@ -1067,6 +1067,6 @@ final class ServerSocketChannel : BaseSocketChannel<ServerSocket> {
ch.becomeActive0()
ch.readIfNeeded0()
}
})
}
}
}

View File

@ -48,9 +48,9 @@ let bootstrap = ServerBootstrap(group: group)
// Set the handlers that are appled to the accepted Channels
.childChannelInitializer { channel in
// Ensure we don't read faster then we can write by adding the BackPressureHandler into the pipeline.
return channel.pipeline.add(handler: BackPressureHandler()).then(callback: { v in
return channel.pipeline.add(handler: BackPressureHandler()).then { v in
return channel.pipeline.add(handler: EchoHandler())
})
}
}
// Enable TCP_NODELAY and SO_REUSEADDR for the accepted Channels

View File

@ -333,9 +333,9 @@ private final class HTTPHandler: ChannelInboundHandler {
if keepAlive {
ctx.write(data: self.wrapOutboundOut(HTTPServerResponsePart.end(nil)), promise: nil)
} else {
ctx.write(data: self.wrapOutboundOut(HTTPServerResponsePart.end(nil))).whenComplete(callback: { _ in
ctx.write(data: self.wrapOutboundOut(HTTPServerResponsePart.end(nil))).whenComplete { _ in
ctx.close(promise: nil)
})
}
}
}
}

View File

@ -31,6 +31,10 @@ extension EventLoopFutureTest {
("testAndAllWithAllSuccesses", testAndAllWithAllSuccesses),
("testAndAllWithAllFailures", testAndAllWithAllFailures),
("testAndAllWithOneFailure", testAndAllWithOneFailure),
("testThenThrowingWhichDoesNotThrow", testThenThrowingWhichDoesNotThrow),
("testThenThrowingWhichDoesThrow", testThenThrowingWhichDoesThrow),
("testThenIfErrorThrowingWhichDoesNotThrow", testThenIfErrorThrowingWhichDoesNotThrow),
("testThenIfErrorThrowingWhichDoesThrow", testThenIfErrorThrowingWhichDoesThrow),
]
}
}

View File

@ -81,4 +81,90 @@ class EventLoopFutureTest : XCTestCase {
XCTFail("error of wrong type \(e)")
}
}
func testThenThrowingWhichDoesNotThrow() {
let eventLoop = EmbeddedEventLoop()
var ran = false
let p: EventLoopPromise<String> = eventLoop.newPromise()
p.futureResult.map {
$0.count
}.thenThrowing {
1 + $0
}.whenSuccess {
ran = true
XCTAssertEqual($0, 6)
}
p.succeed(result: "hello")
XCTAssertTrue(ran)
}
func testThenThrowingWhichDoesThrow() {
enum DummyError: Error, Equatable {
case dummyError
}
let eventLoop = EmbeddedEventLoop()
var ran = false
let p: EventLoopPromise<String> = eventLoop.newPromise()
p.futureResult.map {
$0.count
}.thenThrowing { (x: Int) throws -> Int in
XCTAssertEqual(5, x)
throw DummyError.dummyError
}.map { (x: Int) -> Int in
XCTFail("shouldn't have been called")
return x
}.whenFailure {
ran = true
XCTAssertEqual(.some(DummyError.dummyError), $0 as? DummyError)
}
p.succeed(result: "hello")
XCTAssertTrue(ran)
}
func testThenIfErrorThrowingWhichDoesNotThrow() {
enum DummyError: Error, Equatable {
case dummyError
}
let eventLoop = EmbeddedEventLoop()
var ran = false
let p: EventLoopPromise<String> = eventLoop.newPromise()
p.futureResult.map {
$0.count
}.thenIfErrorThrowing {
XCTAssertEqual(.some(DummyError.dummyError), $0 as? DummyError)
return 5
}.thenIfErrorThrowing { _ in
XCTFail("shouldn't have been called")
return 5
}.whenSuccess {
ran = true
XCTAssertEqual($0, 5)
}
p.fail(error: DummyError.dummyError)
XCTAssertTrue(ran)
}
func testThenIfErrorThrowingWhichDoesThrow() {
enum DummyError: Error, Equatable {
case dummyError1
case dummyError2
}
let eventLoop = EmbeddedEventLoop()
var ran = false
let p: EventLoopPromise<String> = eventLoop.newPromise()
p.futureResult.map {
$0.count
}.thenIfErrorThrowing { (x: Error) throws -> Int in
XCTAssertEqual(.some(DummyError.dummyError1), x as? DummyError)
throw DummyError.dummyError2
}.map { (x: Int) -> Int in
XCTFail("shouldn't have been called")
return x
}.whenFailure {
ran = true
XCTAssertEqual(.some(DummyError.dummyError2), $0 as? DummyError)
}
p.fail(error: DummyError.dummyError1)
XCTAssertTrue(ran)
}
}

View File

@ -148,15 +148,17 @@ class FileRegionTest : XCTestCase {
}
try content.write(toFile: filePath, atomically: false, encoding: .ascii)
do {
() = try clientChannel.writeAndFlush(data: NIOAny(FileRegion(file: filePath, readerIndex: 0, endIndex: bytes.count))).then(callback: { _ in
let frFuture = try clientChannel.write(data: NIOAny(FileRegion(file: filePath, readerIndex: 0, endIndex: bytes.count)))
() = try clientChannel.writeAndFlush(data: NIOAny(FileRegion(file: filePath, readerIndex: 0, endIndex: bytes.count))).thenThrowing {
try FileRegion(file: filePath, readerIndex: 0, endIndex: bytes.count)
}.then { (fileRegion: FileRegion) -> EventLoopFuture<()> in
let frFuture = clientChannel.write(data: NIOAny(fileRegion))
var buffer = clientChannel.allocator.buffer(capacity: bytes.count)
buffer.write(bytes: bytes)
let bbFuture = clientChannel.write(data: NIOAny(buffer))
clientChannel.close(promise: nil)
clientChannel.flush(promise: nil)
return frFuture.then { bbFuture }
}).wait()
}.wait()
XCTFail("no error happened even though we closed before flush")
} catch let e as ChannelError {
XCTAssertEqual(ChannelError.alreadyClosed, e)

View File

@ -112,7 +112,7 @@ private extension Channel {
func state() -> ConnectRecorder.State {
return try! self.pipeline.context(name: CONNECT_RECORDER).map {
($0.handler as! ConnectRecorder).state
}.mapIfError {
}.thenIfErrorThrowing {
switch $0 {
case ChannelPipelineError.notFound:
return .closed
@ -248,7 +248,7 @@ private func buildEyeballer(host: String,
public class HappyEyeballsTest : XCTestCase {
func testIPv4OnlyResolution() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -272,7 +272,7 @@ public class HappyEyeballsTest : XCTestCase {
func testIPv6OnlyResolution() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -329,7 +329,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAAAAQueryReturningFirst() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -361,7 +361,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstDelayElapses() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -403,7 +403,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenAAAAReturns() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -437,7 +437,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenAAAAErrors() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target
@ -471,7 +471,7 @@ public class HappyEyeballsTest : XCTestCase {
func testAQueryReturningFirstThenEmptyAAAA() throws {
let (eyeballer, resolver, loop) = buildEyeballer(host: "example.com", port: 80)
let targetFuture = eyeballer.resolveAndConnect().map { (channel) -> String? in
let targetFuture = eyeballer.resolveAndConnect().thenThrowing { (channel) -> String? in
let target = channel.connectTarget()
_ = try (channel as! EmbeddedChannel).finish()
return target

View File

@ -77,9 +77,9 @@ class IdleStateHandlerTest : XCTestCase {
let serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.add(handler: handler).then(callback: { f in
channel.pipeline.add(handler: handler).then { f in
channel.pipeline.add(handler: TestWriteHandler(writeToChannel, assertEventFn))
})
}
}.bind(to: "127.0.0.1", on: 0).wait()
defer {