Add support for UDP_SEGMENT (#2372)

Motivation:

On Linux, the UDP_SEGMENT socket option allows for large buffers to be
written to the kernel and segmented by the kernel (or in some cases the
NIC) into smaller datagrams. This can substantially decrease the number
of syscalls.

This can be set either on a per-message basis or on a per-socket basis.
This change adds per-socket configuration.

Modifications:

- Add a CNIOLinux function to check whether UDP_SEGMENT is supported on
  that particular Linux.
- Add a helper to `System` to check whether UDP_SEGMENT is supported on
  the current platform.
- On Linux only:
  - add the udp socket option level
  - add the udp_segment socket option
- Add the `DatagramSegmentSize` channel option.
- Get/Set the option in `DatagramChannel`

Results:

UDP GSO is supported on Linux.

Co-authored-by: Cory Benfield <lukasa@apple.com>
This commit is contained in:
George Barnett 2023-02-27 13:18:58 +00:00 committed by GitHub
parent 81e5d344e4
commit 19b878f461
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 312 additions and 17 deletions

View File

@ -25,6 +25,7 @@
#include <errno.h> #include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <netinet/ip.h> #include <netinet/ip.h>
#include <netinet/udp.h>
#include "liburing_nio.h" #include "liburing_nio.h"
#if __has_include(<linux/mptcp.h>) #if __has_include(<linux/mptcp.h>)
@ -105,5 +106,7 @@ size_t CNIOLinux_CMSG_SPACE(size_t);
// awkward time_T pain // awkward time_T pain
extern const int CNIOLinux_SO_TIMESTAMP; extern const int CNIOLinux_SO_TIMESTAMP;
extern const int CNIOLinux_SO_RCVTIMEO; extern const int CNIOLinux_SO_RCVTIMEO;
int CNIOLinux_supports_udp_segment();
#endif #endif
#endif #endif

View File

@ -150,4 +150,21 @@ size_t CNIOLinux_CMSG_SPACE(size_t payloadSizeBytes) {
const int CNIOLinux_SO_TIMESTAMP = SO_TIMESTAMP; const int CNIOLinux_SO_TIMESTAMP = SO_TIMESTAMP;
const int CNIOLinux_SO_RCVTIMEO = SO_RCVTIMEO; const int CNIOLinux_SO_RCVTIMEO = SO_RCVTIMEO;
// Probes whether the UDP_SEGMENT socket option can be set on this system.
//
// When UDP_SEGMENT is defined at build time, a throwaway UDP socket is created and the
// option is set on it; the result of that setsockopt call is returned. Returns -1 when
// UDP_SEGMENT is not defined at build time or when the probe socket cannot be created.
int CNIOLinux_supports_udp_segment() {
#ifdef UDP_SEGMENT
    // The probe socket is closed again before returning.
    int probe = socket(AF_INET, SOCK_DGRAM, 0);
    if (probe == -1) {
        return -1;
    }

    // The segment size is only a probe value.
    int segment_size = 512;
    int result = setsockopt(probe, IPPROTO_UDP, UDP_SEGMENT, &segment_size, sizeof(segment_size));
    close(probe);
    return result;
#else
    return -1;
#endif
}
#endif #endif

View File

@ -245,6 +245,15 @@ extension NIOBSDSocket.OptionLevel {
/// Socket options that apply to all sockets. /// Socket options that apply to all sockets.
public static let socket: NIOBSDSocket.OptionLevel = public static let socket: NIOBSDSocket.OptionLevel =
NIOBSDSocket.OptionLevel(rawValue: SOL_SOCKET) NIOBSDSocket.OptionLevel(rawValue: SOL_SOCKET)
#if os(Linux) || os(Android)
/// Socket options that apply only to UDP sockets.
// `IPPROTO_UDP` is explicitly converted to `CInt` on these platforms.
public static let udp: NIOBSDSocket.OptionLevel = .init(rawValue: CInt(IPPROTO_UDP))
#else
/// Socket options that apply only to UDP sockets.
public static let udp: NIOBSDSocket.OptionLevel = .init(rawValue: IPPROTO_UDP)
#endif
} }
// IPv4 Options // IPv4 Options
@ -328,6 +337,15 @@ extension NIOBSDSocket.Option {
} }
#endif #endif
#if os(Linux)
extension NIOBSDSocket.Option {
    /// Use UDP segmentation offload (UDP_SEGMENT, or 'GSO'). Only available on Linux.
    // The raw value is hardcoded because UDP_SEGMENT is not available on all Linux platforms.
    public static let udp_segment: NIOBSDSocket.Option = .init(rawValue: 103)
}
#endif
// MPTCP options // MPTCP options
// //
// These values are hardcoded as they're fairly new, and not available in all // These values are hardcoded as they're fairly new, and not available in all

View File

@ -191,6 +191,19 @@ extension ChannelOptions {
public init() { } public init() { }
} }
/// ``DatagramSegmentSize`` controls the 'UDP_SEGMENT' socket option (sometimes referred to as 'GSO'), which allows
/// large writes sent via `sendmsg` and `sendmmsg` to be segmented into separate datagrams by the kernel (or, in some
/// cases, the NIC).
///
/// The value of this option is the size of the segments that a large write is split into. Writes do not need to be a
/// multiple of this value.
///
/// This option is currently only supported on Linux (4.18 and newer). Support can be checked using
/// ``System/supportsUDPSegmentationOffload``.
///
/// Setting this option to zero disables segmentation offload.
public struct DatagramSegmentSize: ChannelOption, Sendable {
public typealias Value = CInt
public init() { }
}
/// When set to true IP level ECN information will be reported through `AddressedEnvelope.Metadata` /// When set to true IP level ECN information will be reported through `AddressedEnvelope.Metadata`
public struct ExplicitCongestionNotificationsOption: ChannelOption, Sendable { public struct ExplicitCongestionNotificationsOption: ChannelOption, Sendable {
public typealias Value = Bool public typealias Value = Bool
@ -317,6 +330,9 @@ public struct ChannelOptions {
/// - seealso: `DatagramVectorReadMessageCountOption` /// - seealso: `DatagramVectorReadMessageCountOption`
public static let datagramVectorReadMessageCount = Types.DatagramVectorReadMessageCountOption() public static let datagramVectorReadMessageCount = Types.DatagramVectorReadMessageCountOption()
/// The segment size used for the 'UDP_SEGMENT' (GSO) socket option.
///
/// - seealso: `DatagramSegmentSize`
public static let datagramSegmentSize = Types.DatagramSegmentSize()
/// - seealso: `ExplicitCongestionNotificationsOption` /// - seealso: `ExplicitCongestionNotificationsOption`
public static let explicitCongestionNotification = Types.ExplicitCongestionNotificationsOption() public static let explicitCongestionNotification = Types.ExplicitCongestionNotificationsOption()

View File

@ -202,3 +202,16 @@ public enum System {
} }
} }
extension System {
    /// Returns true if the platform supports 'UDP_SEGMENT' (GSO).
    ///
    /// The option can be enabled by setting the ``DatagramSegmentSize`` channel option.
    public static let supportsUDPSegmentationOffload: Bool = {
        #if os(Linux)
        // The CNIOLinux probe reports success as zero.
        return CNIOLinux_supports_udp_segment() == 0
        #else
        return false
        #endif
    }()
}

View File

@ -267,4 +267,35 @@ extension NIOBSDSocketControlMessage {
} }
} }
extension NIOBSDSocket {
    /// Sets the 'UDP_SEGMENT' socket option on `socket` to `segmentSize`.
    ///
    /// - Parameters:
    ///   - segmentSize: The value to set for the option.
    ///   - socket: The handle of the socket to set the option on.
    /// - Throws: `ChannelError.operationUnsupported` on platforms other than Linux, otherwise any
    ///   error thrown by `setsockopt`.
    static func setUDPSegmentSize(_ segmentSize: CInt, socket: NIOBSDSocket.Handle) throws {
        #if os(Linux)
        // A mutable copy is needed so that its address can be handed to `setsockopt`.
        var value = segmentSize
        let length = socklen_t(MemoryLayout<CInt>.size)
        try Self.setsockopt(socket: socket,
                            level: .udp,
                            option_name: .udp_segment,
                            option_value: &value,
                            option_len: length)
        #else
        throw ChannelError.operationUnsupported
        #endif
    }

    /// Reads the current value of the 'UDP_SEGMENT' socket option from `socket`.
    ///
    /// - Parameter socket: The handle of the socket to query.
    /// - Returns: The value reported by `getsockopt`.
    /// - Throws: `ChannelError.operationUnsupported` on platforms other than Linux, otherwise any
    ///   error thrown by `getsockopt`.
    static func getUDPSegmentSize(socket: NIOBSDSocket.Handle) throws -> CInt {
        #if os(Linux)
        var current: CInt = 0
        var length = socklen_t(MemoryLayout<CInt>.size)
        try Self.getsockopt(socket: socket,
                            level: .udp,
                            option_name: .udp_segment,
                            option_value: &current,
                            option_len: &length)
        return current
        #else
        throw ChannelError.operationUnsupported
        #endif
    }
}
#endif #endif

View File

@ -551,4 +551,14 @@ extension NIOBSDSocketControlMessage {
return CNIOWindows_CMSG_SPACE(payloadSize) return CNIOWindows_CMSG_SPACE(payloadSize)
} }
} }
extension NIOBSDSocket {
/// Sets the 'UDP_SEGMENT' socket option.
///
/// This implementation does not support the option and always throws `ChannelError.operationUnsupported`.
static func setUDPSegmentSize(_ segmentSize: CInt, socket: NIOBSDSocket.Handle) throws {
throw ChannelError.operationUnsupported
}
/// Returns the value of the 'UDP_SEGMENT' socket option.
///
/// This implementation does not support the option and always throws `ChannelError.operationUnsupported`.
static func getUDPSegmentSize(socket: NIOBSDSocket.Handle) throws -> CInt {
throw ChannelError.operationUnsupported
}
}
#endif #endif

View File

@ -324,4 +324,18 @@ typealias IOVector = iovec
try NIOBSDSocket.shutdown(socket: $0, how: how) try NIOBSDSocket.shutdown(socket: $0, how: how)
} }
} }
/// Sets the value for the 'UDP_SEGMENT' socket option.
///
/// - Parameter segmentSize: The value to set the option to.
/// - Throws: Any error thrown while accessing the socket handle or while setting the option.
func setUDPSegmentSize(_ segmentSize: CInt) throws {
    try self.withUnsafeHandle { handle in
        try NIOBSDSocket.setUDPSegmentSize(segmentSize, socket: handle)
    }
}
/// Returns the value of the 'UDP_SEGMENT' socket option.
///
/// - Throws: Any error thrown while accessing the socket handle or while reading the option.
func getUDPSegmentSize() throws -> CInt {
    return try self.withUnsafeHandle { handle in
        try NIOBSDSocket.getUDPSegmentSize(socket: handle)
    }
}
} }

View File

@ -508,7 +508,12 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
// Receiving packet info is only supported for IP // Receiving packet info is only supported for IP
throw ChannelError.operationUnsupported throw ChannelError.operationUnsupported
} }
break case _ as ChannelOptions.Types.DatagramSegmentSize:
guard System.supportsUDPSegmentationOffload else {
throw ChannelError.operationUnsupported
}
let segmentSize = value as! ChannelOptions.Types.DatagramSegmentSize.Value
try self.socket.setUDPSegmentSize(segmentSize)
default: default:
try super.setOption0(option, value: value) try super.setOption0(option, value: value)
} }
@ -552,6 +557,11 @@ final class DatagramChannel: BaseSocketChannel<Socket> {
// Receiving packet info is only supported for IP // Receiving packet info is only supported for IP
throw ChannelError.operationUnsupported throw ChannelError.operationUnsupported
} }
case _ as ChannelOptions.Types.DatagramSegmentSize:
guard System.supportsUDPSegmentationOffload else {
throw ChannelError.operationUnsupported
}
return try self.socket.getUDPSegmentSize() as! Option.Value
default: default:
return try super.getOption0(option) return try super.getOption0(option)
} }

View File

@ -75,6 +75,13 @@ extension DatagramChannelTests {
("testConnectingSocketAfterFlushingExistingMessages", testConnectingSocketAfterFlushingExistingMessages), ("testConnectingSocketAfterFlushingExistingMessages", testConnectingSocketAfterFlushingExistingMessages),
("testConnectingSocketFailsBufferedWrites", testConnectingSocketFailsBufferedWrites), ("testConnectingSocketFailsBufferedWrites", testConnectingSocketFailsBufferedWrites),
("testReconnectingSocketFailsBufferedWrites", testReconnectingSocketFailsBufferedWrites), ("testReconnectingSocketFailsBufferedWrites", testReconnectingSocketFailsBufferedWrites),
("testGSOIsUnsupportedOnNonLinuxPlatforms", testGSOIsUnsupportedOnNonLinuxPlatforms),
("testSetGSOOption", testSetGSOOption),
("testGetGSOOption", testGetGSOOption),
("testLargeScalarWriteWithGSO", testLargeScalarWriteWithGSO),
("testLargeVectorWriteWithGSO", testLargeVectorWriteWithGSO),
("testWriteBufferAtGSOSegmentCountLimit", testWriteBufferAtGSOSegmentCountLimit),
("testWriteBufferAboveGSOSegmentCountLimitShouldError", testWriteBufferAboveGSOSegmentCountLimitShouldError),
] ]
} }
} }

View File

@ -1136,4 +1136,160 @@ class DatagramChannelTests: XCTestCase {
resultsIn: .success(()) resultsIn: .success(())
) )
} }
/// Checks that platforms other than Linux never report support for UDP segmentation offload.
func testGSOIsUnsupportedOnNonLinuxPlatforms() throws {
    #if os(Linux)
    // Nothing to assert on Linux: support is probed at runtime there.
    #else
    XCTAssertFalse(System.supportsUDPSegmentationOffload)
    #endif
}
/// Setting the segment size succeeds where GSO is supported and fails with
/// `ChannelError.operationUnsupported` everywhere else.
func testSetGSOOption() throws {
    let setFuture = self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: 1024)

    guard System.supportsUDPSegmentationOffload else {
        XCTAssertThrowsError(try setFuture.wait()) { error in
            XCTAssertEqual(error as? ChannelError, .operationUnsupported)
        }
        return
    }

    XCTAssertNoThrow(try setFuture.wait())
}
/// Reading the segment size yields zero (not set) where GSO is supported and fails with
/// `ChannelError.operationUnsupported` everywhere else.
func testGetGSOOption() throws {
    let getFuture = self.firstChannel.getOption(ChannelOptions.datagramSegmentSize)

    guard System.supportsUDPSegmentationOffload else {
        XCTAssertThrowsError(try getFuture.wait()) { error in
            XCTAssertEqual(error as? ChannelError, .operationUnsupported)
        }
        return
    }

    XCTAssertEqual(try getFuture.wait(), 0) // not-set
}
/// Sends one large buffer with GSO enabled and checks that the receiver sees it as separate,
/// segment-sized datagrams.
func testLargeScalarWriteWithGSO() throws {
    try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")

    // We're going to enable GSO with a segment size of 1000, send one large buffer which
    // contains ten 1000-byte segments. Each segment will contain the bytes corresponding to
    // the index of the segment. We validate that the receiver receives 10 datagrams, each
    // corresponding to one segment from the buffer.
    let segmentSize: CInt = 1000
    let segments = 10

    // Enable GSO
    let didSet = self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: segmentSize)
    XCTAssertNoThrow(try didSet.wait())

    // Form a handful of segments
    let buffers = (0..<segments).map { i in
        ByteBuffer(repeating: UInt8(i), count: Int(segmentSize))
    }

    // Coalesce the segments into a single buffer. Each segment is written exactly once so the
    // buffer holds `segments` segments, which is the number of datagrams awaited below.
    var buffer = self.firstChannel.allocator.buffer(capacity: segments * Int(segmentSize))
    for segment in buffers {
        buffer.writeImmutableBuffer(segment)
    }
    XCTAssertEqual(buffer.readableBytes, segments * Int(segmentSize))

    // Write the single large buffer.
    let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
    XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(writeData)).wait())

    // The receiver will receive separate segments.
    let receivedBuffers = try self.secondChannel.waitForDatagrams(count: segments)
    let receivedBytes = receivedBuffers.map { $0.data.readableBytes }.reduce(0, +)
    XCTAssertEqual(Int(segmentSize) * segments, receivedBytes)

    // Every received datagram must match a distinct segment: an index may only be removed once.
    var unusedIndexes = Set(buffers.indices)
    for envelope in receivedBuffers {
        if let index = buffers.firstIndex(of: envelope.data) {
            XCTAssertNotNil(unusedIndexes.remove(index))
        } else {
            XCTFail("No matching buffer")
        }
    }
}
/// Writes the same large buffer twice before a single flush with GSO enabled and checks that the
/// receiver sees segment-sized datagrams.
func testLargeVectorWriteWithGSO() throws {
    try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")

    // Similar to `testLargeScalarWriteWithGSO`, but with multiple writes.
    let segmentSize: CInt = 1000
    let segments = 10

    // Enable GSO
    let didSet = self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: segmentSize)
    XCTAssertNoThrow(try didSet.wait())

    // Form a handful of segments
    let buffers = (0..<segments).map { i in
        ByteBuffer(repeating: UInt8(i), count: Int(segmentSize))
    }

    // Coalesce the segments into a single buffer. Each segment is written exactly once so the
    // buffer holds `segments` segments.
    var buffer = self.firstChannel.allocator.buffer(capacity: segments * Int(segmentSize))
    for segment in buffers {
        buffer.writeImmutableBuffer(segment)
    }
    XCTAssertEqual(buffer.readableBytes, segments * Int(segmentSize))

    // Write the large buffer twice and flush once so that both writes are pending at flush time.
    let writeData = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: buffer)
    let write1 = self.firstChannel.write(NIOAny(writeData))
    let write2 = self.firstChannel.write(NIOAny(writeData))
    self.firstChannel.flush()
    XCTAssertNoThrow(try write1.wait())
    XCTAssertNoThrow(try write2.wait())

    // The receiver will receive separate segments.
    let receivedBuffers = try self.secondChannel.waitForDatagrams(count: segments)
    let receivedBytes = receivedBuffers.map { $0.data.readableBytes }.reduce(0, +)
    XCTAssertEqual(Int(segmentSize) * segments, receivedBytes)

    // The buffer was written twice, so each distinct segment may be matched at most twice.
    let keysWithValues = buffers.indices.map { index in (index, 2) }
    var indexCounts = Dictionary(uniqueKeysWithValues: keysWithValues)
    for envelope in receivedBuffers {
        if let index = buffers.firstIndex(of: envelope.data) {
            indexCounts[index, default: 0] -= 1
            XCTAssertGreaterThanOrEqual(indexCounts[index]!, 0)
        } else {
            XCTFail("No matching buffer")
        }
    }
}
/// Writes a buffer made of exactly 64 segments and checks that all 64 datagrams arrive.
func testWriteBufferAtGSOSegmentCountLimit() throws {
    try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")

    let segmentSize = 10
    let segmentCount = 64

    let didSet = self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: CInt(segmentSize))
    XCTAssertNoThrow(try didSet.wait())

    let payload = self.firstChannel.allocator.buffer(repeating: 1, count: segmentSize * segmentCount)
    let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: payload)
    XCTAssertNoThrow(try self.firstChannel.writeAndFlush(NIOAny(envelope)).wait())

    let datagrams = try self.secondChannel.waitForDatagrams(count: segmentCount)
    let totalBytes = datagrams.reduce(0) { $0 + $1.data.readableBytes }
    XCTAssertEqual(totalBytes, segmentCount * segmentSize)
}
/// Writes a buffer made of 65 segments and checks that the write fails with an `IOError`.
func testWriteBufferAboveGSOSegmentCountLimitShouldError() throws {
    try XCTSkipUnless(System.supportsUDPSegmentationOffload, "UDP_SEGMENT (GSO) is not supported on this platform")

    let segmentSize = 10
    let segmentCount = 65

    let didSet = self.firstChannel.setOption(ChannelOptions.datagramSegmentSize, value: CInt(segmentSize))
    XCTAssertNoThrow(try didSet.wait())

    let payload = self.firstChannel.allocator.buffer(repeating: 1, count: segmentSize * segmentCount)
    let envelope = AddressedEnvelope(remoteAddress: self.secondChannel.localAddress!, data: payload)

    // The kernel limits messages to a maximum of 64 segments; any more should result in an error.
    XCTAssertThrowsError(try self.firstChannel.writeAndFlush(NIOAny(envelope)).wait()) { error in
        XCTAssert(error is IOError)
    }
}
} }