Compare commits
32 Commits
Author | SHA1 | Date |
---|---|---|
![]() |
821074970d | |
![]() |
43238cc231 | |
![]() |
760076df26 | |
![]() |
f6b76d1559 | |
![]() |
2b701eb423 | |
![]() |
8112c77a33 | |
![]() |
555837e694 | |
![]() |
4a7ea1e82e | |
![]() |
cddbc315c0 | |
![]() |
44f955a8e5 | |
![]() |
40feea4ab1 | |
![]() |
61d3f12206 | |
![]() |
d64fcd469a | |
![]() |
7f5db3c422 | |
![]() |
f8183bde2b | |
![]() |
8ddfcd1f62 | |
![]() |
323b235770 | |
![]() |
2d13b1e90d | |
![]() |
57d43919e3 | |
![]() |
d1478078e9 | |
![]() |
c647d5f0ae | |
![]() |
a41bdafa8d | |
![]() |
fb1eb87d14 | |
![]() |
5196d4b55f | |
![]() |
1cc5552d91 | |
![]() |
1866fcbf7d | |
![]() |
cb9f2411c5 | |
![]() |
09e270d688 | |
![]() |
5ef886ed86 | |
![]() |
c70aabc491 | |
![]() |
ac1a13e978 | |
![]() |
46a7262ee0 |
|
@ -13,6 +13,11 @@
|
|||
# Linux trash folder which might appear on any partition or disk
|
||||
.Trash-*
|
||||
|
||||
## Other
|
||||
*.xcodeproj
|
||||
#*.xcodeproj/*.plist
|
||||
*.moved-aside
|
||||
*.xcuserstate
|
||||
|
||||
### OSX ###
|
||||
.DS_Store
|
||||
|
@ -66,8 +71,8 @@ xcuserdata/
|
|||
*.moved-aside
|
||||
*.xccheckout
|
||||
*.xcscmblueprint
|
||||
*.xcodeproj/*.plist
|
||||
*.xcodeproj
|
||||
#*.xcodeproj/*.plist
|
||||
#*.xcodeproj
|
||||
|
||||
|
||||
### Swift ###
|
||||
|
|
|
@ -0,0 +1,11 @@
|
|||
language: objective-c
|
||||
osx_image: xcode8
|
||||
|
||||
xcode_project: Reflex.xcodeproj
|
||||
xcode_scheme: Reflex
|
||||
xcode_sdk: iphonesimulator10.0
|
||||
|
||||
script:
|
||||
- set -o pipefail && xcodebuild -project Reflex.xcodeproj -scheme Reflex -sdk iphonesimulator ONLY_ACTIVE_ARCH=NO CODE_SIGNING_REQUIRED=NO -destination "platform=iOS Simulator,name=iPhone 6s" build | xcpretty
|
||||
- pod lib lint
|
||||
|
|
@ -0,0 +1,16 @@
|
|||
{
|
||||
"object": {
|
||||
"pins": [
|
||||
{
|
||||
"package": "PromiseKit",
|
||||
"repositoryURL": "https://github.com/mxcl/PromiseKit.git",
|
||||
"state": {
|
||||
"branch": null,
|
||||
"revision": "6bab5e0c7f93947d9c0a7df0937add7454657f2c",
|
||||
"version": "4.5.0"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"version": 1
|
||||
}
|
|
@ -1,7 +1,27 @@
|
|||
// swift-tools-version:4.0
|
||||
// The swift-tools-version declares the minimum version of Swift required to build this package.
|
||||
|
||||
import PackageDescription
|
||||
|
||||
let package = Package(
|
||||
name: "Reflex",
|
||||
name: "StreamKit",
|
||||
products: [
|
||||
.library(
|
||||
name: "StreamKit",
|
||||
targets: ["StreamKit"]
|
||||
),
|
||||
],
|
||||
dependencies: [
|
||||
.package(url: "https://github.com/mxcl/PromiseKit.git", from: "4.5.0"),
|
||||
],
|
||||
targets: [
|
||||
.target(
|
||||
name: "StreamKit",
|
||||
dependencies: ["PromiseKit"]
|
||||
),
|
||||
.testTarget(
|
||||
name: "StreamKitTests",
|
||||
dependencies: ["StreamKit"]
|
||||
),
|
||||
]
|
||||
)
|
||||
|
|
10
README.md
10
README.md
|
@ -1,9 +1,9 @@
|
|||
# Reflex
|
||||
# StreamKit
|
||||
|
||||
Reflex is a very small and compact Functional Reactive Programming library
|
||||
StreamKit is a very small and compact Functional Reactive Programming library
|
||||
which is used to implement the Edge event system.
|
||||
|
||||
It is inspired by ReactiveCocoa, but the API has been greatly reduced in scope
|
||||
and simplified. The Reflex API is designed so that it can also be used as a
|
||||
simple callback system, much like Node.js Events. No functional programming
|
||||
necessary.
|
||||
and simplified. The StreamKit API is designed so that it can also be used as a
|
||||
simple callback system, much like Node.js Events. You don't need to be an FRP
|
||||
wizard to use it.
|
||||
|
|
|
@ -1,126 +0,0 @@
|
|||
//
|
||||
// Disposable.swift
|
||||
// Edge
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 5/29/16.
|
||||
//
|
||||
//
|
||||
|
||||
/// Represents something that can be “disposed,” usually associated with freeing
|
||||
/// resources or canceling work.
|
||||
public protocol Disposable: class {
|
||||
/// Whether this disposable has been disposed already.
|
||||
var disposed: Bool { get }
|
||||
|
||||
func dispose()
|
||||
}
|
||||
|
||||
/// A disposable that only flips `disposed` upon disposal, and performs no other
|
||||
/// work.
|
||||
public final class SimpleDisposable: Disposable {
|
||||
private var _disposed = false
|
||||
|
||||
public var disposed: Bool {
|
||||
return _disposed
|
||||
}
|
||||
|
||||
public init() {}
|
||||
|
||||
public func dispose() {
|
||||
_disposed = true
|
||||
}
|
||||
}
|
||||
|
||||
/// A disposable that will run an action upon disposal.
|
||||
public final class ActionDisposable: Disposable {
|
||||
var action: (() -> Void)?
|
||||
|
||||
public var disposed: Bool {
|
||||
return action == nil
|
||||
}
|
||||
|
||||
/// Initializes the disposable to run the given action upon disposal.
|
||||
public init(action: (() -> Void)?) {
|
||||
self.action = action
|
||||
}
|
||||
|
||||
public func dispose() {
|
||||
let oldAction = action
|
||||
action = nil
|
||||
oldAction?()
|
||||
}
|
||||
}
|
||||
|
||||
/// A disposable that, upon deinitialization, will automatically dispose of
|
||||
/// another disposable.
|
||||
public final class ScopedDisposable: Disposable {
|
||||
|
||||
/// The disposable which will be disposed when the ScopedDisposable
|
||||
/// deinitializes.
|
||||
public let innerDisposable: Disposable
|
||||
|
||||
public var disposed: Bool {
|
||||
return innerDisposable.disposed
|
||||
}
|
||||
|
||||
/// Initializes the receiver to dispose of the argument upon
|
||||
/// deinitialization.
|
||||
public init(_ disposable: Disposable) {
|
||||
innerDisposable = disposable
|
||||
}
|
||||
|
||||
deinit {
|
||||
dispose()
|
||||
}
|
||||
|
||||
public func dispose() {
|
||||
innerDisposable.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
/// A disposable that will optionally dispose of another disposable.
|
||||
public final class SerialDisposable: Disposable {
|
||||
|
||||
private struct State {
|
||||
var innerDisposable: Disposable? = nil
|
||||
var disposed = false
|
||||
}
|
||||
|
||||
private var state = State()
|
||||
|
||||
public var disposed: Bool {
|
||||
return state.disposed
|
||||
}
|
||||
|
||||
/// The inner disposable to dispose of.
|
||||
///
|
||||
/// Whenever this property is set (even to the same value!), the previous
|
||||
/// disposable is automatically disposed.
|
||||
public var innerDisposable: Disposable? {
|
||||
get {
|
||||
return state.innerDisposable
|
||||
}
|
||||
|
||||
set(d) {
|
||||
let oldState = state
|
||||
state.innerDisposable = d
|
||||
|
||||
oldState.innerDisposable?.dispose()
|
||||
if oldState.disposed {
|
||||
d?.dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Initializes the receiver to dispose of the argument when the
|
||||
/// SerialDisposable is disposed.
|
||||
public init(_ disposable: Disposable? = nil) {
|
||||
innerDisposable = disposable
|
||||
}
|
||||
|
||||
public func dispose() {
|
||||
let orig = state
|
||||
state = State(innerDisposable: nil, disposed: true)
|
||||
orig.innerDisposable?.dispose()
|
||||
}
|
||||
}
|
|
@ -1,72 +0,0 @@
|
|||
//
|
||||
// Observer.swift
|
||||
// Edge
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 5/29/16.
|
||||
//
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
/// An Observer is a simple wrapper around a function which can receive Events
|
||||
/// (typically from a Signal).
|
||||
public struct Observer<Value, ErrorType: Error> {
|
||||
|
||||
public typealias Action = (Event<Value, ErrorType>) -> Void
|
||||
|
||||
public let action: Action
|
||||
|
||||
public init(_ action: @escaping Action) {
|
||||
self.action = action
|
||||
}
|
||||
|
||||
/// Creates an Observer with an action which calls each of the provided
|
||||
/// callbacks
|
||||
public init(
|
||||
failed: ((ErrorType) -> Void)? = nil,
|
||||
completed: (() -> Void)? = nil,
|
||||
interrupted: (() -> Void)? = nil,
|
||||
next: ((Value) -> Void)? = nil)
|
||||
{
|
||||
self.init { event in
|
||||
switch event {
|
||||
case let .Next(value):
|
||||
next?(value)
|
||||
|
||||
case let .Failed(error):
|
||||
failed?(error)
|
||||
|
||||
case .Completed:
|
||||
completed?()
|
||||
|
||||
case .Interrupted:
|
||||
interrupted?()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public func sendEvent(_ event: Event<Value, ErrorType>) {
|
||||
action(event)
|
||||
}
|
||||
|
||||
/// Puts a `Next` event into the given observer.
|
||||
public func sendNext(_ value: Value) {
|
||||
action(.Next(value))
|
||||
}
|
||||
|
||||
/// Puts an `Failed` event into the given observer.
|
||||
public func sendFailed(_ error: ErrorType) {
|
||||
action(.Failed(error))
|
||||
}
|
||||
|
||||
/// Puts a `Completed` event into the given observer.
|
||||
public func sendCompleted() {
|
||||
action(.Completed)
|
||||
}
|
||||
|
||||
/// Puts a `Interrupted` event into the given observer.
|
||||
public func sendInterrupted() {
|
||||
action(.Interrupted)
|
||||
}
|
||||
}
|
|
@ -1,15 +1,16 @@
|
|||
//
|
||||
// Bag.swift
|
||||
// Edge
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 5/29/16.
|
||||
// ReactiveSwift
|
||||
//
|
||||
// Created by Justin Spahr-Summers on 2014-07-10.
|
||||
// Copyright (c) 2014 GitHub. All rights reserved.
|
||||
//
|
||||
/// A uniquely identifying token for removing a value that was inserted into a
|
||||
/// Bag.
|
||||
|
||||
/// A token for the identification of an item in a Bag.
|
||||
public final class RemovalToken {
|
||||
fileprivate var identifier: UInt?
|
||||
|
||||
|
||||
fileprivate init(identifier: UInt) {
|
||||
self.identifier = identifier
|
||||
}
|
||||
|
@ -19,32 +20,38 @@ public final class RemovalToken {
|
|||
public struct Bag<Element> {
|
||||
fileprivate var elements: [BagElement<Element>] = []
|
||||
private var currentIdentifier: UInt = 0
|
||||
|
||||
|
||||
public init() {
|
||||
|
||||
}
|
||||
|
||||
/// Inserts the given value in the collection, and returns a token that can
|
||||
/// later be passed to removeValueForToken().
|
||||
public mutating func insert(value: Element) -> RemovalToken {
|
||||
let (nextIdentifier, overflow) = UInt.addWithOverflow(currentIdentifier, 1)
|
||||
|
||||
/// Insert the given value into `self`, and return a token that can
|
||||
/// later be passed to `removeValueForToken()`.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - value: A value that will be inserted.
|
||||
@discardableResult
|
||||
public mutating func insert(_ value: Element) -> RemovalToken {
|
||||
let (nextIdentifier, overflow) = currentIdentifier.addingReportingOverflow(1)
|
||||
if overflow {
|
||||
reindex()
|
||||
}
|
||||
|
||||
|
||||
let token = RemovalToken(identifier: currentIdentifier)
|
||||
let element = BagElement(value: value, identifier: currentIdentifier, token: token)
|
||||
|
||||
|
||||
elements.append(element)
|
||||
currentIdentifier = nextIdentifier
|
||||
|
||||
|
||||
return token
|
||||
}
|
||||
|
||||
/// Removes a value, given the token returned from insert().
|
||||
|
||||
/// Remove a value, given the token returned from `insert()`.
|
||||
///
|
||||
/// If the value has already been removed, nothing happens.
|
||||
public mutating func removeValueForToken(token: RemovalToken) {
|
||||
/// - note: If the value has already been removed, nothing happens.
|
||||
///
|
||||
/// - parameters:
|
||||
/// - token: A token returned from a call to `insert()`.
|
||||
public mutating func remove(using token: RemovalToken) {
|
||||
if let identifier = token.identifier {
|
||||
// Removal is more likely for recent objects than old ones.
|
||||
for i in elements.indices.reversed() {
|
||||
|
@ -56,14 +63,14 @@ public struct Bag<Element> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// In the event of an identifier overflow (highly, highly unlikely), this
|
||||
/// will reset all current identifiers to reclaim a contiguous set of
|
||||
/// available identifiers for the future.
|
||||
|
||||
/// In the event of an identifier overflow (highly, highly unlikely), reset
|
||||
/// all current identifiers to reclaim a contiguous set of available
|
||||
/// identifiers for the future.
|
||||
private mutating func reindex() {
|
||||
for i in elements.indices {
|
||||
currentIdentifier = UInt(i)
|
||||
|
||||
|
||||
elements[i].identifier = currentIdentifier
|
||||
elements[i].token.identifier = currentIdentifier
|
||||
}
|
||||
|
@ -72,24 +79,19 @@ public struct Bag<Element> {
|
|||
|
||||
extension Bag: Collection {
|
||||
public typealias Index = Array<Element>.Index
|
||||
public typealias SubSequence = Array<Element>.SubSequence
|
||||
|
||||
|
||||
public var startIndex: Index {
|
||||
return elements.startIndex
|
||||
}
|
||||
|
||||
|
||||
public var endIndex: Index {
|
||||
return elements.endIndex
|
||||
}
|
||||
|
||||
public subscript(position: Index) -> Element {
|
||||
return elements[position].value
|
||||
public subscript(index: Index) -> Element {
|
||||
return elements[index].value
|
||||
}
|
||||
|
||||
public subscript(bounds: Range<Index>) -> SubSequence {
|
||||
return elements[bounds].map{ $0.value }[0..<elements.count]
|
||||
}
|
||||
|
||||
public func index(after i: Index) -> Index {
|
||||
return i + 1
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
//
|
||||
// Disposable.swift
|
||||
// Edge
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 5/29/16.
|
||||
//
|
||||
//
|
||||
|
||||
/// Represents something that can be “disposed,” usually associated with freeing
|
||||
/// resources or canceling work.
|
||||
public protocol Disposable {
|
||||
/// Whether this disposable has been disposed already.
|
||||
var disposed: Bool { get }
|
||||
|
||||
func dispose()
|
||||
}
|
||||
|
||||
/// A disposable that will run an action upon disposal.
|
||||
public final class ActionDisposable: Disposable {
|
||||
var action: (() -> Void)?
|
||||
|
||||
public var disposed: Bool {
|
||||
return action == nil
|
||||
}
|
||||
|
||||
/// Initializes the disposable to run the given action upon disposal.
|
||||
public init(action: (() -> Void)?) {
|
||||
self.action = action
|
||||
}
|
||||
|
||||
public func dispose() {
|
||||
let oldAction = action
|
||||
action = nil
|
||||
oldAction?()
|
||||
}
|
||||
}
|
||||
|
||||
/// A disposable that, upon deinitialization, will automatically dispose of
|
||||
/// another disposable.
|
||||
public final class ScopedDisposable: Disposable {
|
||||
|
||||
/// The disposable which will be disposed when the ScopedDisposable
|
||||
/// deinitializes.
|
||||
public let innerDisposable: Disposable
|
||||
|
||||
public var disposed: Bool {
|
||||
return innerDisposable.disposed
|
||||
}
|
||||
|
||||
/// Initializes the receiver to dispose of the argument upon
|
||||
/// deinitialization.
|
||||
public init(_ disposable: Disposable) {
|
||||
innerDisposable = disposable
|
||||
}
|
||||
|
||||
deinit {
|
||||
dispose()
|
||||
}
|
||||
|
||||
public func dispose() {
|
||||
innerDisposable.dispose()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
//
|
||||
// Error.swift
|
||||
// Reflex
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 11/15/16.
|
||||
//
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
public enum NoError: Swift.Error { }
|
|
@ -11,93 +11,93 @@ import Foundation
|
|||
/// Represents a signal event.
|
||||
///
|
||||
/// Signals must conform to the grammar:
|
||||
/// `Next* (Failed | Completed | Interrupted)?`
|
||||
public enum Event<Value, ErrorType: Error> {
|
||||
/// `next* (failed | completed | interrupted)?`
|
||||
public enum Event<Value> {
|
||||
|
||||
/// A value provided by the signal.
|
||||
case Next(Value)
|
||||
case next(Value)
|
||||
|
||||
/// The signal terminated because of an error. No further events will be
|
||||
/// received.
|
||||
case Failed(ErrorType)
|
||||
case failed(Error)
|
||||
|
||||
/// The signal successfully terminated. No further events will be received.
|
||||
case Completed
|
||||
case completed
|
||||
|
||||
/// Event production on the signal has been interrupted. No further events
|
||||
/// will be received.
|
||||
case Interrupted
|
||||
case interrupted
|
||||
|
||||
|
||||
/// Whether this event indicates signal termination (i.e., that no further
|
||||
/// events will be received).
|
||||
public var isTerminating: Bool {
|
||||
switch self {
|
||||
case .Next:
|
||||
case .next:
|
||||
return false
|
||||
|
||||
case .Failed, .Completed, .Interrupted:
|
||||
case .failed, .completed, .interrupted:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifts the given function over the event's value.
|
||||
public func map<U>(_ f: (Value) -> U) -> Event<U, ErrorType> {
|
||||
public func map<U>(_ f: (Value) -> U) -> Event<U> {
|
||||
switch self {
|
||||
case let .Next(value):
|
||||
return .Next(f(value))
|
||||
case let .next(value):
|
||||
return .next(f(value))
|
||||
|
||||
case let .Failed(error):
|
||||
return .Failed(error)
|
||||
case let .failed(error):
|
||||
return .failed(error)
|
||||
|
||||
case .Completed:
|
||||
return .Completed
|
||||
case .completed:
|
||||
return .completed
|
||||
|
||||
case .Interrupted:
|
||||
return .Interrupted
|
||||
case .interrupted:
|
||||
return .interrupted
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifts the given function over the event's value.
|
||||
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U, ErrorType>? {
|
||||
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U>? {
|
||||
switch self {
|
||||
case let .Next(value):
|
||||
case let .next(value):
|
||||
if let nextValue = f(value) {
|
||||
return .Next(nextValue)
|
||||
return .next(nextValue)
|
||||
}
|
||||
return nil
|
||||
|
||||
case let .Failed(error):
|
||||
return .Failed(error)
|
||||
case let .failed(error):
|
||||
return .failed(error)
|
||||
|
||||
case .Completed:
|
||||
return .Completed
|
||||
case .completed:
|
||||
return .completed
|
||||
|
||||
case .Interrupted:
|
||||
return .Interrupted
|
||||
case .interrupted:
|
||||
return .interrupted
|
||||
}
|
||||
}
|
||||
|
||||
/// Lifts the given function over the event's error.
|
||||
public func mapError<F>(_ f: (ErrorType) -> F) -> Event<Value, F> {
|
||||
public func mapError<F: Error>(_ f: (Error) -> F) -> Event<Value> {
|
||||
switch self {
|
||||
case let .Next(value):
|
||||
return .Next(value)
|
||||
case let .next(value):
|
||||
return .next(value)
|
||||
|
||||
case let .Failed(error):
|
||||
return .Failed(f(error))
|
||||
case let .failed(error):
|
||||
return .failed(f(error))
|
||||
|
||||
case .Completed:
|
||||
return .Completed
|
||||
case .completed:
|
||||
return .completed
|
||||
|
||||
case .Interrupted:
|
||||
return .Interrupted
|
||||
case .interrupted:
|
||||
return .interrupted
|
||||
}
|
||||
}
|
||||
|
||||
/// Unwraps the contained `Next` value.
|
||||
public var value: Value? {
|
||||
if case let .Next(value) = self {
|
||||
if case let .next(value) = self {
|
||||
return value
|
||||
} else {
|
||||
return nil
|
||||
|
@ -106,7 +106,7 @@ public enum Event<Value, ErrorType: Error> {
|
|||
|
||||
/// Unwraps the contained `Error` value.
|
||||
public var error: Error? {
|
||||
if case let .Failed(error) = self {
|
||||
if case let .failed(error) = self {
|
||||
return error
|
||||
} else {
|
||||
return nil
|
||||
|
@ -114,18 +114,18 @@ public enum Event<Value, ErrorType: Error> {
|
|||
}
|
||||
}
|
||||
|
||||
public func == <Value: Equatable, ErrorType: Equatable> (lhs: Event<Value, ErrorType>, rhs: Event<Value, ErrorType>) -> Bool {
|
||||
public func == <Value: Equatable> (lhs: Event<Value>, rhs: Event<Value>) -> Bool {
|
||||
switch (lhs, rhs) {
|
||||
case let (.Next(left), .Next(right)):
|
||||
case let (.next(left), .next(right)):
|
||||
return left == right
|
||||
|
||||
case let (.Failed(left), .Failed(right)):
|
||||
return left == right
|
||||
case let (.failed(left), .failed(right)):
|
||||
return left.localizedDescription == right.localizedDescription
|
||||
|
||||
case (.Completed, .Completed):
|
||||
case (.completed, .completed):
|
||||
return true
|
||||
|
||||
case (.Interrupted, .Interrupted):
|
||||
case (.interrupted, .interrupted):
|
||||
return true
|
||||
|
||||
default:
|
|
@ -0,0 +1,138 @@
|
|||
//
|
||||
// Observer.swift
|
||||
// Edge
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 5/29/16.
|
||||
//
|
||||
//
|
||||
|
||||
import Foundation
|
||||
|
||||
|
||||
/// A CiruitBreaker optionally holds a strong reference to either a
|
||||
/// `Signal` or a `Source` until a terminating event is
|
||||
/// received. At such time, it delivers the event and then
|
||||
/// removes its reference. In so doing, it "breaks the circuit"
|
||||
/// between the signal, the handler, and the input observer.
|
||||
/// This allows the downstream signal to be released.
|
||||
class CircuitBreaker<Value> {
|
||||
|
||||
private var signal: Signal<Value>? = nil
|
||||
private var source: Source<Value>? = nil
|
||||
fileprivate var action: Observer<Value>.Action! = nil
|
||||
|
||||
// This variable is used to maintain memory access exclusivity when
|
||||
// the signal or source is released causing that signal to send an
|
||||
// interrupt to its observer.
|
||||
private var hasTerminated = false
|
||||
|
||||
/// Holds a strong reference to a `Signal` until a
|
||||
/// terminating event is received.
|
||||
init(holding signal: Signal<Value>?) {
|
||||
self.signal = signal
|
||||
self.action = { [weak self] event in
|
||||
guard let weakSelf = self else { return }
|
||||
guard !weakSelf.hasTerminated else { return }
|
||||
weakSelf.signal?.observers.forEach { observer in
|
||||
observer.send(event)
|
||||
}
|
||||
|
||||
if event.isTerminating {
|
||||
// Do not have to dispose of cancel disposable
|
||||
// since it will be disposed when the signal is deallocated.
|
||||
weakSelf.hasTerminated = true
|
||||
weakSelf.signal = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Holds a strong reference to a `Source` until a
|
||||
/// terminating event is received.
|
||||
init(holding source: Source<Value>?) {
|
||||
self.source = source
|
||||
self.action = { [weak self] event in
|
||||
guard let weakSelf = self else { return }
|
||||
guard !weakSelf.hasTerminated else { return }
|
||||
self?.source?.observers.forEach { observer in
|
||||
observer.send(event)
|
||||
}
|
||||
|
||||
if event.isTerminating {
|
||||
// Do not have to dispose of cancel disposable
|
||||
// since it will be disposed when the signal is deallocated.
|
||||
weakSelf.hasTerminated = true
|
||||
self?.source = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fileprivate init(with action: @escaping (Event<Value>) -> Void) {
|
||||
self.action = action
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
public struct Observer<Value> {
|
||||
|
||||
public typealias Action = (Event<Value>) -> Void
|
||||
let breaker: CircuitBreaker<Value>
|
||||
|
||||
init(with breaker: CircuitBreaker<Value>) {
|
||||
self.breaker = breaker
|
||||
}
|
||||
|
||||
public init(_ action: @escaping Action) {
|
||||
self.breaker = CircuitBreaker(with: action)
|
||||
}
|
||||
|
||||
/// Creates an Observer with an action which calls each of the provided
|
||||
/// callbacks
|
||||
public init(
|
||||
failed: ((Error) -> Void)? = nil,
|
||||
completed: (() -> Void)? = nil,
|
||||
interrupted: (() -> Void)? = nil,
|
||||
next: ((Value) -> Void)? = nil)
|
||||
{
|
||||
self.init { event in
|
||||
switch event {
|
||||
case let .next(value):
|
||||
next?(value)
|
||||
|
||||
case let .failed(error):
|
||||
failed?(error)
|
||||
|
||||
case .completed:
|
||||
completed?()
|
||||
|
||||
case .interrupted:
|
||||
interrupted?()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Puts any `Event` into the the given observer.
|
||||
public func send(_ event: Event<Value>) {
|
||||
breaker.action(event)
|
||||
}
|
||||
|
||||
/// Puts a `Next` event into the given observer.
|
||||
public func sendNext(_ value: Value) {
|
||||
send(.next(value))
|
||||
}
|
||||
|
||||
/// Puts an `Failed` event into the given observer.
|
||||
public func sendFailed(_ error: Error) {
|
||||
send(.failed(error))
|
||||
}
|
||||
|
||||
/// Puts a `Completed` event into the given observer.
|
||||
public func sendCompleted() {
|
||||
send(.completed)
|
||||
}
|
||||
|
||||
/// Puts a `Interrupted` event into the given observer.
|
||||
public func sendInterrupted() {
|
||||
send(.interrupted)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
//
|
||||
// Promise.swift
|
||||
// StreamKit
|
||||
//
|
||||
// Created by Tyler Fleming Cloutier on 11/20/17.
|
||||
//
|
||||
|
||||
import Foundation
|
||||
import PromiseKit
|
||||
|
||||
extension Promise {
|
||||
|
||||
public func asSignal() -> Signal<T> {
|
||||
// TODO: If the promise is already resolved the value will
|
||||
// be sent to the signals before there are any observers added.
|
||||
return Signal { observer in
|
||||
self.then { value -> () in
|
||||
observer.sendNext(value)
|
||||
observer.sendCompleted()
|
||||
}.catch { error in
|
||||
observer.sendFailed(error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension SignalType {
|
||||
|
||||
public func asPromise() -> Promise<[Value]> {
|
||||
var values: [Value] = []
|
||||
return Promise { resolve, reject in
|
||||
self.onNext {
|
||||
values.append($0)
|
||||
}
|
||||
self.onCompleted {
|
||||
resolve(values)
|
||||
}
|
||||
self.onFailed {
|
||||
reject($0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -8,11 +8,15 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalType, SpecialSignalGenerator {
|
||||
public final class Signal<Value>: SignalType, InternalSignalType, SpecialSignalGenerator {
|
||||
|
||||
internal var observers = Bag<Observer<Value, ErrorType>>()
|
||||
internal var observers = Bag<Observer<Value>>()
|
||||
|
||||
public var signal: Signal<Value, ErrorType> {
|
||||
private var handlerDisposable: Disposable?
|
||||
|
||||
public var cancelDisposable: Disposable?
|
||||
|
||||
public var signal: Signal<Value> {
|
||||
return self
|
||||
}
|
||||
|
||||
|
@ -23,21 +27,21 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
|
|||
/// if a terminating event is sent to the observer. The Signal itself will
|
||||
/// remain alive until the observer is released. This is because the observer
|
||||
/// captures a self reference.
|
||||
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
|
||||
public init(_ startHandler: @escaping (Observer<Value>) -> Disposable?) {
|
||||
|
||||
let generatorDisposable = SerialDisposable()
|
||||
let observer = Observer(with: CircuitBreaker(holding: self))
|
||||
let handlerDisposable = startHandler(observer)
|
||||
|
||||
let inputObserver = Observer<Value, ErrorType> { event in
|
||||
self.observers.forEach { (observer) in
|
||||
observer.action(event)
|
||||
}
|
||||
|
||||
if event.isTerminating {
|
||||
generatorDisposable.dispose()
|
||||
}
|
||||
// The cancel disposable should send interrupted and then dispose of the
|
||||
// disposable produced by the startHandler.
|
||||
cancelDisposable = ActionDisposable {
|
||||
observer.sendInterrupted()
|
||||
handlerDisposable?.dispose()
|
||||
}
|
||||
|
||||
generatorDisposable.innerDisposable = generator(inputObserver)
|
||||
}
|
||||
|
||||
deinit {
|
||||
cancelDisposable?.dispose()
|
||||
}
|
||||
|
||||
/// Creates a Signal that will be controlled by sending events to the returned
|
||||
|
@ -45,8 +49,8 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
|
|||
///
|
||||
/// The Signal will remain alive until a terminating event is sent to the
|
||||
/// observer.
|
||||
public static func pipe() -> (Signal, Observer<Value, ErrorType>) {
|
||||
var observer: Observer<Value, ErrorType>!
|
||||
public static func pipe() -> (Signal, Observer<Value>) {
|
||||
var observer: Observer<Value>!
|
||||
let signal = self.init { innerObserver in
|
||||
observer = innerObserver
|
||||
return nil
|
||||
|
@ -68,12 +72,8 @@ public protocol SpecialSignalGenerator {
|
|||
|
||||
/// The type of values being sent on the signal.
|
||||
associatedtype Value
|
||||
|
||||
/// The type of error that can occur on the signal. If errors aren't possible
|
||||
/// then `NoError` can be used.
|
||||
associatedtype ErrorType: Error
|
||||
|
||||
init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?)
|
||||
|
||||
init(_ generator: @escaping (Observer<Value>) -> Disposable?)
|
||||
|
||||
}
|
||||
|
||||
|
@ -91,7 +91,7 @@ public extension SpecialSignalGenerator {
|
|||
|
||||
/// Creates a Signal that will immediately fail with the
|
||||
/// given error.
|
||||
public init(error: ErrorType) {
|
||||
public init(error: Error) {
|
||||
self.init { observer in
|
||||
observer.sendFailed(error)
|
||||
return nil
|
||||
|
@ -140,52 +140,43 @@ public extension SpecialSignalGenerator {
|
|||
|
||||
}
|
||||
|
||||
/// An internal protocol for adding methods that require access to the observers
|
||||
/// of the signal.
|
||||
internal protocol InternalSignalType: SignalType {
|
||||
|
||||
var observers: Bag<Observer<Value>> { get }
|
||||
|
||||
}
|
||||
|
||||
/// Note that this type is not parameterized by an Error type which is in line
|
||||
/// with the Swift error handling model.
|
||||
/// A good reference for a discussion of the pros and cons is here:
|
||||
/// https://github.com/ReactiveX/RxSwift/issues/650
|
||||
public protocol SignalType {
|
||||
|
||||
/// The type of values being sent on the signal.
|
||||
associatedtype Value
|
||||
|
||||
/// The type of error that can occur on the signal. If errors aren't possible
|
||||
/// then `NoError` can be used.
|
||||
associatedtype ErrorType: Error
|
||||
|
||||
/// The exposed raw signal that underlies the ColdSignalType
|
||||
var signal: Signal<Value, ErrorType> { get }
|
||||
/// The exposed raw signal that underlies the `SignalType`.
|
||||
var signal: Signal<Value> { get }
|
||||
|
||||
}
|
||||
var cancelDisposable: Disposable? { get }
|
||||
|
||||
/// An internal protocol for adding methods that require access to the observers
|
||||
/// of the signal.
|
||||
internal protocol InternalSignalType: SignalType {
|
||||
|
||||
var observers: Bag<Observer<Value, ErrorType>> { get }
|
||||
|
||||
}
|
||||
|
||||
internal extension InternalSignalType {
|
||||
|
||||
/// Interrupts all observers and terminates the stream.
|
||||
func interrupt() {
|
||||
for observer in self.observers {
|
||||
observer.sendInterrupted()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public extension SignalType {
|
||||
|
||||
/// Adds an observer to the Signal which observes any future events from the Signal.
|
||||
/// If the Signal has already terminated, the observer will immediately receive an
|
||||
/// Adds an observer to the `Signal` which observes any future events from the `Signal`.
|
||||
/// If the `Signal` has already terminated, the observer will immediately receive an
|
||||
/// `Interrupted` event.
|
||||
///
|
||||
/// Returns a Disposable which can be used to disconnect the observer. Disposing
|
||||
/// of the Disposable will have no effect on the Signal itself.
|
||||
/// of the Disposable will have no effect on the `Signal` itself.
|
||||
@discardableResult
|
||||
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
|
||||
let token = signal.observers.insert(value: observer)
|
||||
public func add(observer: Observer<Value>) -> Disposable? {
|
||||
let token = signal.observers.insert(observer)
|
||||
return ActionDisposable {
|
||||
self.signal.observers.removeValueForToken(token: token)
|
||||
self.signal.observers.remove(using: token)
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -193,7 +184,7 @@ public extension SignalType {
|
|||
/// Convenience override for add(observer:) to allow trailing-closure style
|
||||
/// invocations.
|
||||
@discardableResult
|
||||
public func on(action: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
|
||||
public func on(action: @escaping Observer<Value>.Action) -> Disposable? {
|
||||
return self.add(observer: Observer(action))
|
||||
}
|
||||
|
||||
|
@ -226,7 +217,7 @@ public extension SignalType {
|
|||
/// callback. Disposing of the Disposable will have no effect on the Signal
|
||||
/// itself.
|
||||
@discardableResult
|
||||
public func onFailed(error: @escaping (ErrorType) -> Void) -> Disposable? {
|
||||
public func onFailed(error: @escaping (Error) -> Void) -> Disposable? {
|
||||
return self.add(observer: Observer(failed: error))
|
||||
}
|
||||
|
||||
|
@ -246,34 +237,34 @@ public extension SignalType {
|
|||
|
||||
public extension SignalType {
|
||||
|
||||
public var identity: Signal<Value, ErrorType> {
|
||||
public var identity: Signal<Value> {
|
||||
return self.map { $0 }
|
||||
}
|
||||
|
||||
/// Maps each value in the signal to a new value.
|
||||
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U, ErrorType> {
|
||||
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U> {
|
||||
return Signal { observer in
|
||||
return self.on { event -> Void in
|
||||
observer.sendEvent(event.map(transform))
|
||||
observer.send(event.map(transform))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Maps errors in the signal to a new error.
|
||||
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> Signal<Value, F> {
|
||||
public func mapError<F: Error>(_ transform: @escaping (Error) -> F) -> Signal<Value> {
|
||||
return Signal { observer in
|
||||
return self.on { event -> Void in
|
||||
observer.sendEvent(event.mapError(transform))
|
||||
observer.send(event.mapError(transform))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Preserves only the values of the signal that pass the given predicate.
|
||||
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value, ErrorType> {
|
||||
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value> {
|
||||
return Signal { observer in
|
||||
return self.on { (event: Event<Value, ErrorType>) -> Void in
|
||||
return self.on { (event: Event<Value>) -> Void in
|
||||
guard let value = event.value else {
|
||||
observer.sendEvent(event)
|
||||
observer.send(event)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -286,40 +277,31 @@ public extension SignalType {
|
|||
|
||||
/// Splits the signal into two signals. The first signal in the tuple matches the
|
||||
/// predicate, the second signal does not match the predicate
|
||||
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value, ErrorType>, Signal<Value, ErrorType>) {
|
||||
let left = Signal<Value, ErrorType> { observer in
|
||||
return self.on { (event: Event<Value, ErrorType>) -> Void in
|
||||
guard let value = event.value else {
|
||||
observer.sendEvent(event)
|
||||
return
|
||||
}
|
||||
|
||||
if predicate(value) {
|
||||
observer.sendNext(value)
|
||||
}
|
||||
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value>, Signal<Value>) {
|
||||
let (left, leftObserver) = Signal<Value>.pipe()
|
||||
let (right, rightObserver) = Signal<Value>.pipe()
|
||||
self.on { (event: Event<Value>) -> Void in
|
||||
guard let value = event.value else {
|
||||
leftObserver.send(event)
|
||||
rightObserver.send(event)
|
||||
return
|
||||
}
|
||||
}
|
||||
let right = Signal<Value, ErrorType> { observer in
|
||||
return self.on { (event: Event<Value, ErrorType>) -> Void in
|
||||
guard let value = event.value else {
|
||||
observer.sendEvent(event)
|
||||
return
|
||||
}
|
||||
|
||||
if !predicate(value) {
|
||||
observer.sendNext(value)
|
||||
}
|
||||
|
||||
if predicate(value) {
|
||||
leftObserver.sendNext(value)
|
||||
} else {
|
||||
rightObserver.sendNext(value)
|
||||
}
|
||||
}
|
||||
return (left, right)
|
||||
}
|
||||
|
||||
/// Aggregate values into a single combined value. Mirrors the Swift Collection
|
||||
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T, ErrorType> {
|
||||
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T> {
|
||||
return Signal { observer in
|
||||
var accumulator = initial
|
||||
return self.on { event in
|
||||
observer.action(event.map { value in
|
||||
observer.send(event.map { value in
|
||||
accumulator = combine(accumulator, value)
|
||||
return accumulator
|
||||
})
|
||||
|
@ -327,14 +309,87 @@ public extension SignalType {
|
|||
}
|
||||
}
|
||||
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U, ErrorType> {
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U> {
|
||||
return Signal { observer in
|
||||
return self.on { event -> Void in
|
||||
if let e = event.flatMap(transform) {
|
||||
observer.sendEvent(e)
|
||||
observer.send(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> Signal<U>) -> Signal<U> {
|
||||
return map(transform).joined()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension SignalType where Value: SignalType {
|
||||
|
||||
/// Listens to every `Source` produced from the current `Signal`
|
||||
/// Starts each `Source` and forwards on all values and errors onto
|
||||
/// the `Signal` which is returned. In this way it joins each of the
|
||||
/// `Source`s into a single `Signal`.
|
||||
///
|
||||
/// The joined `Signal` completes when the current `Signal` and all of
|
||||
/// its produced `Source`s complete.
|
||||
///
|
||||
/// Note: This means that each `Source` will be started as it is received.
|
||||
public func joined() -> Signal<Value.Value> {
|
||||
// Start the number in flight at 1 for `self`
|
||||
|
||||
return Signal { observer in
|
||||
|
||||
var numberInFlight = 1
|
||||
var disposables = [Disposable]()
|
||||
func decrementInFlight() {
|
||||
numberInFlight -= 1
|
||||
if numberInFlight == 0 {
|
||||
observer.sendCompleted()
|
||||
}
|
||||
}
|
||||
|
||||
func incrementInFlight() {
|
||||
numberInFlight += 1
|
||||
}
|
||||
|
||||
self.on { event in
|
||||
|
||||
switch event {
|
||||
case .next(let source):
|
||||
incrementInFlight()
|
||||
source.on { event in
|
||||
switch event {
|
||||
case .completed, .interrupted:
|
||||
decrementInFlight()
|
||||
|
||||
case .next, .failed:
|
||||
observer.send(event)
|
||||
}
|
||||
}
|
||||
source.cancelDisposable.map { disposables.append($0) }
|
||||
(source as? Source<Value.Value>)?.start()
|
||||
|
||||
case .failed(let error):
|
||||
observer.sendFailed(error)
|
||||
|
||||
case .completed:
|
||||
decrementInFlight()
|
||||
|
||||
case .interrupted:
|
||||
observer.sendInterrupted()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return ActionDisposable {
|
||||
for disposable in disposables {
|
||||
disposable.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -8,38 +8,40 @@
|
|||
|
||||
import Foundation
|
||||
|
||||
public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType, SpecialSignalGenerator {
|
||||
public final class Source<V>: SourceType, InternalSignalType, SpecialSignalGenerator {
|
||||
|
||||
public typealias Value = T
|
||||
public typealias ErrorType = E
|
||||
|
||||
internal var observers = Bag<Observer<Value, ErrorType>>()
|
||||
|
||||
public var coldSignal: ColdSignal {
|
||||
public typealias Value = V
|
||||
|
||||
internal var observers = Bag<Observer<Value>>()
|
||||
|
||||
public var source: Source {
|
||||
return self
|
||||
}
|
||||
|
||||
internal let startHandler: (Observer<Value, ErrorType>) -> Disposable?
|
||||
internal let startHandler: (Observer<Value>) -> Disposable?
|
||||
|
||||
private var cancelDisposable: Disposable?
|
||||
public var cancelDisposable: Disposable?
|
||||
|
||||
private var handlerDisposable: Disposable?
|
||||
private var started: Bool {
|
||||
if let disposable = cancelDisposable {
|
||||
return !disposable.disposed
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
private var started = false
|
||||
|
||||
/// Initializes a ColdSignal that will invoke the given closure at the
|
||||
/// Initializes a Source that will invoke the given closure at the
|
||||
/// invocation of `start()`.
|
||||
///
|
||||
/// The events that the closure puts into the given observer will become
|
||||
/// the events sent to this ColdSignal.
|
||||
/// the events sent to this `Source`.
|
||||
///
|
||||
/// In order to stop or dispose of the signal, invoke `stop()`. Calling this method
|
||||
/// will dispose of the disposable returned by the given closure.
|
||||
///
|
||||
/// Invoking `start()` will have no effect until the signal is stopped. After
|
||||
/// `stop()` is called this process may be repeated.
|
||||
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
|
||||
self.startHandler = generator
|
||||
public init(_ startHandler: @escaping (Observer<Value>) -> Disposable?) {
|
||||
self.startHandler = startHandler
|
||||
}
|
||||
|
||||
/// Creates a Signal from the producer, then attaches the given observer to
|
||||
|
@ -47,128 +49,116 @@ public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType,
|
|||
///
|
||||
/// Returns a Disposable which can be used to interrupt the work associated
|
||||
/// with the signal and immediately send an `Interrupted` event.
|
||||
|
||||
@discardableResult
|
||||
public func start() {
|
||||
if !started {
|
||||
started = true
|
||||
|
||||
let observer = Observer<Value, ErrorType> { event in
|
||||
// Pass event downstream
|
||||
self.observers.forEach { (observer) in
|
||||
observer.action(event)
|
||||
}
|
||||
|
||||
// If event is terminating dispose of the handlerDisposable.
|
||||
if event.isTerminating {
|
||||
self.handlerDisposable?.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
handlerDisposable = startHandler(observer)
|
||||
let observer = Observer(with: CircuitBreaker(holding: self))
|
||||
let handlerDisposable = startHandler(observer)
|
||||
|
||||
// The cancel disposable should send interrupted and then dispose of the
|
||||
// disposable produced by the startHandler.
|
||||
cancelDisposable = ActionDisposable { [weak self] in
|
||||
cancelDisposable = ActionDisposable {
|
||||
observer.sendInterrupted()
|
||||
self?.handlerDisposable?.dispose()
|
||||
handlerDisposable?.dispose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public func stop() {
|
||||
cancelDisposable?.dispose()
|
||||
started = false
|
||||
}
|
||||
|
||||
|
||||
deinit {
|
||||
self.stop()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
extension ColdSignal: CustomDebugStringConvertible {
|
||||
extension Source: CustomDebugStringConvertible {
|
||||
|
||||
public var debugDescription: String {
|
||||
let obs = Array(self.observers.map { String(describing: $0) })
|
||||
return "ColdSignal[\(obs.joined(separator: ", "))]"
|
||||
return "Source[\(obs.joined(separator: ", "))]"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public protocol ColdSignalType: SignalType {
|
||||
public protocol SourceType: SignalType {
|
||||
|
||||
/// The exposed raw signal that underlies the ColdSignalType
|
||||
var coldSignal: ColdSignal<Value, ErrorType> { get }
|
||||
/// The exposed raw signal that underlies the SourceType
|
||||
var source: Source<Value> { get }
|
||||
|
||||
/// Invokes the closure provided upon initialization, and passes in a newly
|
||||
/// created observer to which events can be sent.
|
||||
func start()
|
||||
|
||||
/// Stops the ColdSignal by sending an interrupt to all of it's
|
||||
/// Stops the `Source` by sending an interrupt to all of it's
|
||||
/// observers and then invoking the disposable returned by the closure
|
||||
/// that was provided upon initialization.
|
||||
func stop()
|
||||
|
||||
}
|
||||
|
||||
public extension ColdSignalType {
|
||||
public extension SourceType {
|
||||
|
||||
public var signal: Signal<Value, ErrorType> {
|
||||
public var signal: Signal<Value> {
|
||||
return Signal { observer in
|
||||
self.coldSignal.add(observer: observer)
|
||||
self.source.add(observer: observer)
|
||||
}
|
||||
}
|
||||
|
||||
/// Invokes the closure provided upon initialization, and passes in a newly
|
||||
/// created observer to which events can be sent.
|
||||
func start() {
|
||||
coldSignal.start()
|
||||
source.start()
|
||||
}
|
||||
|
||||
/// Stops the ColdSignal by sending an interrupt to all of it's
|
||||
/// Stops the `Source` by sending an interrupt to all of it's
|
||||
/// observers and then invoking the disposable returned by the closure
|
||||
/// that was provided upon initialization.
|
||||
func stop() {
|
||||
coldSignal.stop()
|
||||
source.stop()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public extension ColdSignalType {
|
||||
public extension SourceType {
|
||||
|
||||
/// Adds an observer to the ColdSignal which observes any future events from the
|
||||
/// ColdSignal. If the Signal has already terminated, the observer will immediately
|
||||
/// Adds an observer to the `Source` which observes any future events from the
|
||||
/// `Source`. If the `Signal` has already terminated, the observer will immediately
|
||||
/// receive an `Interrupted` event.
|
||||
///
|
||||
/// Returns a Disposable which can be used to disconnect the observer. Disposing
|
||||
/// of the Disposable will have no effect on the Signal itself.
|
||||
@discardableResult
|
||||
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
|
||||
let token = coldSignal.observers.insert(value: observer)
|
||||
return ActionDisposable {
|
||||
self.coldSignal.observers.removeValueForToken(token: token)
|
||||
public func add(observer: Observer<Value>) -> Disposable? {
|
||||
let token = source.observers.insert(observer)
|
||||
return ActionDisposable { [weak source = source] in
|
||||
source?.observers.remove(using: token)
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer, and then immediately
|
||||
/// invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer, and then immediately
|
||||
/// invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
public func start(with observer: Observer<Value, ErrorType>) -> Disposable? {
|
||||
let disposable = coldSignal.add(observer: observer)
|
||||
self.coldSignal.start()
|
||||
public func start(with observer: Observer<Value>) -> Disposable? {
|
||||
let disposable = source.add(observer: observer)
|
||||
source.start()
|
||||
return disposable
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer, and then immediately
|
||||
/// invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer, and then immediately
|
||||
/// invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
public func start(_ observerAction: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
|
||||
public func start(_ observerAction: @escaping Observer<Value>.Action) -> Disposable? {
|
||||
return start(with: Observer(observerAction))
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer for next, and then immediately
|
||||
/// invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer for next, and then immediately
|
||||
/// invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
|
@ -176,8 +166,8 @@ public extension ColdSignalType {
|
|||
return start(with: Observer(next: next))
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer for completed events, and then
|
||||
/// immediately invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer for completed events, and then
|
||||
/// immediately invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
|
@ -185,17 +175,17 @@ public extension ColdSignalType {
|
|||
return start(with: Observer(completed: completed))
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer for errors, and then
|
||||
/// immediately invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer for errors, and then
|
||||
/// immediately invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
public func startWithFailed(failed: @escaping (ErrorType) -> Void) -> Disposable? {
|
||||
public func startWithFailed(failed: @escaping (Error) -> Void) -> Disposable? {
|
||||
return start(with: Observer(failed: failed))
|
||||
}
|
||||
|
||||
/// Creates a ColdSignal, adds exactly one observer for interrupts, and then
|
||||
/// immediately invokes start on the ColdSignal.
|
||||
/// Creates a `Source`, adds exactly one observer for interrupts, and then
|
||||
/// immediately invokes start on the `Source`.
|
||||
///
|
||||
/// Returns a Disposable which can be used to dispose of the added observer.
|
||||
@discardableResult
|
||||
|
@ -205,67 +195,76 @@ public extension ColdSignalType {
|
|||
|
||||
}
|
||||
|
||||
public extension ColdSignalType {
|
||||
public extension SourceType {
|
||||
|
||||
/// Creates a new `ColdSignal` which will apply a unary operator directly to events
|
||||
/// Creates a new `Source` which will apply a unary operator directly to events
|
||||
/// produced by the `startHandler`.
|
||||
///
|
||||
/// The new `ColdSignal` is in no way related to the source `ColdSignal` except
|
||||
/// The new `Source` is in no way related to the source `Source` except
|
||||
/// that they share a reference to the same `startHandler`.
|
||||
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> Signal<U, F>) -> ColdSignal<U, F> {
|
||||
return ColdSignal { observer in
|
||||
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
|
||||
public func lift<U>(_ transform: @escaping (Signal<Value>) -> Signal<U>) -> Source<U> {
|
||||
return Source { observer in
|
||||
let (pipeSignal, pipeObserver) = Signal<Value>.pipe()
|
||||
transform(pipeSignal).add(observer: observer)
|
||||
return self.coldSignal.startHandler(pipeObserver)
|
||||
return self.source.startHandler(pipeObserver)
|
||||
}
|
||||
}
|
||||
|
||||
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> (Signal<U, F>, Signal<U, F>))
|
||||
-> (ColdSignal<U, F>, ColdSignal<U, F>)
|
||||
public func lift<U>(_ transform: @escaping (Signal<Value>) -> (Signal<U>, Signal<U>))
|
||||
-> (Source<U>, Source<U>)
|
||||
{
|
||||
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
|
||||
let (pipeSignal, pipeObserver) = Signal<Value>.pipe()
|
||||
let (left, right) = transform(pipeSignal)
|
||||
let coldLeft = ColdSignal<U, F> { observer in
|
||||
let sourceLeft = Source<U> { observer in
|
||||
left.add(observer: observer)
|
||||
return self.coldSignal.startHandler(pipeObserver)
|
||||
return self.source.startHandler(pipeObserver)
|
||||
}
|
||||
let coldRight = ColdSignal<U, F> { observer in
|
||||
let sourceRight = Source<U> { observer in
|
||||
right.add(observer: observer)
|
||||
return self.coldSignal.startHandler(pipeObserver)
|
||||
return self.source.startHandler(pipeObserver)
|
||||
}
|
||||
return (coldLeft, coldRight)
|
||||
return (sourceLeft, sourceRight)
|
||||
}
|
||||
|
||||
public var identity: Source<Value> {
|
||||
return lift { $0.identity }
|
||||
}
|
||||
|
||||
/// Maps each value in the signal to a new value.
|
||||
public func map<U>(_ transform: @escaping (Value) -> U) -> ColdSignal<U, ErrorType> {
|
||||
public func map<U>(_ transform: @escaping (Value) -> U) -> Source<U> {
|
||||
return lift { $0.map(transform) }
|
||||
}
|
||||
|
||||
/// Maps errors in the signal to a new error.
|
||||
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> ColdSignal<Value, F> {
|
||||
public func mapError<F: Error>(_ transform: @escaping (Error) -> F) -> Source<Value> {
|
||||
return lift { $0.mapError(transform) }
|
||||
}
|
||||
|
||||
/// Preserves only the values of the signal that pass the given predicate.
|
||||
public func filter(_ predicate: @escaping (Value) -> Bool) -> ColdSignal<Value, ErrorType> {
|
||||
public func filter(_ predicate: @escaping (Value) -> Bool) -> Source<Value> {
|
||||
return lift { $0.filter(predicate) }
|
||||
}
|
||||
|
||||
/// Splits the signal into two signals. The first signal in the tuple matches the
|
||||
/// predicate, the second signal does not match the predicate
|
||||
public func partition(_ predicate: @escaping (Value) -> Bool)
|
||||
-> (ColdSignal<Value, ErrorType>, ColdSignal<Value, ErrorType>) {
|
||||
-> (Source<Value>, Source<Value>) {
|
||||
return lift { $0.partition(predicate) }
|
||||
}
|
||||
|
||||
/// Aggregate values into a single combined value. Mirrors the Swift Collection
|
||||
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> ColdSignal<T, ErrorType> {
|
||||
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Source<T> {
|
||||
return lift { $0.reduce(initial: initial, combine) }
|
||||
}
|
||||
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> ColdSignal<U, ErrorType> {
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Source<U> {
|
||||
return lift { $0.flatMap(transform) }
|
||||
}
|
||||
|
||||
|
||||
public func flatMap<U>(_ transform: @escaping (Value) -> Source<U>) -> Source<U> {
|
||||
return lift { $0.map(transform).joined() }
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
@testable import Reflex
|
||||
@testable import StreamKit
|
||||
|
||||
import XCTest
|
||||
|
||||
|
@ -6,7 +6,7 @@ class TestBasic: XCTestCase {
|
|||
|
||||
func testSignalPipe() {
|
||||
|
||||
let (signal, observer) = Signal<Int, NSError>.pipe()
|
||||
let (signal, observer) = Signal<Int>.pipe()
|
||||
var nextIndex = 0
|
||||
let nextVals = [0, 3, 5, 2, -3]
|
||||
var didComplete = false
|
||||
|
@ -21,6 +21,10 @@ class TestBasic: XCTestCase {
|
|||
didComplete = true
|
||||
}
|
||||
|
||||
signal.onFailed { error in
|
||||
XCTFail(error.localizedDescription)
|
||||
}
|
||||
|
||||
for val in nextVals {
|
||||
observer.sendNext(val)
|
||||
}
|
Loading…
Reference in New Issue