Reorder ‘channel active’ calls to the same order as `_ChannelInboundHandler` and their likely chronological order. (#953)
Motivation: When first viewing the example classes, coming to the ‘read’ method first, leaves the subject unclear as to what the method is ‘reading’. It is preferable to view something being sent first, and then to view the reading of the response. It also matches the call order against the protocol making it a little easier for those unfamiliar with the protocol to see which methods have been implemented. Modifications: Moved channel active calls to be top of the class. Despite the diff there are no actual code modifications. UDP Client changed to indent using spaces to match rest of project. Incidental change. Result: The examples are slighter clearer to read, particularly for newcomers to swift-no as the calls are in a logical chronological order.
This commit is contained in:
parent
06649bb8c7
commit
3395d39731
|
@ -54,6 +54,31 @@ final class ChatHandler: ChannelInboundHandler {
|
|||
// All access to channels is guarded by channelsSyncQueue.
|
||||
private let channelsSyncQueue = DispatchQueue(label: "channelsQueue")
|
||||
private var channels: [ObjectIdentifier: Channel] = [:]
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
let remoteAddress = context.remoteAddress!
|
||||
let channel = context.channel
|
||||
self.channelsSyncQueue.async {
|
||||
// broadcast the message to all the connected clients except the one that just became active.
|
||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - New client connected with address: \(remoteAddress)\n")
|
||||
|
||||
self.channels[ObjectIdentifier(channel)] = channel
|
||||
}
|
||||
|
||||
var buffer = channel.allocator.buffer(capacity: 64)
|
||||
buffer.writeString("(ChatServer) - Welcome to: \(context.localAddress!)\n")
|
||||
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
||||
}
|
||||
|
||||
public func channelInactive(context: ChannelHandlerContext) {
|
||||
let channel = context.channel
|
||||
self.channelsSyncQueue.async {
|
||||
if self.channels.removeValue(forKey: ObjectIdentifier(channel)) != nil {
|
||||
// Broadcast the message to all the connected clients except the one that just was disconnected.
|
||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - Client disconnected\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let id = ObjectIdentifier(context.channel)
|
||||
|
@ -77,31 +102,6 @@ final class ChatHandler: ChannelInboundHandler {
|
|||
context.close(promise: nil)
|
||||
}
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
let remoteAddress = context.remoteAddress!
|
||||
let channel = context.channel
|
||||
self.channelsSyncQueue.async {
|
||||
// broadcast the message to all the connected clients except the one that just became active.
|
||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - New client connected with address: \(remoteAddress)\n")
|
||||
|
||||
self.channels[ObjectIdentifier(channel)] = channel
|
||||
}
|
||||
|
||||
var buffer = channel.allocator.buffer(capacity: 64)
|
||||
buffer.writeString("(ChatServer) - Welcome to: \(context.localAddress!)\n")
|
||||
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
||||
}
|
||||
|
||||
public func channelInactive(context: ChannelHandlerContext) {
|
||||
let channel = context.channel
|
||||
self.channelsSyncQueue.async {
|
||||
if self.channels.removeValue(forKey: ObjectIdentifier(channel)) != nil {
|
||||
// Broadcast the message to all the connected clients except the one that just was disconnected.
|
||||
self.writeToAll(channels: self.channels, allocator: channel.allocator, message: "(ChatServer) - Client disconnected\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private func writeToAll(channels: [ObjectIdentifier: Channel], allocator: ByteBufferAllocator, message: String) {
|
||||
var buffer = allocator.buffer(capacity: message.utf8.count)
|
||||
buffer.writeString(message)
|
||||
|
|
|
@ -20,6 +20,16 @@ private final class EchoHandler: ChannelInboundHandler {
|
|||
public typealias InboundIn = ByteBuffer
|
||||
public typealias OutboundOut = ByteBuffer
|
||||
private var numBytes = 0
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
print("Client connected to \(context.remoteAddress!)")
|
||||
|
||||
// We are connected. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
self.numBytes = buffer.readableBytes
|
||||
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
||||
}
|
||||
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
var byteBuffer = self.unwrapInboundIn(data)
|
||||
|
@ -42,16 +52,6 @@ private final class EchoHandler: ChannelInboundHandler {
|
|||
// reduce allocations.
|
||||
context.close(promise: nil)
|
||||
}
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
print("Client connected to \(context.remoteAddress!)")
|
||||
|
||||
// We are connected. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
self.numBytes = buffer.readableBytes
|
||||
context.writeAndFlush(self.wrapOutboundOut(buffer), promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
|
|
|
@ -20,6 +20,34 @@ let line = readLine(strippingNewline: true)!
|
|||
private final class HTTPEchoHandler: ChannelInboundHandler {
|
||||
public typealias InboundIn = HTTPClientResponsePart
|
||||
public typealias OutboundOut = HTTPClientRequestPart
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
print("Client connected to \(context.remoteAddress!)")
|
||||
|
||||
// We are connected. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
|
||||
var headers = HTTPHeaders()
|
||||
headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
|
||||
headers.add(name: "Content-Length", value: "\(buffer.readableBytes)")
|
||||
|
||||
// This sample only sends an echo request.
|
||||
// The sample server has more functionality which can be easily tested by playing with the URI.
|
||||
// For example, try "/dynamic/count-to-ten" or "/dynamic/client-ip"
|
||||
|
||||
let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1),
|
||||
method: .GET,
|
||||
uri: "/dynamic/echo",
|
||||
headers: headers)
|
||||
|
||||
context.write(self.wrapOutboundOut(.head(requestHead)), promise: nil)
|
||||
|
||||
context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
|
||||
|
||||
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
||||
}
|
||||
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
|
||||
|
@ -47,34 +75,6 @@ private final class HTTPEchoHandler: ChannelInboundHandler {
|
|||
// reduce allocations.
|
||||
context.close(promise: nil)
|
||||
}
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
print("Client connected to \(context.remoteAddress!)")
|
||||
|
||||
// We are connected. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
|
||||
var headers = HTTPHeaders()
|
||||
headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
|
||||
headers.add(name: "Content-Length", value: "\(buffer.readableBytes)")
|
||||
|
||||
// This sample only sends an echo request.
|
||||
// The sample server has more functionality which can be easily tested by playing with the URI.
|
||||
// For example, try "/dynamic/count-to-ten" or "/dynamic/client-ip"
|
||||
|
||||
let requestHead = HTTPRequestHead(version: HTTPVersion(major: 1, minor: 1),
|
||||
method: .GET,
|
||||
uri: "/dynamic/echo",
|
||||
headers: headers)
|
||||
|
||||
context.write(self.wrapOutboundOut(.head(requestHead)), promise: nil)
|
||||
|
||||
context.write(self.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil)
|
||||
|
||||
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
|
|
|
@ -17,62 +17,62 @@ print("Please enter line to send to the server")
|
|||
let line = readLine(strippingNewline: true)!
|
||||
|
||||
private final class EchoHandler: ChannelInboundHandler {
|
||||
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
|
||||
private var numBytes = 0
|
||||
|
||||
private let remoteAddressInitializer: () throws -> SocketAddress
|
||||
|
||||
init(remoteAddressInitializer: @escaping () throws -> SocketAddress) {
|
||||
self.remoteAddressInitializer = remoteAddressInitializer
|
||||
}
|
||||
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let envelope = self.unwrapInboundIn(data)
|
||||
var byteBuffer = envelope.data
|
||||
|
||||
self.numBytes -= byteBuffer.readableBytes
|
||||
|
||||
if self.numBytes <= 0 {
|
||||
if let string = byteBuffer.readString(length: byteBuffer.readableBytes) {
|
||||
print("Received: '\(string)' back from the server, closing channel.")
|
||||
} else {
|
||||
print("Received the line back from the server, closing channel.")
|
||||
}
|
||||
context.close(promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
||||
print("error: ", error)
|
||||
|
||||
// As we are not really interested getting notified on success or failure we just pass nil as promise to
|
||||
// reduce allocations.
|
||||
context.close(promise: nil)
|
||||
}
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
|
||||
do {
|
||||
// Channel is available. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
|
||||
// Get the server address.
|
||||
let remoteAddress = try self.remoteAddressInitializer()
|
||||
|
||||
// Set the transmission data.
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
self.numBytes = buffer.readableBytes
|
||||
|
||||
// Forward the data.
|
||||
let envolope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
||||
|
||||
context.writeAndFlush(self.wrapOutboundOut(envolope), promise: nil)
|
||||
|
||||
} catch {
|
||||
print("Could not resolve remote address")
|
||||
}
|
||||
}
|
||||
public typealias InboundIn = AddressedEnvelope<ByteBuffer>
|
||||
public typealias OutboundOut = AddressedEnvelope<ByteBuffer>
|
||||
private var numBytes = 0
|
||||
|
||||
private let remoteAddressInitializer: () throws -> SocketAddress
|
||||
|
||||
init(remoteAddressInitializer: @escaping () throws -> SocketAddress) {
|
||||
self.remoteAddressInitializer = remoteAddressInitializer
|
||||
}
|
||||
|
||||
public func channelActive(context: ChannelHandlerContext) {
|
||||
|
||||
do {
|
||||
// Channel is available. It's time to send the message to the server to initialize the ping-pong sequence.
|
||||
|
||||
// Get the server address.
|
||||
let remoteAddress = try self.remoteAddressInitializer()
|
||||
|
||||
// Set the transmission data.
|
||||
var buffer = context.channel.allocator.buffer(capacity: line.utf8.count)
|
||||
buffer.writeString(line)
|
||||
self.numBytes = buffer.readableBytes
|
||||
|
||||
// Forward the data.
|
||||
let envolope = AddressedEnvelope<ByteBuffer>(remoteAddress: remoteAddress, data: buffer)
|
||||
|
||||
context.writeAndFlush(self.wrapOutboundOut(envolope), promise: nil)
|
||||
|
||||
} catch {
|
||||
print("Could not resolve remote address")
|
||||
}
|
||||
}
|
||||
|
||||
public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
|
||||
let envelope = self.unwrapInboundIn(data)
|
||||
var byteBuffer = envelope.data
|
||||
|
||||
self.numBytes -= byteBuffer.readableBytes
|
||||
|
||||
if self.numBytes <= 0 {
|
||||
if let string = byteBuffer.readString(length: byteBuffer.readableBytes) {
|
||||
print("Received: '\(string)' back from the server, closing channel.")
|
||||
} else {
|
||||
print("Received the line back from the server, closing channel.")
|
||||
}
|
||||
context.close(promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
public func errorCaught(context: ChannelHandlerContext, error: Error) {
|
||||
print("error: ", error)
|
||||
|
||||
// As we are not really interested getting notified on success or failure we just pass nil as promise to
|
||||
// reduce allocations.
|
||||
context.close(promise: nil)
|
||||
}
|
||||
}
|
||||
|
||||
// First argument is the program path
|
||||
|
@ -88,55 +88,55 @@ let defaultServerPort: Int = 9999
|
|||
let defaultListeningPort: Int = 8888
|
||||
|
||||
enum ConnectTo {
|
||||
case ip(host: String, sendPort: Int, listeningPort: Int)
|
||||
case unixDomainSocket(sendPath: String, listeningPath: String)
|
||||
case ip(host: String, sendPort: Int, listeningPort: Int)
|
||||
case unixDomainSocket(sendPath: String, listeningPath: String)
|
||||
}
|
||||
|
||||
let connectTarget: ConnectTo
|
||||
|
||||
switch (arg1, arg1.flatMap(Int.init), arg2, arg2.flatMap(Int.init), arg3.flatMap(Int.init)) {
|
||||
case (.some(let h), .none , _, .some(let sp), .some(let lp)):
|
||||
/* We received three arguments (String Int Int), let's interpret that as a server host with a server port and a local listening port */
|
||||
connectTarget = .ip(host: h, sendPort: sp, listeningPort: lp)
|
||||
/* We received three arguments (String Int Int), let's interpret that as a server host with a server port and a local listening port */
|
||||
connectTarget = .ip(host: h, sendPort: sp, listeningPort: lp)
|
||||
case (.some(let sp), .none , .some(let lp), .none, _):
|
||||
/* We received two arguments (String String), let's interpret that as sending socket path and listening socket path */
|
||||
assert(sp != lp, "The sending and listening sockets should differ.")
|
||||
connectTarget = .unixDomainSocket(sendPath: sp, listeningPath: lp)
|
||||
/* We received two arguments (String String), let's interpret that as sending socket path and listening socket path */
|
||||
assert(sp != lp, "The sending and listening sockets should differ.")
|
||||
connectTarget = .unixDomainSocket(sendPath: sp, listeningPath: lp)
|
||||
case (_, .some(let sp), _, .some(let lp), _):
|
||||
/* We received two argument (Int Int), let's interpret that as the server port and a listening port on the default host. */
|
||||
connectTarget = .ip(host: defaultHost, sendPort: sp, listeningPort: lp)
|
||||
/* We received two argument (Int Int), let's interpret that as the server port and a listening port on the default host. */
|
||||
connectTarget = .ip(host: defaultHost, sendPort: sp, listeningPort: lp)
|
||||
default:
|
||||
connectTarget = .ip(host: defaultHost, sendPort: defaultServerPort, listeningPort: defaultListeningPort)
|
||||
connectTarget = .ip(host: defaultHost, sendPort: defaultServerPort, listeningPort: defaultListeningPort)
|
||||
}
|
||||
|
||||
let remoteAddress = { () -> SocketAddress in
|
||||
switch connectTarget {
|
||||
case .ip(let host, let sendPort, _):
|
||||
return try SocketAddress.makeAddressResolvingHost(host, port: sendPort)
|
||||
case .unixDomainSocket(let sendPath, _):
|
||||
return try SocketAddress(unixDomainSocketPath: sendPath)
|
||||
}
|
||||
switch connectTarget {
|
||||
case .ip(let host, let sendPort, _):
|
||||
return try SocketAddress.makeAddressResolvingHost(host, port: sendPort)
|
||||
case .unixDomainSocket(let sendPath, _):
|
||||
return try SocketAddress(unixDomainSocketPath: sendPath)
|
||||
}
|
||||
}
|
||||
|
||||
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||
let bootstrap = DatagramBootstrap(group: group)
|
||||
// Enable SO_REUSEADDR.
|
||||
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.channelInitializer { channel in
|
||||
channel.pipeline.addHandler(EchoHandler(remoteAddressInitializer: remoteAddress))
|
||||
// Enable SO_REUSEADDR.
|
||||
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
|
||||
.channelInitializer { channel in
|
||||
channel.pipeline.addHandler(EchoHandler(remoteAddressInitializer: remoteAddress))
|
||||
}
|
||||
defer {
|
||||
try! group.syncShutdownGracefully()
|
||||
try! group.syncShutdownGracefully()
|
||||
}
|
||||
|
||||
let channel = try { () -> Channel in
|
||||
switch connectTarget {
|
||||
case .ip(let host, _, let listeningPort):
|
||||
return try bootstrap.bind(host: host, port: listeningPort).wait()
|
||||
case .unixDomainSocket(_, let listeningPath):
|
||||
return try bootstrap.bind(unixDomainSocketPath: listeningPath).wait()
|
||||
}
|
||||
}()
|
||||
switch connectTarget {
|
||||
case .ip(let host, _, let listeningPort):
|
||||
return try bootstrap.bind(host: host, port: listeningPort).wait()
|
||||
case .unixDomainSocket(_, let listeningPath):
|
||||
return try bootstrap.bind(unixDomainSocketPath: listeningPath).wait()
|
||||
}
|
||||
}()
|
||||
|
||||
// Will be closed after we echo-ed back to the server.
|
||||
try channel.closeFuture.wait()
|
||||
|
|
Loading…
Reference in New Issue