Converted Socket to use a promise for connecting.

This commit is contained in:
Tyler Cloutier 2017-11-28 19:08:03 -08:00
parent 48d7058bbf
commit fa0ccd61cf
2 changed files with 25 additions and 27 deletions

View File

@ -15,6 +15,7 @@ import Dispatch
import StreamKit
import POSIX
import IOStream
import PromiseKit
public final class Socket: WritableIOStream, ReadableIOStream {
public static let defaultReuseAddress = false
@ -73,14 +74,22 @@ public final class Socket: WritableIOStream, ReadableIOStream {
fileDescriptor: fd.rawValue,
queue: .main
) { error in
// Close the file descriptor for the channel
// fd.close()
// Throw any error
if let systemError = SystemError(errorNumber: error) {
try! { throw systemError }()
}
}
}
public func connect(host: String, port: Port) -> Source<Socket> {
return Source { [socketFD, fd, channel = self.channel] observer in
// public func close() {
// channel.close()
// }
public func connect(host: String, port: Port) -> Promise<()> {
return Promise { [socketFD, fd, channel = self.channel] resolve, reject in
var addrInfoPointer: UnsafeMutablePointer<addrinfo>? = nil
#if os(Linux)
@ -109,8 +118,8 @@ public final class Socket: WritableIOStream, ReadableIOStream {
let ret = getaddrinfo(host, String(port), &hints, &addrInfoPointer)
if let systemError = SystemError(errorNumber: ret) {
observer.sendFailed(systemError)
return nil
reject(systemError)
return
}
let addressInfo = addrInfoPointer!.pointee
@ -132,17 +141,17 @@ public final class Socket: WritableIOStream, ReadableIOStream {
// Blocking, connect immediately or throw error
if socketFD.blocking {
if connectRet != 0 {
observer.sendFailed(SystemError(errorNumber: errno)!)
reject(SystemError(errorNumber: errno)!)
} else {
observer.sendCompleted()
resolve(())
}
return nil
return
}
// Non-blocking, check for immediate connection
if connectRet == 0 {
observer.sendCompleted()
return nil
resolve(())
return
}
// Non-blocking, dispatch connection, check errno for connection error.
@ -154,27 +163,21 @@ public final class Socket: WritableIOStream, ReadableIOStream {
var resultLength = socklen_t(MemoryLayout<Int>.stride)
let ret = getsockopt(fd.rawValue, SOL_SOCKET, SO_ERROR, &result, &resultLength)
if let systemError = SystemError(errorNumber: ret) {
observer.sendFailed(systemError)
reject(systemError)
return
}
if let systemError = SystemError(errorNumber: Int32(result)) {
observer.sendFailed(systemError)
reject(systemError)
return
}
if let systemError = SystemError(errorNumber: error) {
observer.sendFailed(systemError)
reject(systemError)
return
}
observer.sendNext(self)
observer.sendCompleted()
resolve(())
}
} else if let error = error {
observer.sendFailed(error)
}
return ActionDisposable {
channel.close(flags: .stop)
//NOTE: Not super clear if the fd must be closed after closing the channel.
fd.close()
reject(error)
}
}
}

View File

@ -45,26 +45,21 @@ class ConnectionTests: XCTestCase {
}
let socket = try Socket(reusePort: true)
let connect = socket.connect(host: "localhost", port: 50000)
connect.onCompleted {
socket.connect(host: "localhost", port: 50000).then {
let buffer = Data("This is a test".utf8)
let write = socket.write(buffer: buffer)
write.onCompleted {
completeWriteExpectation.fulfill()
connect.stop()
}
write.onFailed { error in
XCTFail("Write failed with error: \(error)")
}
write.start()
}
connect.onFailed { error in
}.catch { error in
XCTFail("Connection failed with error: \(error)")
}
connect.start()
waitForExpectations(timeout: 1)
connect.stop()
connections.stop()
}