Don't retain a task when all we want is a time (#2373)
Motivation: To know when we next need to wake up, we keep track of what the next deadline will be. This works great, but in order to keep track of this UInt64 we save off an entire ScheduledTask. This object is quite wide (6 pointers wide), and two of those pointers require ARC traffic, so doing this saving produces unnecessary overhead. Worse, saving this task plays poorly with task cancellation. If the saved task is cancelled, this has the effect of "retaining" that task until the next event loop tick. This is unlikely to produce catastrophic bugs in real programs, where the loop does tick, but it violates our tests which rigorously assume that we will always drop a task when it is cancelled. In specific manufactured cases it's possible to produce leaks of non-trivial duration. Modifications: - Wrote a weirdly complex test. - Moved the implementation of Task.readyIn to a method on NIODeadline - Saved a NIODeadline instead of a ScheduledTask Result: Minor performance improvement in the core event loop processing, minor correctness improvement.
This commit is contained in:
parent
5db1dfabb0
commit
9afaf801e5
|
@ -378,21 +378,14 @@ internal struct ScheduledTask {
|
||||||
let task: () -> Void
|
let task: () -> Void
|
||||||
private let failFn: (Error) ->()
|
private let failFn: (Error) ->()
|
||||||
@usableFromInline
|
@usableFromInline
|
||||||
internal let _readyTime: NIODeadline
|
internal let readyTime: NIODeadline
|
||||||
|
|
||||||
@usableFromInline
|
@usableFromInline
|
||||||
init(id: UInt64, _ task: @escaping () -> Void, _ failFn: @escaping (Error) -> Void, _ time: NIODeadline) {
|
init(id: UInt64, _ task: @escaping () -> Void, _ failFn: @escaping (Error) -> Void, _ time: NIODeadline) {
|
||||||
self.id = id
|
self.id = id
|
||||||
self.task = task
|
self.task = task
|
||||||
self.failFn = failFn
|
self.failFn = failFn
|
||||||
self._readyTime = time
|
self.readyTime = time
|
||||||
}
|
|
||||||
|
|
||||||
func readyIn(_ t: NIODeadline) -> TimeAmount {
|
|
||||||
if _readyTime < t {
|
|
||||||
return .nanoseconds(0)
|
|
||||||
}
|
|
||||||
return _readyTime - t
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func fail(_ error: Error) {
|
func fail(_ error: Error) {
|
||||||
|
@ -403,17 +396,17 @@ internal struct ScheduledTask {
|
||||||
extension ScheduledTask: CustomStringConvertible {
|
extension ScheduledTask: CustomStringConvertible {
|
||||||
@usableFromInline
|
@usableFromInline
|
||||||
var description: String {
|
var description: String {
|
||||||
return "ScheduledTask(readyTime: \(self._readyTime))"
|
return "ScheduledTask(readyTime: \(self.readyTime))"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extension ScheduledTask: Comparable {
|
extension ScheduledTask: Comparable {
|
||||||
@usableFromInline
|
@usableFromInline
|
||||||
static func < (lhs: ScheduledTask, rhs: ScheduledTask) -> Bool {
|
static func < (lhs: ScheduledTask, rhs: ScheduledTask) -> Bool {
|
||||||
if lhs._readyTime == rhs._readyTime {
|
if lhs.readyTime == rhs.readyTime {
|
||||||
return lhs.id < rhs.id
|
return lhs.id < rhs.id
|
||||||
} else {
|
} else {
|
||||||
return lhs._readyTime < rhs._readyTime
|
return lhs.readyTime < rhs.readyTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -422,3 +415,12 @@ extension ScheduledTask: Comparable {
|
||||||
return lhs.id == rhs.id
|
return lhs.id == rhs.id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
extension NIODeadline {
|
||||||
|
func readyIn(_ target: NIODeadline) -> TimeAmount {
|
||||||
|
if self < target {
|
||||||
|
return .nanoseconds(0)
|
||||||
|
}
|
||||||
|
return self - target
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -402,14 +402,14 @@ Further information:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private func currentSelectorStrategy(nextReadyTask: ScheduledTask?) -> SelectorStrategy {
|
private func currentSelectorStrategy(nextReadyDeadline: NIODeadline?) -> SelectorStrategy {
|
||||||
guard let sched = nextReadyTask else {
|
guard let deadline = nextReadyDeadline else {
|
||||||
// No tasks to handle so just block. If any tasks were added in the meantime wakeup(...) was called and so this
|
// No tasks to handle so just block. If any tasks were added in the meantime wakeup(...) was called and so this
|
||||||
// will directly unblock.
|
// will directly unblock.
|
||||||
return .block
|
return .block
|
||||||
}
|
}
|
||||||
|
|
||||||
let nextReady = sched.readyIn(.now())
|
let nextReady = deadline.readyIn(.now())
|
||||||
if nextReady <= .nanoseconds(0) {
|
if nextReady <= .nanoseconds(0) {
|
||||||
// Something is ready to be processed just do a non-blocking select of events.
|
// Something is ready to be processed just do a non-blocking select of events.
|
||||||
return .now
|
return .now
|
||||||
|
@ -449,7 +449,7 @@ Further information:
|
||||||
assert(self.internalState == .noLongerRunning, "illegal state: \(self.internalState)")
|
assert(self.internalState == .noLongerRunning, "illegal state: \(self.internalState)")
|
||||||
self.internalState = .exitingThread
|
self.internalState = .exitingThread
|
||||||
}
|
}
|
||||||
var nextReadyTask: ScheduledTask? = nil
|
var nextReadyDeadline: NIODeadline? = nil
|
||||||
self._tasksLock.withLock {
|
self._tasksLock.withLock {
|
||||||
if let firstTask = self._scheduledTasks.peek() {
|
if let firstTask = self._scheduledTasks.peek() {
|
||||||
// The reason this is necessary is a very interesting race:
|
// The reason this is necessary is a very interesting race:
|
||||||
|
@ -457,7 +457,7 @@ Further information:
|
||||||
// `EventLoop` reference _before_ the EL thread has entered the `run` function.
|
// `EventLoop` reference _before_ the EL thread has entered the `run` function.
|
||||||
// If that is the case, we need to schedule the first wakeup at the ready time for this task that was
|
// If that is the case, we need to schedule the first wakeup at the ready time for this task that was
|
||||||
// enqueued really early on, so let's do that :).
|
// enqueued really early on, so let's do that :).
|
||||||
nextReadyTask = firstTask
|
nextReadyDeadline = firstTask.readyTime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while self.internalState != .noLongerRunning && self.internalState != .exitingThread {
|
while self.internalState != .noLongerRunning && self.internalState != .exitingThread {
|
||||||
|
@ -465,7 +465,7 @@ Further information:
|
||||||
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
|
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
|
||||||
try withAutoReleasePool {
|
try withAutoReleasePool {
|
||||||
try self._selector.whenReady(
|
try self._selector.whenReady(
|
||||||
strategy: currentSelectorStrategy(nextReadyTask: nextReadyTask),
|
strategy: currentSelectorStrategy(nextReadyDeadline: nextReadyDeadline),
|
||||||
onLoopBegin: { self._tasksLock.withLock { () -> Void in self._pendingTaskPop = true } }
|
onLoopBegin: { self._tasksLock.withLock { () -> Void in self._pendingTaskPop = true } }
|
||||||
) { ev in
|
) { ev in
|
||||||
switch ev.registration.channel {
|
switch ev.registration.channel {
|
||||||
|
@ -498,17 +498,17 @@ Further information:
|
||||||
|
|
||||||
// Make a copy of the tasks so we can execute these while not holding the lock anymore
|
// Make a copy of the tasks so we can execute these while not holding the lock anymore
|
||||||
while tasksCopy.count < tasksCopy.capacity, let task = self._scheduledTasks.peek() {
|
while tasksCopy.count < tasksCopy.capacity, let task = self._scheduledTasks.peek() {
|
||||||
if task.readyIn(now) <= .nanoseconds(0) {
|
if task.readyTime.readyIn(now) <= .nanoseconds(0) {
|
||||||
self._scheduledTasks.pop()
|
self._scheduledTasks.pop()
|
||||||
self.tasksCopy.append(task)
|
self.tasksCopy.append(task)
|
||||||
} else {
|
} else {
|
||||||
nextReadyTask = task
|
nextReadyDeadline = task.readyTime
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Reset nextReadyTask to nil which means we will do a blocking select.
|
// Reset nextreadyDeadline to nil which means we will do a blocking select.
|
||||||
nextReadyTask = nil
|
nextReadyDeadline = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.tasksCopy.isEmpty {
|
if self.tasksCopy.isEmpty {
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
//
|
//
|
||||||
// This source file is part of the SwiftNIO open source project
|
// This source file is part of the SwiftNIO open source project
|
||||||
//
|
//
|
||||||
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
|
// Copyright (c) 2017-2023 Apple Inc. and the SwiftNIO project authors
|
||||||
// Licensed under Apache License v2.0
|
// Licensed under Apache License v2.0
|
||||||
//
|
//
|
||||||
// See LICENSE.txt for license information
|
// See LICENSE.txt for license information
|
||||||
|
@ -56,6 +56,7 @@ extension EventLoopTest {
|
||||||
("testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies", testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies),
|
("testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies", testRepeatedTaskThatIsCancelledAfterRunningAtLeastTwiceNotifies),
|
||||||
("testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished", testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished),
|
("testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished", testRepeatedTaskThatCancelsItselfNotifiesOnlyWhenFinished),
|
||||||
("testCancelledScheduledTasksDoNotHoldOnToRunClosure", testCancelledScheduledTasksDoNotHoldOnToRunClosure),
|
("testCancelledScheduledTasksDoNotHoldOnToRunClosure", testCancelledScheduledTasksDoNotHoldOnToRunClosure),
|
||||||
|
("testCancelledScheduledTasksDoNotHoldOnToRunClosureEvenIfTheyWereTheNextTaskToExecute", testCancelledScheduledTasksDoNotHoldOnToRunClosureEvenIfTheyWereTheNextTaskToExecute),
|
||||||
("testIllegalCloseOfEventLoopFails", testIllegalCloseOfEventLoopFails),
|
("testIllegalCloseOfEventLoopFails", testIllegalCloseOfEventLoopFails),
|
||||||
("testSubtractingDeadlineFromPastAndFuturesDeadlinesWorks", testSubtractingDeadlineFromPastAndFuturesDeadlinesWorks),
|
("testSubtractingDeadlineFromPastAndFuturesDeadlinesWorks", testSubtractingDeadlineFromPastAndFuturesDeadlinesWorks),
|
||||||
("testCallingSyncShutdownGracefullyMultipleTimesShouldNotHang", testCallingSyncShutdownGracefullyMultipleTimesShouldNotHang),
|
("testCallingSyncShutdownGracefullyMultipleTimesShouldNotHang", testCallingSyncShutdownGracefullyMultipleTimesShouldNotHang),
|
||||||
|
|
|
@ -830,6 +830,77 @@ public final class EventLoopTest : XCTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func testCancelledScheduledTasksDoNotHoldOnToRunClosureEvenIfTheyWereTheNextTaskToExecute() {
|
||||||
|
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
|
||||||
|
defer {
|
||||||
|
XCTAssertNoThrow(try group.syncShutdownGracefully())
|
||||||
|
}
|
||||||
|
|
||||||
|
class Thing {
|
||||||
|
private let deallocated: ConditionLock<Int>
|
||||||
|
|
||||||
|
init(_ deallocated: ConditionLock<Int>) {
|
||||||
|
self.deallocated = deallocated
|
||||||
|
}
|
||||||
|
|
||||||
|
deinit {
|
||||||
|
self.deallocated.lock()
|
||||||
|
self.deallocated.unlock(withValue: 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func make(deallocated: ConditionLock<Int>) -> Scheduled<Never> {
|
||||||
|
let aThing = Thing(deallocated)
|
||||||
|
return group.next().scheduleTask(in: .hours(1)) {
|
||||||
|
preconditionFailure("this should definitely not run: \(aThing)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// What the heck are we doing here?
|
||||||
|
//
|
||||||
|
// Our goal is to arrange for our scheduled task to become "nextReadyTask" in SelectableEventLoop, so that
|
||||||
|
// when we cancel it there is still a copy aliasing it. This reproduces a subtle correctness bug that
|
||||||
|
// existed in NIO 2.48.0 and earlier.
|
||||||
|
//
|
||||||
|
// This will happen if:
|
||||||
|
//
|
||||||
|
// 1. We schedule a task for the future
|
||||||
|
// 2. The event loop begins a tick.
|
||||||
|
// 3. The event loop finds our scheduled task in the future.
|
||||||
|
//
|
||||||
|
// We can make that happen by scheduling our task and then waiting for a tick to pass, which we can
|
||||||
|
// achieve using `submit`.
|
||||||
|
//
|
||||||
|
// However, if there are no _other_, _even later_ tasks, we'll free the reference. This is
|
||||||
|
// because the nextReadyTask is cleared if the list of scheduled tasks ends up empty, so we don't want that to happen.
|
||||||
|
//
|
||||||
|
// So the order of operations is:
|
||||||
|
//
|
||||||
|
// 1. Schedule the task for the future.
|
||||||
|
// 2. Schedule another, even later, task.
|
||||||
|
// 3. Wait for a tick to pass.
|
||||||
|
// 4. Cancel our scheduled.
|
||||||
|
//
|
||||||
|
// In the correct code, this should invoke deinit. In the buggy code, it does not.
|
||||||
|
//
|
||||||
|
// Unfortunately, this window is very hard to hit. Cancelling the scheduled task wakes the loop up, and if it is
|
||||||
|
// still awake by the time we run the cancellation handler it'll notice the change. So we have to tolerate
|
||||||
|
// a somewhat flaky test.
|
||||||
|
let deallocated = ConditionLock(value: 0)
|
||||||
|
let scheduled = make(deallocated: deallocated)
|
||||||
|
scheduled.futureResult.eventLoop.scheduleTask(in: .hours(2)) { }
|
||||||
|
try! scheduled.futureResult.eventLoop.submit { }.wait()
|
||||||
|
scheduled.cancel()
|
||||||
|
if deallocated.lock(whenValue: 1, timeoutSeconds: 60) {
|
||||||
|
deallocated.unlock()
|
||||||
|
} else {
|
||||||
|
XCTFail("Timed out waiting for lock")
|
||||||
|
}
|
||||||
|
XCTAssertThrowsError(try scheduled.futureResult.wait()) { error in
|
||||||
|
XCTAssertEqual(EventLoopError.cancelled, error as? EventLoopError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func testIllegalCloseOfEventLoopFails() {
|
func testIllegalCloseOfEventLoopFails() {
|
||||||
// Vapor 3 closes EventLoops directly which is illegal and makes the `shutdownGracefully` of the owning
|
// Vapor 3 closes EventLoops directly which is illegal and makes the `shutdownGracefully` of the owning
|
||||||
// MultiThreadedEventLoopGroup never succeed.
|
// MultiThreadedEventLoopGroup never succeed.
|
||||||
|
|
Loading…
Reference in New Issue