Pretty significant refactor and improved memory/resource management.

This commit is contained in:
Tyler Cloutier 2016-11-13 18:22:37 -08:00
parent 46a7262ee0
commit ac1a13e978
4 changed files with 181 additions and 157 deletions

View File

@ -8,23 +8,21 @@
import Foundation
public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType, SpecialSignalGenerator {
public final class ColdSignal<V, E: Swift.Error>: ColdSignalType, InternalSignalType, SpecialSignalGenerator {
public typealias Value = T
public typealias ErrorType = E
public typealias Value = V
public typealias Error = E
internal var observers = Bag<Observer<Value, ErrorType>>()
internal var observers = Bag<Observer<Value, Error>>()
public var coldSignal: ColdSignal {
return self
}
internal let startHandler: (Observer<Value, ErrorType>) -> Disposable?
internal let startHandler: (Observer<Value, Error>) -> Disposable?
private var cancelDisposable: Disposable?
private var handlerDisposable: Disposable?
private var started = false
/// Initializes a ColdSignal that will invoke the given closure at the
@ -38,7 +36,7 @@ public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType,
///
/// Invoking `start()` will have no effect until the signal is stopped. After
/// `stop()` is called this process may be repeated.
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
public init(_ generator: @escaping (Observer<Value, Error>) -> Disposable?) {
self.startHandler = generator
}
@ -47,31 +45,19 @@ public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType,
///
/// Returns a Disposable which can be used to interrupt the work associated
/// with the signal and immediately send an `Interrupted` event.
@discardableResult
public func start() {
if !started {
started = true
let observer = Observer<Value, ErrorType> { event in
// Pass event downstream
self.observers.forEach { (observer) in
observer.action(event)
}
// If event is terminating dispose of the handlerDisposable.
if event.isTerminating {
self.handlerDisposable?.dispose()
}
}
handlerDisposable = startHandler(observer)
let observer = Observer(with: CircuitBreaker(holding: self))
let handlerDisposable = startHandler(observer)
// The cancel disposable should send interrupted and then dispose of the
// disposable produced by the startHandler.
cancelDisposable = ActionDisposable { [weak self] in
cancelDisposable = ActionDisposable {
observer.sendInterrupted()
self?.handlerDisposable?.dispose()
handlerDisposable?.dispose()
}
}
}
@ -95,7 +81,7 @@ extension ColdSignal: CustomDebugStringConvertible {
public protocol ColdSignalType: SignalType {
/// The exposed raw signal that underlies the ColdSignalType
var coldSignal: ColdSignal<Value, ErrorType> { get }
var coldSignal: ColdSignal<Value, Error> { get }
/// Invokes the closure provided upon initialization, and passes in a newly
/// created observer to which events can be sent.
@ -110,7 +96,7 @@ public protocol ColdSignalType: SignalType {
public extension ColdSignalType {
public var signal: Signal<Value, ErrorType> {
public var signal: Signal<Value, Error> {
return Signal { observer in
self.coldSignal.add(observer: observer)
}
@ -140,7 +126,7 @@ public extension ColdSignalType {
/// Returns a Disposable which can be used to disconnect the observer. Disposing
/// of the Disposable will have no effect on the Signal itself.
@discardableResult
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
public func add(observer: Observer<Value, Error>) -> Disposable? {
let token = coldSignal.observers.insert(value: observer)
return ActionDisposable {
self.coldSignal.observers.removeValueForToken(token: token)
@ -152,7 +138,7 @@ public extension ColdSignalType {
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func start(with observer: Observer<Value, ErrorType>) -> Disposable? {
public func start(with observer: Observer<Value, Error>) -> Disposable? {
let disposable = coldSignal.add(observer: observer)
self.coldSignal.start()
return disposable
@ -163,7 +149,7 @@ public extension ColdSignalType {
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func start(_ observerAction: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
public func start(_ observerAction: @escaping Observer<Value, Error>.Action) -> Disposable? {
return start(with: Observer(observerAction))
}
@ -190,7 +176,7 @@ public extension ColdSignalType {
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func startWithFailed(failed: @escaping (ErrorType) -> Void) -> Disposable? {
public func startWithFailed(failed: @escaping (Error) -> Void) -> Disposable? {
return start(with: Observer(failed: failed))
}
@ -212,18 +198,18 @@ public extension ColdSignalType {
///
/// The new `ColdSignal` is in no way related to the source `ColdSignal` except
/// that they share a reference to the same `startHandler`.
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> Signal<U, F>) -> ColdSignal<U, F> {
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> ColdSignal<U, F> {
return ColdSignal { observer in
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
let (pipeSignal, pipeObserver) = Signal<Value, Error>.pipe()
transform(pipeSignal).add(observer: observer)
return self.coldSignal.startHandler(pipeObserver)
}
}
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> (Signal<U, F>, Signal<U, F>))
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>, Signal<U, F>))
-> (ColdSignal<U, F>, ColdSignal<U, F>)
{
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
let (pipeSignal, pipeObserver) = Signal<Value, Error>.pipe()
let (left, right) = transform(pipeSignal)
let coldLeft = ColdSignal<U, F> { observer in
left.add(observer: observer)
@ -237,33 +223,33 @@ public extension ColdSignalType {
}
/// Maps each value in the signal to a new value.
public func map<U>(_ transform: @escaping (Value) -> U) -> ColdSignal<U, ErrorType> {
public func map<U>(_ transform: @escaping (Value) -> U) -> ColdSignal<U, Error> {
return lift { $0.map(transform) }
}
/// Maps errors in the signal to a new error.
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> ColdSignal<Value, F> {
public func mapError<F>(_ transform: @escaping (Error) -> F) -> ColdSignal<Value, F> {
return lift { $0.mapError(transform) }
}
/// Preserves only the values of the signal that pass the given predicate.
public func filter(_ predicate: @escaping (Value) -> Bool) -> ColdSignal<Value, ErrorType> {
public func filter(_ predicate: @escaping (Value) -> Bool) -> ColdSignal<Value, Error> {
return lift { $0.filter(predicate) }
}
/// Splits the signal into two signals. The first signal in the tuple matches the
/// predicate, the second signal does not match the predicate
public func partition(_ predicate: @escaping (Value) -> Bool)
-> (ColdSignal<Value, ErrorType>, ColdSignal<Value, ErrorType>) {
-> (ColdSignal<Value, Error>, ColdSignal<Value, Error>) {
return lift { $0.partition(predicate) }
}
/// Aggregate values into a single combined value. Mirrors the Swift Collection
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> ColdSignal<T, ErrorType> {
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> ColdSignal<T, Error> {
return lift { $0.reduce(initial: initial, combine) }
}
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> ColdSignal<U, ErrorType> {
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> ColdSignal<U, Error> {
return lift { $0.flatMap(transform) }
}

View File

@ -11,93 +11,93 @@ import Foundation
/// Represents a signal event.
///
/// Signals must conform to the grammar:
/// `Next* (Failed | Completed | Interrupted)?`
public enum Event<Value, ErrorType: Error> {
/// `next* (failed | completed | interrupted)?`
public enum Event<Value, Error: Swift.Error> {
/// A value provided by the signal.
case Next(Value)
case next(Value)
/// The signal terminated because of an error. No further events will be
/// received.
case Failed(ErrorType)
case failed(Error)
/// The signal successfully terminated. No further events will be received.
case Completed
case completed
/// Event production on the signal has been interrupted. No further events
/// will be received.
case Interrupted
case interrupted
/// Whether this event indicates signal termination (i.e., that no further
/// events will be received).
public var isTerminating: Bool {
switch self {
case .Next:
case .next:
return false
case .Failed, .Completed, .Interrupted:
case .failed, .completed, .interrupted:
return true
}
}
/// Lifts the given function over the event's value.
public func map<U>(_ f: (Value) -> U) -> Event<U, ErrorType> {
public func map<U>(_ f: (Value) -> U) -> Event<U, Error> {
switch self {
case let .Next(value):
return .Next(f(value))
case let .next(value):
return .next(f(value))
case let .Failed(error):
return .Failed(error)
case let .failed(error):
return .failed(error)
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Lifts the given function over the event's value.
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U, ErrorType>? {
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U, Error>? {
switch self {
case let .Next(value):
case let .next(value):
if let nextValue = f(value) {
return .Next(nextValue)
return .next(nextValue)
}
return nil
case let .Failed(error):
return .Failed(error)
case let .failed(error):
return .failed(error)
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Lifts the given function over the event's error.
public func mapError<F>(_ f: (ErrorType) -> F) -> Event<Value, F> {
public func mapError<F>(_ f: (Error) -> F) -> Event<Value, F> {
switch self {
case let .Next(value):
return .Next(value)
case let .next(value):
return .next(value)
case let .Failed(error):
return .Failed(f(error))
case let .failed(error):
return .failed(f(error))
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Unwraps the contained `Next` value.
public var value: Value? {
if case let .Next(value) = self {
if case let .next(value) = self {
return value
} else {
return nil
@ -106,7 +106,7 @@ public enum Event<Value, ErrorType: Error> {
/// Unwraps the contained `Error` value.
public var error: Error? {
if case let .Failed(error) = self {
if case let .failed(error) = self {
return error
} else {
return nil
@ -114,18 +114,18 @@ public enum Event<Value, ErrorType: Error> {
}
}
public func == <Value: Equatable, ErrorType: Equatable> (lhs: Event<Value, ErrorType>, rhs: Event<Value, ErrorType>) -> Bool {
public func == <Value: Equatable, Error: Equatable> (lhs: Event<Value, Error>, rhs: Event<Value, Error>) -> Bool {
switch (lhs, rhs) {
case let (.Next(left), .Next(right)):
case let (.next(left), .next(right)):
return left == right
case let (.Failed(left), .Failed(right)):
case let (.failed(left), .failed(right)):
return left == right
case (.Completed, .Completed):
case (.completed, .completed):
return true
case (.Interrupted, .Interrupted):
case (.interrupted, .interrupted):
return true
default:

View File

@ -8,65 +8,118 @@
import Foundation
/// An Observer is a simple wrapper around a function which can receive Events
/// (typically from a Signal).
public struct Observer<Value, ErrorType: Error> {
/// A CiruitBreaker optionally holds a strong reference to either a
/// `Signal` or a `ColdSignal` until a terminating event is
/// received. At such time, it delivers the event and then
/// removes its reference. In so doing, it "breaks the circuit"
/// between the signal, the handler, and the input observer.
/// This allows the downstream signal to be released.
class CircuitBreaker<Value, Error: Swift.Error> {
private var signal: Signal<Value, Error>? = nil
private var coldSignal: ColdSignal<Value, Error>? = nil
fileprivate var action: Observer<Value, Error>.Action! = nil
public typealias Action = (Event<Value, ErrorType>) -> Void
/// Holds a strong reference to a `Signal` until a
/// terminating event is received.
init(holding signal: Signal<Value, Error>?) {
self.signal = signal
self.action = { [weak self] event in
// If event is terminating dispose of the handlerDisposable.
self?.signal?.observers.forEach { observer in
observer.send(event)
}
if event.isTerminating {
self?.signal = nil
}
}
}
public let action: Action
/// Holds a strong reference to a `ColdSignal` until a
/// terminating event is received.
init(holding coldSignal: ColdSignal<Value, Error>?) {
self.coldSignal = coldSignal
self.action = { [weak self] event in
// If event is terminating dispose of the handlerDisposable.
self?.coldSignal?.observers.forEach { observer in
observer.send(event)
}
if event.isTerminating {
self?.coldSignal = nil
}
}
}
fileprivate init(with action: @escaping (Event<Value, Error>) -> Void) {
self.action = action
}
}
public struct Observer<Value, Error: Swift.Error> {
public typealias Action = (Event<Value, Error>) -> Void
let breaker: CircuitBreaker<Value, Error>
init(with breaker: CircuitBreaker<Value, Error>) {
self.breaker = breaker
}
public init(_ action: @escaping Action) {
self.action = action
self.breaker = CircuitBreaker(with: action)
}
/// Creates an Observer with an action which calls each of the provided
/// callbacks
public init(
failed: ((ErrorType) -> Void)? = nil,
failed: ((Error) -> Void)? = nil,
completed: (() -> Void)? = nil,
interrupted: (() -> Void)? = nil,
next: ((Value) -> Void)? = nil)
{
self.init { event in
switch event {
case let .Next(value):
case let .next(value):
next?(value)
case let .Failed(error):
case let .failed(error):
failed?(error)
case .Completed:
case .completed:
completed?()
case .Interrupted:
case .interrupted:
interrupted?()
}
}
}
public func sendEvent(_ event: Event<Value, ErrorType>) {
action(event)
/// Puts any `Event` into the the given observer.
public func send(_ event: Event<Value, Error>) {
breaker.action(event)
}
/// Puts a `Next` event into the given observer.
public func sendNext(_ value: Value) {
action(.Next(value))
send(.next(value))
}
/// Puts an `Failed` event into the given observer.
public func sendFailed(_ error: ErrorType) {
action(.Failed(error))
public func sendFailed(_ error: Error) {
send(.failed(error))
}
/// Puts a `Completed` event into the given observer.
public func sendCompleted() {
action(.Completed)
send(.completed)
}
/// Puts a `Interrupted` event into the given observer.
public func sendInterrupted() {
action(.Interrupted)
send(.interrupted)
}
}

View File

@ -8,11 +8,13 @@
import Foundation
public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalType, SpecialSignalGenerator {
public final class Signal<Value, Error: Swift.Error>: SignalType, InternalSignalType, SpecialSignalGenerator {
internal var observers = Bag<Observer<Value, ErrorType>>()
internal var observers = Bag<Observer<Value, Error>>()
public var signal: Signal<Value, ErrorType> {
private var handlerDisposable: Disposable?
public var signal: Signal<Value, Error> {
return self
}
@ -23,21 +25,15 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
/// if a terminating event is sent to the observer. The Signal itself will
/// remain alive until the observer is released. This is because the observer
/// captures a self reference.
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
public init(_ generator: @escaping (Observer<Value, Error>) -> Disposable?) {
let generatorDisposable = SerialDisposable()
let inputObserver = Observer<Value, ErrorType> { event in
self.observers.forEach { (observer) in
observer.action(event)
}
if event.isTerminating {
generatorDisposable.dispose()
}
}
let observer = Observer(with: CircuitBreaker(holding: self))
handlerDisposable = generator(observer)
generatorDisposable.innerDisposable = generator(inputObserver)
}
deinit {
handlerDisposable?.dispose()
}
/// Creates a Signal that will be controlled by sending events to the returned
@ -45,8 +41,8 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
///
/// The Signal will remain alive until a terminating event is sent to the
/// observer.
public static func pipe() -> (Signal, Observer<Value, ErrorType>) {
var observer: Observer<Value, ErrorType>!
public static func pipe() -> (Signal, Observer<Value, Error>) {
var observer: Observer<Value, Error>!
let signal = self.init { innerObserver in
observer = innerObserver
return nil
@ -71,9 +67,9 @@ public protocol SpecialSignalGenerator {
/// The type of error that can occur on the signal. If errors aren't possible
/// then `NoError` can be used.
associatedtype ErrorType: Error
associatedtype Error: Swift.Error
init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?)
init(_ generator: @escaping (Observer<Value, Error>) -> Disposable?)
}
@ -91,7 +87,7 @@ public extension SpecialSignalGenerator {
/// Creates a Signal that will immediately fail with the
/// given error.
public init(error: ErrorType) {
public init(error: Error) {
self.init { observer in
observer.sendFailed(error)
return nil
@ -147,10 +143,10 @@ public protocol SignalType {
/// The type of error that can occur on the signal. If errors aren't possible
/// then `NoError` can be used.
associatedtype ErrorType: Error
associatedtype Error: Swift.Error
/// The exposed raw signal that underlies the ColdSignalType
var signal: Signal<Value, ErrorType> { get }
var signal: Signal<Value, Error> { get }
}
@ -158,18 +154,7 @@ public protocol SignalType {
/// of the signal.
internal protocol InternalSignalType: SignalType {
var observers: Bag<Observer<Value, ErrorType>> { get }
}
internal extension InternalSignalType {
/// Interrupts all observers and terminates the stream.
func interrupt() {
for observer in self.observers {
observer.sendInterrupted()
}
}
var observers: Bag<Observer<Value, Error>> { get }
}
@ -182,7 +167,7 @@ public extension SignalType {
/// Returns a Disposable which can be used to disconnect the observer. Disposing
/// of the Disposable will have no effect on the Signal itself.
@discardableResult
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
public func add(observer: Observer<Value, Error>) -> Disposable? {
let token = signal.observers.insert(value: observer)
return ActionDisposable {
self.signal.observers.removeValueForToken(token: token)
@ -193,7 +178,7 @@ public extension SignalType {
/// Convenience override for add(observer:) to allow trailing-closure style
/// invocations.
@discardableResult
public func on(action: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
public func on(action: @escaping Observer<Value, Error>.Action) -> Disposable? {
return self.add(observer: Observer(action))
}
@ -226,7 +211,7 @@ public extension SignalType {
/// callback. Disposing of the Disposable will have no effect on the Signal
/// itself.
@discardableResult
public func onFailed(error: @escaping (ErrorType) -> Void) -> Disposable? {
public func onFailed(error: @escaping (Error) -> Void) -> Disposable? {
return self.add(observer: Observer(failed: error))
}
@ -246,34 +231,34 @@ public extension SignalType {
public extension SignalType {
public var identity: Signal<Value, ErrorType> {
public var identity: Signal<Value, Error> {
return self.map { $0 }
}
/// Maps each value in the signal to a new value.
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U, ErrorType> {
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U, Error> {
return Signal { observer in
return self.on { event -> Void in
observer.sendEvent(event.map(transform))
observer.send(event.map(transform))
}
}
}
/// Maps errors in the signal to a new error.
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> Signal<Value, F> {
public func mapError<F>(_ transform: @escaping (Error) -> F) -> Signal<Value, F> {
return Signal { observer in
return self.on { event -> Void in
observer.sendEvent(event.mapError(transform))
observer.send(event.mapError(transform))
}
}
}
/// Preserves only the values of the signal that pass the given predicate.
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value, ErrorType> {
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value, Error> {
return Signal { observer in
return self.on { (event: Event<Value, ErrorType>) -> Void in
return self.on { (event: Event<Value, Error>) -> Void in
guard let value = event.value else {
observer.sendEvent(event)
observer.send(event)
return
}
@ -286,13 +271,13 @@ public extension SignalType {
/// Splits the signal into two signals. The first signal in the tuple matches the
/// predicate, the second signal does not match the predicate
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value, ErrorType>, Signal<Value, ErrorType>) {
let (left, leftObserver) = Signal<Value, ErrorType>.pipe()
let (right, rightObserver) = Signal<Value, ErrorType>.pipe()
self.on { (event: Event<Value, ErrorType>) -> Void in
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value, Error>, Signal<Value, Error>) {
let (left, leftObserver) = Signal<Value, Error>.pipe()
let (right, rightObserver) = Signal<Value, Error>.pipe()
self.on { (event: Event<Value, Error>) -> Void in
guard let value = event.value else {
leftObserver.sendEvent(event)
rightObserver.sendEvent(event)
leftObserver.send(event)
rightObserver.send(event)
return
}
@ -306,11 +291,11 @@ public extension SignalType {
}
/// Aggregate values into a single combined value. Mirrors the Swift Collection
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T, ErrorType> {
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T, Error> {
return Signal { observer in
var accumulator = initial
return self.on { event in
observer.action(event.map { value in
observer.send(event.map { value in
accumulator = combine(accumulator, value)
return accumulator
})
@ -318,11 +303,11 @@ public extension SignalType {
}
}
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U, ErrorType> {
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U, Error> {
return Signal { observer in
return self.on { event -> Void in
if let e = event.flatMap(transform) {
observer.sendEvent(e)
observer.send(e)
}
}
}