Plumb metadata through PendingDatagramWritesManager. (#1593)
Motivation: ECN information is held in metadata. As part of the path to writing ECN to the network the required state needs to pass through PendingDatagramWritesManager. Modifications: Add an extra metadata field to PendingDatagramWrite and ensure this data is passed to the correct write method. Result: Nothing visible - just preparation for actual ECN send.
This commit is contained in:
parent
723ff6acf1
commit
1e2d2b997b
|
@ -17,6 +17,7 @@ private struct PendingDatagramWrite {
|
|||
var data: ByteBuffer
|
||||
var promise: Optional<EventLoopPromise<Void>>
|
||||
let address: SocketAddress
|
||||
var metadata: AddressedEnvelope<ByteBuffer>.Metadata?
|
||||
|
||||
/// A helper function that copies the underlying sockaddr structure into temporary storage,
|
||||
/// and then returns the length.
|
||||
|
@ -407,7 +408,10 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
/// - result: If the `Channel` is still writable after adding the write of `data`.
|
||||
func add(envelope: AddressedEnvelope<ByteBuffer>, promise: EventLoopPromise<Void>?) -> Bool {
|
||||
assert(self.isOpen)
|
||||
self.state.append(.init(data: envelope.data, promise: promise, address: envelope.remoteAddress))
|
||||
self.state.append(.init(data: envelope.data,
|
||||
promise: promise,
|
||||
address: envelope.remoteAddress,
|
||||
metadata: envelope.metadata))
|
||||
|
||||
if self.state.bytes > waterMark.high && channelWritabilityFlag.compareAndExchange(expected: true, desired: false) {
|
||||
// Returns false to signal the Channel became non-writable and we need to notify the user
|
||||
|
@ -428,12 +432,12 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
/// - scalarWriteOperation: An operation that writes a single, contiguous array of bytes (usually `sendto`).
|
||||
/// - vectorWriteOperation: An operation that writes multiple contiguous arrays of bytes (usually `sendmmsg`).
|
||||
/// - returns: The `WriteResult` and whether the `Channel` is now writable.
|
||||
func triggerAppropriateWriteOperations(scalarWriteOperation: (UnsafeRawBufferPointer, UnsafePointer<sockaddr>, socklen_t) throws -> IOResult<Int>,
|
||||
func triggerAppropriateWriteOperations(scalarWriteOperation: (UnsafeRawBufferPointer, UnsafePointer<sockaddr>, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?) throws -> IOResult<Int>,
|
||||
vectorWriteOperation: (UnsafeMutableBufferPointer<MMsgHdr>) throws -> IOResult<Int>) throws -> OverallWriteResult {
|
||||
return try self.triggerWriteOperations { writeMechanism in
|
||||
switch writeMechanism {
|
||||
case .scalarBufferWrite:
|
||||
return try triggerScalarBufferWrite(scalarWriteOperation: { try scalarWriteOperation($0, $1, $2) })
|
||||
return try triggerScalarBufferWrite(scalarWriteOperation: { try scalarWriteOperation($0, $1, $2, $3) })
|
||||
case .vectorBufferWrite:
|
||||
do {
|
||||
return try triggerVectorBufferWrite(vectorWriteOperation: { try vectorWriteOperation($0) })
|
||||
|
@ -444,7 +448,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
throw error
|
||||
}
|
||||
|
||||
return try triggerScalarBufferWrite(scalarWriteOperation: { try scalarWriteOperation($0, $1, $2) })
|
||||
return try triggerScalarBufferWrite(scalarWriteOperation: { try scalarWriteOperation($0, $1, $2, $3) })
|
||||
}
|
||||
case .scalarFileWrite:
|
||||
preconditionFailure("PendingDatagramWritesManager was handed a file write")
|
||||
|
@ -498,13 +502,15 @@ final class PendingDatagramWritesManager: PendingWritesManager {
|
|||
///
|
||||
/// - parameters:
|
||||
/// - scalarWriteOperation: An operation that writes a single, contiguous array of bytes (usually `sendto`).
|
||||
private func triggerScalarBufferWrite(scalarWriteOperation: (UnsafeRawBufferPointer, UnsafePointer<sockaddr>, socklen_t) throws -> IOResult<Int>) rethrows -> OneWriteOperationResult {
|
||||
private func triggerScalarBufferWrite(scalarWriteOperation: (UnsafeRawBufferPointer, UnsafePointer<sockaddr>, socklen_t, AddressedEnvelope<ByteBuffer>.Metadata?) throws -> IOResult<Int>) rethrows -> OneWriteOperationResult {
|
||||
assert(self.state.isFlushPending && self.isOpen && !self.state.isEmpty,
|
||||
"illegal state for scalar datagram write operation: flushPending: \(self.state.isFlushPending), isOpen: \(self.isOpen), empty: \(self.state.isEmpty)")
|
||||
let pending = self.state.nextWrite!
|
||||
do {
|
||||
let writeResult = try pending.address.withSockAddr { (addrPtr, addrSize) in
|
||||
try pending.data.withUnsafeReadableBytes { try scalarWriteOperation($0, addrPtr, socklen_t(addrSize)) }
|
||||
try pending.data.withUnsafeReadableBytes {
|
||||
try scalarWriteOperation($0, addrPtr, socklen_t(addrSize), pending.metadata)
|
||||
}
|
||||
}
|
||||
return self.didWrite(writeResult, messages: nil)
|
||||
} catch {
|
||||
|
|
|
@ -617,7 +617,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
|
|||
|
||||
override func writeToSocket() throws -> OverallWriteResult {
|
||||
let result = try self.pendingWrites.triggerAppropriateWriteOperations(
|
||||
scalarWriteOperation: { (ptr, destinationPtr, destinationSize) in
|
||||
scalarWriteOperation: { (ptr, destinationPtr, destinationSize, metadata) in
|
||||
guard ptr.count > 0 else {
|
||||
// No need to call write if the buffer is empty.
|
||||
return .processed(0)
|
||||
|
|
|
@ -111,7 +111,7 @@ class PendingDatagramWritesManagerTests: XCTestCase {
|
|||
var result: OverallWriteResult? = nil
|
||||
|
||||
do {
|
||||
let r = try pwm.triggerAppropriateWriteOperations(scalarWriteOperation: { (buf, addr, len) in
|
||||
let r = try pwm.triggerAppropriateWriteOperations(scalarWriteOperation: { (buf, addr, len, metadata) in
|
||||
defer {
|
||||
singleState += 1
|
||||
everythingState += 1
|
||||
|
|
Loading…
Reference in New Issue