Fixed tests and added a reusePort option.

This commit is contained in:
Tyler Cloutier 2017-11-24 20:54:48 -08:00
parent f0028c810e
commit db1e8b2c72
8 changed files with 84 additions and 30 deletions

View File

@ -10,6 +10,14 @@ import Foundation
public class Response: Serializable, HTTPMessage {
static let noBodyStatuses = Set([
100,
101,
102,
204,
304,
])
public var version: Version
public var status: Status
public var rawHeaders: [String]
@ -46,13 +54,18 @@ public class Response: Serializable, HTTPMessage {
self.version = version
self.status = status
self.body = body
self.rawHeaders = Array([
rawHeaders,
[
"Content-Length",
"\(body.count)",
]
].joined())
if !Response.noBodyStatuses.contains(self.status.code) &&
!rawHeaders.contains("Content-Length") {
self.rawHeaders = Array([
rawHeaders,
[
"Content-Length",
"\(body.count)",
]
].joined())
} else {
self.rawHeaders = rawHeaders
}
}
public convenience init(

View File

@ -21,9 +21,13 @@ public final class Server {
let delegate: ServerDelegate
private var disposable: ActionDisposable? = nil
let reuseAddress: Bool
let reusePort: Bool
public init(delegate: ServerDelegate? = nil) {
public init(delegate: ServerDelegate? = nil, reuseAddress: Bool = false, reusePort: Bool = false) {
self.delegate = delegate ?? Router()
self.reuseAddress = reuseAddress
self.reusePort = reusePort
}
deinit {
@ -35,8 +39,8 @@ public final class Server {
}
func clientSource(host: String, port: POSIX.Port) -> Source<ClientConnection> {
return Source { observer in
let tcpServer = try! TCP.Server()
return Source { [reuseAddress, reusePort] observer in
let tcpServer = try! TCP.Server(reuseAddress: reuseAddress, reusePort: reusePort)
try! tcpServer.bind(host: host, port: port)
let listen = tcpServer.listen()

View File

@ -18,20 +18,21 @@ import IOStream
public final class Server {
public static let defaultReuseAddress = true
public static let defaultReuseAddress = false
public static let defaultReusePort = false
private let fd: SocketFileDescriptor
private let listeningSource: DispatchSourceRead
public convenience init(reuseAddress: Bool = defaultReuseAddress) throws {
public convenience init(reuseAddress: Bool = defaultReuseAddress, reusePort: Bool = defaultReusePort) throws {
let fd = try SocketFileDescriptor(
socketType: SocketType.stream,
addressFamily: AddressFamily.inet
)
try self.init(fd: fd, reuseAddress: reuseAddress)
try self.init(fd: fd, reuseAddress: reuseAddress, reusePort: reusePort)
}
public init(fd: SocketFileDescriptor, reuseAddress: Bool = defaultReuseAddress) throws {
public init(fd: SocketFileDescriptor, reuseAddress: Bool = defaultReuseAddress, reusePort: Bool = defaultReusePort) throws {
self.fd = fd
if reuseAddress {
// Set SO_REUSEADDR
@ -48,6 +49,21 @@ public final class Server {
}
}
if reusePort {
// Set SO_REUSEPORT
var reusePort = 1
let error = setsockopt(
self.fd.rawValue,
SOL_SOCKET,
SO_REUSEPORT,
&reusePort,
socklen_t(MemoryLayout<Int>.stride)
)
if let systemError = SystemError(errorNumber: error) {
throw systemError
}
}
self.listeningSource = DispatchSource.makeReadSource(
fileDescriptor: self.fd.rawValue,
queue: .main
@ -165,7 +181,6 @@ public final class Server {
}
return ActionDisposable {
listeningSource.cancel()
fd.close()
}
}
}

View File

@ -17,7 +17,8 @@ import POSIX
import IOStream
public final class Socket: WritableIOStream, ReadableIOStream {
public static let defaultReuseAddress = true
public static let defaultReuseAddress = false
public static let defaultReusePort = false
private let socketFD: SocketFileDescriptor
public var fd: FileDescriptor {
@ -25,15 +26,15 @@ public final class Socket: WritableIOStream, ReadableIOStream {
}
public let channel: DispatchIO
public convenience init() throws {
public convenience init(reuseAddress: Bool = defaultReuseAddress, reusePort: Bool = defaultReusePort) throws {
let fd = try SocketFileDescriptor(
socketType: SocketType.stream,
addressFamily: AddressFamily.inet
)
try self.init(fd: fd)
try self.init(fd: fd, reuseAddress: reuseAddress, reusePort: reusePort)
}
public init(fd: SocketFileDescriptor, reuseAddress: Bool = defaultReuseAddress) throws {
public init(fd: SocketFileDescriptor, reuseAddress: Bool = defaultReuseAddress, reusePort: Bool = defaultReusePort) throws {
self.socketFD = fd
if reuseAddress {
@ -51,6 +52,21 @@ public final class Socket: WritableIOStream, ReadableIOStream {
}
}
if reusePort {
// Set SO_REUSEPORT
var reusePort = 1
let error = setsockopt(
self.socketFD.rawValue,
SOL_SOCKET,
SO_REUSEPORT,
&reusePort,
socklen_t(MemoryLayout<Int>.stride)
)
if let systemError = SystemError(errorNumber: error) {
throw systemError
}
}
// Create the dispatch source for listening
self.channel = DispatchIO(
type: .stream,
@ -156,7 +172,9 @@ public final class Socket: WritableIOStream, ReadableIOStream {
observer.sendFailed(error)
}
return ActionDisposable {
channel.close()
channel.close(flags: .stop)
//NOTE: Not super clear if the fd must be closed after closing the channel.
fd.close()
}
}
}

View File

@ -13,7 +13,7 @@ import XCTest
class ResponseSerializationTests: XCTestCase {
func testBasicSerialization() {
let expected = "HTTP/1.1 200 OK\r\n\r\n"
let expected = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
let response = Response(
version: Version(major: 1, minor: 1),
status: .ok,
@ -29,6 +29,7 @@ class ResponseSerializationTests: XCTestCase {
"HTTP/1.1 200 OK\r\n" +
"Date: Sun, 30 Oct 2016 09:06:40 GMT\r\n" +
"Content-Type: text/html; charset=ISO-8859-1\r\n" +
"Content-Length: 0\r\n" +
"\r\n"
let response = Response(
version: Version(major: 1, minor: 1),

View File

@ -78,7 +78,7 @@ class ServerTests: XCTestCase {
return try! Response(json: jsonResponse)
}
let server = HTTP.Server()
let server = HTTP.Server(reusePort: true)
server.clientSource(host: "0.0.0.0", port: 3001).startWithNext { client in
let requestStream = client

View File

@ -118,7 +118,7 @@ class RouterTests: XCTestCase {
app.add("/v1.0", api)
app.add(notFound)
let server = HTTP.Server(delegate: app)
let server = HTTP.Server(delegate: app, reusePort: true)
server.listen(host: "0.0.0.0", port: 3000)
sendRequest(path: "/v1.0/users", method: "GET")
@ -143,7 +143,7 @@ class RouterTests: XCTestCase {
let app = Router()
app.add("/foo/:bar", sub)
let server = HTTP.Server(delegate: app)
let server = HTTP.Server(delegate: app, reusePort: true)
server.listen(host: "0.0.0.0", port: 3000)
sendRequest(path: "/foo/users/far", method: "GET")
@ -192,7 +192,7 @@ class RouterTests: XCTestCase {
a.add(b)
let server = HTTP.Server(delegate: a)
let server = HTTP.Server(delegate: a, reusePort: true)
server.listen(host: "0.0.0.0", port: 3000)
waitForExpectations(timeout: 1) { error in
server.stop()

View File

@ -11,8 +11,11 @@ class ConnectionTests: XCTestCase {
let receiveMessageExpectation = expectation(description: "Did not receive any message.")
let completeWriteExpectation = expectation(description: "Did not complete write.")
let server = try Server()
try server.bind(host: "localhost", port: 50000)
let server = try Server(reusePort: true)
while (try? server.bind(host: "localhost", port: 50000)) == nil {
RunLoop.current.run(until: Date(timeIntervalSinceNow: 1))
}
let connections = server.listen()
connections.startWithNext { connection in
@ -41,7 +44,7 @@ class ConnectionTests: XCTestCase {
strings.start()
}
let socket = try Socket()
let socket = try Socket(reusePort: true)
let connect = socket.connect(host: "localhost", port: 50000)
connect.onCompleted {
let buffer = Data("This is a test".utf8)
@ -67,8 +70,8 @@ class ConnectionTests: XCTestCase {
func testResourceCleanUp() {
// Create two servers consecutively
// testClientServer()
// testClientServer()
try! testClientServer()
try! testClientServer()
}
}