fix AtomicBox by deprecating it and implementing it with a CAS loop (#1287)

Motivation:

Atomic references cannot easily be maintained correctly because the
`load` operation gets a pointer value that might (before the loader gets
a change to retain the returned reference) be destroyed by another
thread.

Modifications:

- Deprecate `AtomicBox`
- Implement `NIOCASLoopBox` with functionality similar to AtomicBox (but
  implemented using a CAS loop :( ).

Result:

- fixes #1286
This commit is contained in:
Johannes Weiss 2019-12-06 17:41:14 +00:00 committed by GitHub
parent eee18dc81f
commit e208ab9555
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 367 additions and 13 deletions

View File

@ -66,7 +66,7 @@ var targets: [PackageDescription.Target] = [
.testTarget(name: "NIOTests",
dependencies: ["NIO", "NIOFoundationCompat", "NIOTestUtils", "NIOConcurrencyHelpers"]),
.testTarget(name: "NIOConcurrencyHelpersTests",
dependencies: ["NIOConcurrencyHelpers"]),
dependencies: ["NIOConcurrencyHelpers", "NIO"]),
.testTarget(name: "NIOHTTP1Tests",
dependencies: ["NIOHTTP1", "NIOFoundationCompat", "NIOTestUtils"]),
.testTarget(name: "NIOTLSTests",

View File

@ -14,6 +14,18 @@
import CNIOAtomics
#if os(macOS) || os(iOS) || os(watchOS) || os(tvOS)
import Darwin
fileprivate func sys_sched_yield() {
pthread_yield_np()
}
#else
import Glibc
fileprivate func sys_sched_yield() {
_ = sched_yield()
}
#endif
/// An atomic primitive object.
///
/// Before using `UnsafeEmbeddedAtomic`, please consider whether your needs can be met by `Atomic` instead.
@ -389,9 +401,11 @@ extension UInt: AtomicPrimitive {
public static let atomic_store = catmc_atomic_unsigned_long_store
}
/// `AtomicBox` is a heap-allocated box which allows atomic access to an instance of a Swift class.
/// `AtomicBox` is a heap-allocated box which allows lock-free access to an instance of a Swift class.
///
/// It behaves very much like `Atomic<T>` but for objects, maintaining the correct retain counts.
/// - warning: The use of `AtomicBox` should be avoided because it requires an implementation of a spin-lock
/// (more precisely a CAS loop) to operate correctly.
@available(*, deprecated, message: "AtomicBox is deprecated without replacement because the original implementation doesn't work.")
public final class AtomicBox<T: AnyObject> {
private let storage: NIOAtomic<UInt>
@ -415,6 +429,10 @@ public final class AtomicBox<T: AnyObject> {
/// details on atomic memory models, check the documentation for C11's
/// `stdatomic.h`.
///
///
/// - warning: The implementation of `exchange` contains a _Compare and Exchange loop_, ie. it may busy wait with
/// 100% CPU load.
///
/// - Parameter expected: The value that this object must currently hold for the
/// compare-and-swap to succeed.
/// - Parameter desired: The new value that this object will hold if the compare
@ -425,14 +443,25 @@ public final class AtomicBox<T: AnyObject> {
return withExtendedLifetime(desired) {
let expectedPtr = Unmanaged<T>.passUnretained(expected)
let desiredPtr = Unmanaged<T>.passUnretained(desired)
let expectedPtrBits = UInt(bitPattern: expectedPtr.toOpaque())
let desiredPtrBits = UInt(bitPattern: desiredPtr.toOpaque())
if self.storage.compareAndExchange(expected: UInt(bitPattern: expectedPtr.toOpaque()),
desired: UInt(bitPattern: desiredPtr.toOpaque())) {
_ = desiredPtr.retain()
expectedPtr.release()
return true
} else {
return false
while true {
if self.storage.compareAndExchange(expected: expectedPtrBits, desired: desiredPtrBits) {
if desiredPtrBits != expectedPtrBits {
_ = desiredPtr.retain()
expectedPtr.release()
}
return true
} else {
let currentPtrBits = self.storage.load()
if currentPtrBits == 0 || currentPtrBits == expectedPtrBits {
sys_sched_yield()
continue
} else {
return false
}
}
}
}
}
@ -443,11 +472,30 @@ public final class AtomicBox<T: AnyObject> {
/// more than that this operation is atomic: there is no guarantee that any other
/// event will be ordered before or after this one.
///
/// - warning: The implementation of `exchange` contains a _Compare and Exchange loop_, ie. it may busy wait with
/// 100% CPU load.
///
/// - Parameter value: The new value to set this object to.
/// - Returns: The value previously held by this object.
public func exchange(with value: T) -> T {
let newPtr = Unmanaged<T>.passRetained(value)
let oldPtrBits = self.storage.exchange(with: UInt(bitPattern: newPtr.toOpaque()))
let newPtrBits = UInt(bitPattern: newPtr.toOpaque())
// step 1: We need to actually CAS loop here to swap out a non-0 value with the new one.
var oldPtrBits: UInt = 0
while true {
let speculativeVal = self.storage.load()
guard speculativeVal != 0 else {
sys_sched_yield()
continue
}
if self.storage.compareAndExchange(expected: speculativeVal, desired: newPtrBits) {
oldPtrBits = speculativeVal
break
}
}
// step 2: After having gained 'ownership' of the old value, we can release the Unmanged.
let oldPtr = Unmanaged<T>.fromOpaque(UnsafeRawPointer(bitPattern: oldPtrBits)!)
return oldPtr.takeRetainedValue()
}
@ -458,11 +506,33 @@ public final class AtomicBox<T: AnyObject> {
/// more than that this operation is atomic: there is no guarantee that any other
/// event will be ordered before or after this one.
///
/// - warning: The implementation of `exchange` contains a _Compare and Exchange loop_, ie. it may busy wait with
/// 100% CPU load.
///
/// - Returns: The value of this object
public func load() -> T {
let ptrBits = self.storage.load()
// step 1: We need to gain ownership of the value by successfully swapping 0 (marker value) in.
var ptrBits: UInt = 0
while true {
let speculativeVal = self.storage.load()
guard speculativeVal != 0 else {
sys_sched_yield()
continue
}
if self.storage.compareAndExchange(expected: speculativeVal, desired: 0) {
ptrBits = speculativeVal
break
}
}
// step 2: We now consumed a +1'd version of val, so we have all the time in the world to retain it.
let ptr = Unmanaged<T>.fromOpaque(UnsafeRawPointer(bitPattern: ptrBits)!)
return ptr.takeUnretainedValue()
let value = ptr.takeUnretainedValue()
// step 3: Now, let's exchange it back into the store
let casWorked = self.storage.compareAndExchange(expected: 0, desired: ptrBits)
precondition(casWorked) // this _has_ to work because `0` means we own it exclusively.
return value
}
/// Atomically replaces the value of this object with `value`.
@ -471,6 +541,9 @@ public final class AtomicBox<T: AnyObject> {
/// more than that this operation is atomic: there is no guarantee that any other
/// event will be ordered before or after this one.
///
/// - warning: The implementation of `exchange` contains a _Compare and Exchange loop_, ie. it may busy wait with
/// 100% CPU load.
///
/// - Parameter value: The new value to set the object to.
public func store(_ value: T) -> Void {
_ = self.exchange(with: value)

View File

@ -53,6 +53,12 @@ extension NIOConcurrencyHelpersTests {
("testAtomicBoxCompareAndExchangeWorksIfNotEqual", testAtomicBoxCompareAndExchangeWorksIfNotEqual),
("testAtomicBoxStoreWorks", testAtomicBoxStoreWorks),
("testAtomicBoxCompareAndExchangeOntoItselfWorks", testAtomicBoxCompareAndExchangeOntoItselfWorks),
("testAtomicLoadMassLoadAndStore", testAtomicLoadMassLoadAndStore),
("testAtomicBoxCompareAndExchangeOntoItself", testAtomicBoxCompareAndExchangeOntoItself),
("testLoadAndExchangeHammering", testLoadAndExchangeHammering),
("testLoadAndStoreHammering", testLoadAndStoreHammering),
("testLoadAndCASHammering", testLoadAndCASHammering),
("testMultipleLoadsRacingWhilstStoresAreGoingOn", testMultipleLoadsRacingWhilstStoresAreGoingOn),
]
}
}

View File

@ -18,6 +18,7 @@ import Glibc
#endif
import Dispatch
import XCTest
import NIO
@testable import NIOConcurrencyHelpers
class NIOConcurrencyHelpersTests: XCTestCase {
@ -606,6 +607,7 @@ class NIOConcurrencyHelpersTests: XCTestCase {
}
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxDoesNotTriviallyLeak() throws {
class SomeClass {}
weak var weakSomeInstance1: SomeClass? = nil
@ -624,6 +626,7 @@ class NIOConcurrencyHelpersTests: XCTestCase {
XCTAssertNil(weakSomeInstance2)
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxCompareAndExchangeWorksIfEqual() throws {
class SomeClass {}
weak var weakSomeInstance1: SomeClass? = nil
@ -656,6 +659,7 @@ class NIOConcurrencyHelpersTests: XCTestCase {
XCTAssertNil(weakSomeInstance3)
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxCompareAndExchangeWorksIfNotEqual() throws {
class SomeClass {}
weak var weakSomeInstance1: SomeClass? = nil
@ -689,6 +693,7 @@ class NIOConcurrencyHelpersTests: XCTestCase {
XCTAssertNil(weakSomeInstance3)
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxStoreWorks() throws {
class SomeClass {}
weak var weakSomeInstance1: SomeClass? = nil
@ -720,6 +725,7 @@ class NIOConcurrencyHelpersTests: XCTestCase {
XCTAssertNil(weakSomeInstance3)
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxCompareAndExchangeOntoItselfWorks() {
let q = DispatchQueue(label: "q")
let g = DispatchGroup()
@ -750,4 +756,273 @@ class NIOConcurrencyHelpersTests: XCTestCase {
})()
XCTAssertNil(weakInstance)
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicLoadMassLoadAndStore() {
let writer = DispatchQueue(label: "\(#file):writer")
let reader = DispatchQueue(label: "\(#file):reader")
let g = DispatchGroup()
let writerArrived = DispatchSemaphore(value: 0)
let readerArrived = DispatchSemaphore(value: 0)
let go = DispatchSemaphore(value: 0)
let iterations = 100_000
class Foo {
var x: Int
init(_ x: Int) {
self.x = x
}
deinit {
self.x = -1
}
}
let box: AtomicBox<Foo> = .init(value: Foo(iterations))
writer.async(group: g) {
writerArrived.signal()
go.wait()
for i in 0..<iterations {
box.store(Foo(i))
}
}
reader.async(group: g) {
readerArrived.signal()
go.wait()
for _ in 0..<iterations {
if box.load().x < 0 {
XCTFail("bad")
}
}
}
writerArrived.wait()
readerArrived.wait()
go.signal()
go.signal()
g.wait()
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testAtomicBoxCompareAndExchangeOntoItself() {
class Foo {}
weak var weakF: Foo? = nil
weak var weakG: Foo? = nil
@inline(never)
func doIt() {
let f = Foo()
let g = Foo()
weakF = f
weakG = g
let box = AtomicBox<Foo>(value: f)
XCTAssertFalse(box.compareAndExchange(expected: g, desired: g))
XCTAssertTrue(box.compareAndExchange(expected: f, desired: f))
XCTAssertFalse(box.compareAndExchange(expected: g, desired: g))
XCTAssertTrue(box.compareAndExchange(expected: f, desired: g))
}
doIt()
assert(weakF == nil, within: .seconds(1))
assert(weakF == nil, within: .seconds(1))
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testLoadAndExchangeHammering() {
let allDeallocations = NIOAtomic<Int>.makeAtomic(value: 0)
let iterations = 100_000
@inline(never)
func doIt() {
let box = AtomicBox(value: IntHolderWithDeallocationTracking(0, allDeallocations: allDeallocations))
spawnAndJoinRacingThreads(count: 6) { i in
switch i {
case 0: // writer
for i in 1 ... iterations {
let nextObject = box.exchange(with: .init(i, allDeallocations: allDeallocations))
XCTAssertEqual(nextObject.value, i - 1)
}
default: // readers
while true {
if box.load().value < 0 || box.load().value > iterations {
XCTFail("bad")
}
if box.load().value == iterations {
break
}
}
}
}
}
doIt()
assert(allDeallocations.load() == iterations + 1, within: .seconds(1))
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testLoadAndStoreHammering() {
let allDeallocations = NIOAtomic<Int>.makeAtomic(value: 0)
let iterations = 100_000
@inline(never)
func doIt() {
let box = AtomicBox(value: IntHolderWithDeallocationTracking(0, allDeallocations: allDeallocations))
spawnAndJoinRacingThreads(count: 6) { i in
switch i {
case 0: // writer
for i in 1 ... iterations {
box.store(IntHolderWithDeallocationTracking(i, allDeallocations: allDeallocations))
}
default: // readers
while true {
if box.load().value < 0 || box.load().value > iterations {
XCTFail("loaded the wrong value")
}
if box.load().value == iterations {
break
}
}
}
}
}
doIt()
assert(allDeallocations.load() == iterations + 1, within: .seconds(1))
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testLoadAndCASHammering() {
let allDeallocations = NIOAtomic<Int>.makeAtomic(value: 0)
let iterations = 100_000
@inline(never)
func doIt() {
let box = AtomicBox(value: IntHolderWithDeallocationTracking(0, allDeallocations: allDeallocations))
spawnAndJoinRacingThreads(count: 6) { i in
switch i {
case 0: // writer
for i in 1 ... iterations {
let old = box.load()
XCTAssertEqual(i - 1, old.value)
if !box.compareAndExchange(expected: old,
desired: .init(i, allDeallocations: allDeallocations)) {
XCTFail("compare and exchange didn't work but it should have")
}
}
default: // readers
while true {
if box.load().value < 0 || box.load().value > iterations {
XCTFail("loaded wrong value")
}
if box.load().value == iterations {
break
}
}
}
}
}
doIt()
assert(allDeallocations.load() == iterations + 1, within: .seconds(1))
}
@available(*, deprecated, message: "AtomicBox is deprecated, this is a test for the deprecated functionality")
func testMultipleLoadsRacingWhilstStoresAreGoingOn() {
// regression test for https://github.com/apple/swift-nio/pull/1287#discussion_r353932225
let allDeallocations = NIOAtomic<Int>.makeAtomic(value: 0)
let iterations = 100_000
@inline(never)
func doIt() {
let box = AtomicBox(value: IntHolderWithDeallocationTracking(0, allDeallocations: allDeallocations))
spawnAndJoinRacingThreads(count: 3) { i in
switch i {
case 0:
var last = Int.min
while last < iterations {
let loaded = box.load()
XCTAssertGreaterThanOrEqual(loaded.value, last)
last = loaded.value
}
case 1:
for n in 1...iterations {
_ = box.store(.init(n, allDeallocations: allDeallocations))
}
case 2:
var last = Int.min
while last < iterations {
let loaded = box.load()
XCTAssertGreaterThanOrEqual(loaded.value, last)
last = loaded.value
}
default:
preconditionFailure("thread \(i)?!")
}
}
}
doIt()
XCTAssertEqual(iterations + 1, allDeallocations.load())
}
}
func spawnAndJoinRacingThreads(count: Int, _ body: @escaping (Int) -> Void) {
let go = DispatchSemaphore(value: 0) // will be incremented when the threads are supposed to run (and race).
let arrived = Array(repeating: DispatchSemaphore(value: 0), count: count) // waiting for all threads to arrive
let group = DispatchGroup()
for i in 0..<count {
DispatchQueue(label: "\(#file):\(#line):\(i)").async(group: group) {
arrived[i].signal()
go.wait()
body(i)
}
}
for sem in arrived {
sem.wait()
}
// all the threads are ready to go
for _ in 0..<count {
go.signal()
}
group.wait()
}
func assert(_ condition: @autoclosure () -> Bool, within time: TimeAmount, testInterval: TimeAmount? = nil, _ message: String = "condition not satisfied in time", file: StaticString = #file, line: UInt = #line) {
let testInterval = testInterval ?? TimeAmount.nanoseconds(time.nanoseconds / 5)
let endTime = NIODeadline.now() + time
repeat {
if condition() { return }
usleep(UInt32(testInterval.nanoseconds / 1000))
} while (NIODeadline.now() < endTime)
if !condition() {
XCTFail(message, file: file, line: line)
}
}
fileprivate class IntHolderWithDeallocationTracking {
private(set) var value: Int
let allDeallocations: NIOAtomic<Int>
init(_ x: Int, allDeallocations: NIOAtomic<Int>) {
self.value = x
self.allDeallocations = allDeallocations
}
deinit {
self.value = -1
_ = self.allDeallocations.add(1)
}
}