Compare commits

...

32 Commits

Author SHA1 Message Date
Tyler Cloutier 821074970d
Update README.md 2017-11-26 15:30:39 -08:00
Tyler Cloutier 43238cc231 Fixed cancelDisposable bug. 2017-11-20 22:27:59 -08:00
Tyler Cloutier 760076df26 Fixed an issue with flatMapping of Signals 2017-11-20 17:35:48 -08:00
Tyler Cloutier f6b76d1559 Made the Promise conversions public. 2017-11-20 17:22:56 -08:00
Tyler Cloutier 2b701eb423 Added Stream/Promise bridging. 2017-11-20 16:53:24 -08:00
Tyler Cloutier 8112c77a33 Added PromiseKit and removed typed error to be more inline with Swift error handling philosphies. Also see: https://github.com/ReactiveX/RxSwift/issues/650 2017-11-20 16:42:21 -08:00
Tyler Cloutier 555837e694 Fixed test target. 2017-11-19 15:57:54 -08:00
Tyler Cloutier 4a7ea1e82e Added xcodeproj to gitignore. 2017-11-19 15:55:27 -08:00
Tyler Cloutier cddbc315c0 Renamed to StreamKit and fixed Swift 4 build issues. 2017-11-19 15:50:28 -08:00
Tyler Cloutier 44f955a8e5 Removed podspec and xcodeproj. 2017-11-19 15:39:05 -08:00
Tyler Cloutier 40feea4ab1 Merge branch 'master' of https://github.com/SwiftOnEdge/Reflex into HEAD 2017-10-07 21:50:51 -07:00
Tyler Cloutier 61d3f12206 Fixed problems with memory exclusivity in Swift 4. 2017-10-07 21:50:19 -07:00
Tyler Cloutier d64fcd469a Merge pull request #2 from hernangonzalez/feature/cocoapods_support
Add Cocoapods & Travis support
2016-12-28 13:43:46 -05:00
Hernan G. Gonzalez 7f5db3c422 no message 2016-12-26 14:45:28 -03:00
Hernan G. Gonzalez f8183bde2b no message 2016-12-26 14:37:37 -03:00
Hernan G. Gonzalez 8ddfcd1f62 use iphone 6 2016-12-26 14:26:43 -03:00
Hernan G. Gonzalez 323b235770 target iphone simulator 2016-12-26 14:20:22 -03:00
Hernan G. Gonzalez 2d13b1e90d code sign = no 2016-12-26 14:15:49 -03:00
Hernan G. Gonzalez 57d43919e3 just build 2016-12-26 14:14:34 -03:00
Hernan G. Gonzalez d1478078e9 update travis 2016-12-26 13:34:55 -03:00
Hernan G. Gonzalez c647d5f0ae Disable code signing on test build (travis) 2016-12-26 11:47:24 -03:00
Hernan G. Gonzalez a41bdafa8d Add Cocoapods & Travis support 2016-12-22 13:20:43 -03:00
Tyler Cloutier fb1eb87d14 Updated bundle identifier 2016-11-24 21:03:07 -08:00
Tyler Cloutier 5196d4b55f Updated the target to use automatic code signing 2016-11-24 20:56:13 -08:00
Tyler Cloutier 1cc5552d91 Added flatMap for Source too. 2016-11-21 01:30:43 -08:00
Tyler Cloutier 1866fcbf7d Changed APIs. Added canceling to regular Signals. Added flatMapping and joining of Sources. 2016-11-21 01:25:28 -08:00
Tyler Cloutier cb9f2411c5 Renamed ColdSignal to Source. Remains to be seen if this is a good idea. 2016-11-20 22:52:38 -08:00
Tyler Cloutier 09e270d688 Added the NoError type 2016-11-15 00:34:50 -08:00
Tyler Cloutier 5ef886ed86 Added back plists 2016-11-15 00:20:38 -08:00
Tyler Cloutier c70aabc491 Adding xcodeproj for carthage support 2016-11-14 23:43:48 -08:00
Tyler Cloutier ac1a13e978 Pretty significant refactor and improved memory/resource management. 2016-11-13 18:22:37 -08:00
Tyler Cloutier 46a7262ee0 Improved partition to all execute the predicate a single time. 2016-11-12 01:11:38 -08:00
16 changed files with 641 additions and 469 deletions

9
.gitignore vendored
View File

@ -13,6 +13,11 @@
# Linux trash folder which might appear on any partition or disk
.Trash-*
## Other
*.xcodeproj
#*.xcodeproj/*.plist
*.moved-aside
*.xcuserstate
### OSX ###
.DS_Store
@ -66,8 +71,8 @@ xcuserdata/
*.moved-aside
*.xccheckout
*.xcscmblueprint
*.xcodeproj/*.plist
*.xcodeproj
#*.xcodeproj/*.plist
#*.xcodeproj
### Swift ###

11
.travis.yml Normal file
View File

@ -0,0 +1,11 @@
language: objective-c
osx_image: xcode8
xcode_project: Reflex.xcodeproj
xcode_scheme: Reflex
xcode_sdk: iphonesimulator10.0
script:
- set -o pipefail && xcodebuild -project Reflex.xcodeproj -scheme Reflex -sdk iphonesimulator ONLY_ACTIVE_ARCH=NO CODE_SIGNING_REQUIRED=NO -destination "platform=iOS Simulator,name=iPhone 6s" build | xcpretty
- pod lib lint

16
Package.resolved Normal file
View File

@ -0,0 +1,16 @@
{
"object": {
"pins": [
{
"package": "PromiseKit",
"repositoryURL": "https://github.com/mxcl/PromiseKit.git",
"state": {
"branch": null,
"revision": "6bab5e0c7f93947d9c0a7df0937add7454657f2c",
"version": "4.5.0"
}
}
]
},
"version": 1
}

View File

@ -1,7 +1,27 @@
// swift-tools-version:4.0
// The swift-tools-version declares the minimum version of Swift required to build this package.
import PackageDescription
let package = Package(
name: "Reflex",
name: "StreamKit",
products: [
.library(
name: "StreamKit",
targets: ["StreamKit"]
),
],
dependencies: [
.package(url: "https://github.com/mxcl/PromiseKit.git", from: "4.5.0"),
],
targets: [
.target(
name: "StreamKit",
dependencies: ["PromiseKit"]
),
.testTarget(
name: "StreamKitTests",
dependencies: ["StreamKit"]
),
]
)

View File

@ -1,9 +1,9 @@
# Reflex
# StreamKit
Reflex is a very small and compact Functional Reactive Programming library
StreamKit is a very small and compact Functional Reactive Programming library
which is used to implement the Edge event system.
It is inspired by ReactiveCocoa, but the API has been greatly reduced in scope
and simplified. The Reflex API is designed so that it can also be used as a
simple callback system, much like Node.js Events. No functional programming
necessary.
and simplified. The StreamKit API is designed so that it can also be used as a
simple callback system, much like Node.js Events. You don't need to be an FRP
wizard to use it.

View File

@ -1,126 +0,0 @@
//
// Disposable.swift
// Edge
//
// Created by Tyler Fleming Cloutier on 5/29/16.
//
//
/// Represents something that can be disposed, usually associated with freeing
/// resources or canceling work.
public protocol Disposable: class {
/// Whether this disposable has been disposed already.
var disposed: Bool { get }
func dispose()
}
/// A disposable that only flips `disposed` upon disposal, and performs no other
/// work.
public final class SimpleDisposable: Disposable {
private var _disposed = false
public var disposed: Bool {
return _disposed
}
public init() {}
public func dispose() {
_disposed = true
}
}
/// A disposable that will run an action upon disposal.
public final class ActionDisposable: Disposable {
var action: (() -> Void)?
public var disposed: Bool {
return action == nil
}
/// Initializes the disposable to run the given action upon disposal.
public init(action: (() -> Void)?) {
self.action = action
}
public func dispose() {
let oldAction = action
action = nil
oldAction?()
}
}
/// A disposable that, upon deinitialization, will automatically dispose of
/// another disposable.
public final class ScopedDisposable: Disposable {
/// The disposable which will be disposed when the ScopedDisposable
/// deinitializes.
public let innerDisposable: Disposable
public var disposed: Bool {
return innerDisposable.disposed
}
/// Initializes the receiver to dispose of the argument upon
/// deinitialization.
public init(_ disposable: Disposable) {
innerDisposable = disposable
}
deinit {
dispose()
}
public func dispose() {
innerDisposable.dispose()
}
}
/// A disposable that will optionally dispose of another disposable.
public final class SerialDisposable: Disposable {
private struct State {
var innerDisposable: Disposable? = nil
var disposed = false
}
private var state = State()
public var disposed: Bool {
return state.disposed
}
/// The inner disposable to dispose of.
///
/// Whenever this property is set (even to the same value!), the previous
/// disposable is automatically disposed.
public var innerDisposable: Disposable? {
get {
return state.innerDisposable
}
set(d) {
let oldState = state
state.innerDisposable = d
oldState.innerDisposable?.dispose()
if oldState.disposed {
d?.dispose()
}
}
}
/// Initializes the receiver to dispose of the argument when the
/// SerialDisposable is disposed.
public init(_ disposable: Disposable? = nil) {
innerDisposable = disposable
}
public func dispose() {
let orig = state
state = State(innerDisposable: nil, disposed: true)
orig.innerDisposable?.dispose()
}
}

View File

@ -1,72 +0,0 @@
//
// Observer.swift
// Edge
//
// Created by Tyler Fleming Cloutier on 5/29/16.
//
//
import Foundation
/// An Observer is a simple wrapper around a function which can receive Events
/// (typically from a Signal).
public struct Observer<Value, ErrorType: Error> {
public typealias Action = (Event<Value, ErrorType>) -> Void
public let action: Action
public init(_ action: @escaping Action) {
self.action = action
}
/// Creates an Observer with an action which calls each of the provided
/// callbacks
public init(
failed: ((ErrorType) -> Void)? = nil,
completed: (() -> Void)? = nil,
interrupted: (() -> Void)? = nil,
next: ((Value) -> Void)? = nil)
{
self.init { event in
switch event {
case let .Next(value):
next?(value)
case let .Failed(error):
failed?(error)
case .Completed:
completed?()
case .Interrupted:
interrupted?()
}
}
}
public func sendEvent(_ event: Event<Value, ErrorType>) {
action(event)
}
/// Puts a `Next` event into the given observer.
public func sendNext(_ value: Value) {
action(.Next(value))
}
/// Puts an `Failed` event into the given observer.
public func sendFailed(_ error: ErrorType) {
action(.Failed(error))
}
/// Puts a `Completed` event into the given observer.
public func sendCompleted() {
action(.Completed)
}
/// Puts a `Interrupted` event into the given observer.
public func sendInterrupted() {
action(.Interrupted)
}
}

View File

@ -1,15 +1,16 @@
//
// Bag.swift
// Edge
//
// Created by Tyler Fleming Cloutier on 5/29/16.
// ReactiveSwift
//
// Created by Justin Spahr-Summers on 2014-07-10.
// Copyright (c) 2014 GitHub. All rights reserved.
//
/// A uniquely identifying token for removing a value that was inserted into a
/// Bag.
/// A token for the identification of an item in a Bag.
public final class RemovalToken {
fileprivate var identifier: UInt?
fileprivate init(identifier: UInt) {
self.identifier = identifier
}
@ -19,32 +20,38 @@ public final class RemovalToken {
public struct Bag<Element> {
fileprivate var elements: [BagElement<Element>] = []
private var currentIdentifier: UInt = 0
public init() {
}
/// Inserts the given value in the collection, and returns a token that can
/// later be passed to removeValueForToken().
public mutating func insert(value: Element) -> RemovalToken {
let (nextIdentifier, overflow) = UInt.addWithOverflow(currentIdentifier, 1)
/// Insert the given value into `self`, and return a token that can
/// later be passed to `removeValueForToken()`.
///
/// - parameters:
/// - value: A value that will be inserted.
@discardableResult
public mutating func insert(_ value: Element) -> RemovalToken {
let (nextIdentifier, overflow) = currentIdentifier.addingReportingOverflow(1)
if overflow {
reindex()
}
let token = RemovalToken(identifier: currentIdentifier)
let element = BagElement(value: value, identifier: currentIdentifier, token: token)
elements.append(element)
currentIdentifier = nextIdentifier
return token
}
/// Removes a value, given the token returned from insert().
/// Remove a value, given the token returned from `insert()`.
///
/// If the value has already been removed, nothing happens.
public mutating func removeValueForToken(token: RemovalToken) {
/// - note: If the value has already been removed, nothing happens.
///
/// - parameters:
/// - token: A token returned from a call to `insert()`.
public mutating func remove(using token: RemovalToken) {
if let identifier = token.identifier {
// Removal is more likely for recent objects than old ones.
for i in elements.indices.reversed() {
@ -56,14 +63,14 @@ public struct Bag<Element> {
}
}
}
/// In the event of an identifier overflow (highly, highly unlikely), this
/// will reset all current identifiers to reclaim a contiguous set of
/// available identifiers for the future.
/// In the event of an identifier overflow (highly, highly unlikely), reset
/// all current identifiers to reclaim a contiguous set of available
/// identifiers for the future.
private mutating func reindex() {
for i in elements.indices {
currentIdentifier = UInt(i)
elements[i].identifier = currentIdentifier
elements[i].token.identifier = currentIdentifier
}
@ -72,24 +79,19 @@ public struct Bag<Element> {
extension Bag: Collection {
public typealias Index = Array<Element>.Index
public typealias SubSequence = Array<Element>.SubSequence
public var startIndex: Index {
return elements.startIndex
}
public var endIndex: Index {
return elements.endIndex
}
public subscript(position: Index) -> Element {
return elements[position].value
public subscript(index: Index) -> Element {
return elements[index].value
}
public subscript(bounds: Range<Index>) -> SubSequence {
return elements[bounds].map{ $0.value }[0..<elements.count]
}
public func index(after i: Index) -> Index {
return i + 1
}

View File

@ -0,0 +1,63 @@
//
// Disposable.swift
// Edge
//
// Created by Tyler Fleming Cloutier on 5/29/16.
//
//
/// Represents something that can be disposed, usually associated with freeing
/// resources or canceling work.
public protocol Disposable {
/// Whether this disposable has been disposed already.
var disposed: Bool { get }
func dispose()
}
/// A disposable that will run an action upon disposal.
public final class ActionDisposable: Disposable {
var action: (() -> Void)?
public var disposed: Bool {
return action == nil
}
/// Initializes the disposable to run the given action upon disposal.
public init(action: (() -> Void)?) {
self.action = action
}
public func dispose() {
let oldAction = action
action = nil
oldAction?()
}
}
/// A disposable that, upon deinitialization, will automatically dispose of
/// another disposable.
public final class ScopedDisposable: Disposable {
/// The disposable which will be disposed when the ScopedDisposable
/// deinitializes.
public let innerDisposable: Disposable
public var disposed: Bool {
return innerDisposable.disposed
}
/// Initializes the receiver to dispose of the argument upon
/// deinitialization.
public init(_ disposable: Disposable) {
innerDisposable = disposable
}
deinit {
dispose()
}
public func dispose() {
innerDisposable.dispose()
}
}

View File

@ -0,0 +1,11 @@
//
// Error.swift
// Reflex
//
// Created by Tyler Fleming Cloutier on 11/15/16.
//
//
import Foundation
public enum NoError: Swift.Error { }

View File

@ -11,93 +11,93 @@ import Foundation
/// Represents a signal event.
///
/// Signals must conform to the grammar:
/// `Next* (Failed | Completed | Interrupted)?`
public enum Event<Value, ErrorType: Error> {
/// `next* (failed | completed | interrupted)?`
public enum Event<Value> {
/// A value provided by the signal.
case Next(Value)
case next(Value)
/// The signal terminated because of an error. No further events will be
/// received.
case Failed(ErrorType)
case failed(Error)
/// The signal successfully terminated. No further events will be received.
case Completed
case completed
/// Event production on the signal has been interrupted. No further events
/// will be received.
case Interrupted
case interrupted
/// Whether this event indicates signal termination (i.e., that no further
/// events will be received).
public var isTerminating: Bool {
switch self {
case .Next:
case .next:
return false
case .Failed, .Completed, .Interrupted:
case .failed, .completed, .interrupted:
return true
}
}
/// Lifts the given function over the event's value.
public func map<U>(_ f: (Value) -> U) -> Event<U, ErrorType> {
public func map<U>(_ f: (Value) -> U) -> Event<U> {
switch self {
case let .Next(value):
return .Next(f(value))
case let .next(value):
return .next(f(value))
case let .Failed(error):
return .Failed(error)
case let .failed(error):
return .failed(error)
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Lifts the given function over the event's value.
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U, ErrorType>? {
public func flatMap<U>(_ f: (Value) -> U?) -> Event<U>? {
switch self {
case let .Next(value):
case let .next(value):
if let nextValue = f(value) {
return .Next(nextValue)
return .next(nextValue)
}
return nil
case let .Failed(error):
return .Failed(error)
case let .failed(error):
return .failed(error)
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Lifts the given function over the event's error.
public func mapError<F>(_ f: (ErrorType) -> F) -> Event<Value, F> {
public func mapError<F: Error>(_ f: (Error) -> F) -> Event<Value> {
switch self {
case let .Next(value):
return .Next(value)
case let .next(value):
return .next(value)
case let .Failed(error):
return .Failed(f(error))
case let .failed(error):
return .failed(f(error))
case .Completed:
return .Completed
case .completed:
return .completed
case .Interrupted:
return .Interrupted
case .interrupted:
return .interrupted
}
}
/// Unwraps the contained `Next` value.
public var value: Value? {
if case let .Next(value) = self {
if case let .next(value) = self {
return value
} else {
return nil
@ -106,7 +106,7 @@ public enum Event<Value, ErrorType: Error> {
/// Unwraps the contained `Error` value.
public var error: Error? {
if case let .Failed(error) = self {
if case let .failed(error) = self {
return error
} else {
return nil
@ -114,18 +114,18 @@ public enum Event<Value, ErrorType: Error> {
}
}
public func == <Value: Equatable, ErrorType: Equatable> (lhs: Event<Value, ErrorType>, rhs: Event<Value, ErrorType>) -> Bool {
public func == <Value: Equatable> (lhs: Event<Value>, rhs: Event<Value>) -> Bool {
switch (lhs, rhs) {
case let (.Next(left), .Next(right)):
case let (.next(left), .next(right)):
return left == right
case let (.Failed(left), .Failed(right)):
return left == right
case let (.failed(left), .failed(right)):
return left.localizedDescription == right.localizedDescription
case (.Completed, .Completed):
case (.completed, .completed):
return true
case (.Interrupted, .Interrupted):
case (.interrupted, .interrupted):
return true
default:

View File

@ -0,0 +1,138 @@
//
// Observer.swift
// Edge
//
// Created by Tyler Fleming Cloutier on 5/29/16.
//
//
import Foundation
/// A CiruitBreaker optionally holds a strong reference to either a
/// `Signal` or a `Source` until a terminating event is
/// received. At such time, it delivers the event and then
/// removes its reference. In so doing, it "breaks the circuit"
/// between the signal, the handler, and the input observer.
/// This allows the downstream signal to be released.
class CircuitBreaker<Value> {
private var signal: Signal<Value>? = nil
private var source: Source<Value>? = nil
fileprivate var action: Observer<Value>.Action! = nil
// This variable is used to maintain memory access exclusivity when
// the signal or source is released causing that signal to send an
// interrupt to its observer.
private var hasTerminated = false
/// Holds a strong reference to a `Signal` until a
/// terminating event is received.
init(holding signal: Signal<Value>?) {
self.signal = signal
self.action = { [weak self] event in
guard let weakSelf = self else { return }
guard !weakSelf.hasTerminated else { return }
weakSelf.signal?.observers.forEach { observer in
observer.send(event)
}
if event.isTerminating {
// Do not have to dispose of cancel disposable
// since it will be disposed when the signal is deallocated.
weakSelf.hasTerminated = true
weakSelf.signal = nil
}
}
}
/// Holds a strong reference to a `Source` until a
/// terminating event is received.
init(holding source: Source<Value>?) {
self.source = source
self.action = { [weak self] event in
guard let weakSelf = self else { return }
guard !weakSelf.hasTerminated else { return }
self?.source?.observers.forEach { observer in
observer.send(event)
}
if event.isTerminating {
// Do not have to dispose of cancel disposable
// since it will be disposed when the signal is deallocated.
weakSelf.hasTerminated = true
self?.source = nil
}
}
}
fileprivate init(with action: @escaping (Event<Value>) -> Void) {
self.action = action
}
}
public struct Observer<Value> {
public typealias Action = (Event<Value>) -> Void
let breaker: CircuitBreaker<Value>
init(with breaker: CircuitBreaker<Value>) {
self.breaker = breaker
}
public init(_ action: @escaping Action) {
self.breaker = CircuitBreaker(with: action)
}
/// Creates an Observer with an action which calls each of the provided
/// callbacks
public init(
failed: ((Error) -> Void)? = nil,
completed: (() -> Void)? = nil,
interrupted: (() -> Void)? = nil,
next: ((Value) -> Void)? = nil)
{
self.init { event in
switch event {
case let .next(value):
next?(value)
case let .failed(error):
failed?(error)
case .completed:
completed?()
case .interrupted:
interrupted?()
}
}
}
/// Puts any `Event` into the the given observer.
public func send(_ event: Event<Value>) {
breaker.action(event)
}
/// Puts a `Next` event into the given observer.
public func sendNext(_ value: Value) {
send(.next(value))
}
/// Puts an `Failed` event into the given observer.
public func sendFailed(_ error: Error) {
send(.failed(error))
}
/// Puts a `Completed` event into the given observer.
public func sendCompleted() {
send(.completed)
}
/// Puts a `Interrupted` event into the given observer.
public func sendInterrupted() {
send(.interrupted)
}
}

View File

@ -0,0 +1,46 @@
//
// Promise.swift
// StreamKit
//
// Created by Tyler Fleming Cloutier on 11/20/17.
//
import Foundation
import PromiseKit
extension Promise {
public func asSignal() -> Signal<T> {
// TODO: If the promise is already resolved the value will
// be sent to the signals before there are any observers added.
return Signal { observer in
self.then { value -> () in
observer.sendNext(value)
observer.sendCompleted()
}.catch { error in
observer.sendFailed(error)
}
return nil
}
}
}
extension SignalType {
public func asPromise() -> Promise<[Value]> {
var values: [Value] = []
return Promise { resolve, reject in
self.onNext {
values.append($0)
}
self.onCompleted {
resolve(values)
}
self.onFailed {
reject($0)
}
}
}
}

View File

@ -8,11 +8,15 @@
import Foundation
public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalType, SpecialSignalGenerator {
public final class Signal<Value>: SignalType, InternalSignalType, SpecialSignalGenerator {
internal var observers = Bag<Observer<Value, ErrorType>>()
internal var observers = Bag<Observer<Value>>()
public var signal: Signal<Value, ErrorType> {
private var handlerDisposable: Disposable?
public var cancelDisposable: Disposable?
public var signal: Signal<Value> {
return self
}
@ -23,21 +27,21 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
/// if a terminating event is sent to the observer. The Signal itself will
/// remain alive until the observer is released. This is because the observer
/// captures a self reference.
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
public init(_ startHandler: @escaping (Observer<Value>) -> Disposable?) {
let generatorDisposable = SerialDisposable()
let observer = Observer(with: CircuitBreaker(holding: self))
let handlerDisposable = startHandler(observer)
let inputObserver = Observer<Value, ErrorType> { event in
self.observers.forEach { (observer) in
observer.action(event)
}
if event.isTerminating {
generatorDisposable.dispose()
}
// The cancel disposable should send interrupted and then dispose of the
// disposable produced by the startHandler.
cancelDisposable = ActionDisposable {
observer.sendInterrupted()
handlerDisposable?.dispose()
}
generatorDisposable.innerDisposable = generator(inputObserver)
}
deinit {
cancelDisposable?.dispose()
}
/// Creates a Signal that will be controlled by sending events to the returned
@ -45,8 +49,8 @@ public final class Signal<Value, ErrorType: Error>: SignalType, InternalSignalTy
///
/// The Signal will remain alive until a terminating event is sent to the
/// observer.
public static func pipe() -> (Signal, Observer<Value, ErrorType>) {
var observer: Observer<Value, ErrorType>!
public static func pipe() -> (Signal, Observer<Value>) {
var observer: Observer<Value>!
let signal = self.init { innerObserver in
observer = innerObserver
return nil
@ -68,12 +72,8 @@ public protocol SpecialSignalGenerator {
/// The type of values being sent on the signal.
associatedtype Value
/// The type of error that can occur on the signal. If errors aren't possible
/// then `NoError` can be used.
associatedtype ErrorType: Error
init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?)
init(_ generator: @escaping (Observer<Value>) -> Disposable?)
}
@ -91,7 +91,7 @@ public extension SpecialSignalGenerator {
/// Creates a Signal that will immediately fail with the
/// given error.
public init(error: ErrorType) {
public init(error: Error) {
self.init { observer in
observer.sendFailed(error)
return nil
@ -140,52 +140,43 @@ public extension SpecialSignalGenerator {
}
/// An internal protocol for adding methods that require access to the observers
/// of the signal.
internal protocol InternalSignalType: SignalType {
var observers: Bag<Observer<Value>> { get }
}
/// Note that this type is not parameterized by an Error type which is in line
/// with the Swift error handling model.
/// A good reference for a discussion of the pros and cons is here:
/// https://github.com/ReactiveX/RxSwift/issues/650
public protocol SignalType {
/// The type of values being sent on the signal.
associatedtype Value
/// The type of error that can occur on the signal. If errors aren't possible
/// then `NoError` can be used.
associatedtype ErrorType: Error
/// The exposed raw signal that underlies the ColdSignalType
var signal: Signal<Value, ErrorType> { get }
/// The exposed raw signal that underlies the `SignalType`.
var signal: Signal<Value> { get }
}
var cancelDisposable: Disposable? { get }
/// An internal protocol for adding methods that require access to the observers
/// of the signal.
internal protocol InternalSignalType: SignalType {
var observers: Bag<Observer<Value, ErrorType>> { get }
}
internal extension InternalSignalType {
/// Interrupts all observers and terminates the stream.
func interrupt() {
for observer in self.observers {
observer.sendInterrupted()
}
}
}
public extension SignalType {
/// Adds an observer to the Signal which observes any future events from the Signal.
/// If the Signal has already terminated, the observer will immediately receive an
/// Adds an observer to the `Signal` which observes any future events from the `Signal`.
/// If the `Signal` has already terminated, the observer will immediately receive an
/// `Interrupted` event.
///
/// Returns a Disposable which can be used to disconnect the observer. Disposing
/// of the Disposable will have no effect on the Signal itself.
/// of the Disposable will have no effect on the `Signal` itself.
@discardableResult
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
let token = signal.observers.insert(value: observer)
public func add(observer: Observer<Value>) -> Disposable? {
let token = signal.observers.insert(observer)
return ActionDisposable {
self.signal.observers.removeValueForToken(token: token)
self.signal.observers.remove(using: token)
}
}
@ -193,7 +184,7 @@ public extension SignalType {
/// Convenience override for add(observer:) to allow trailing-closure style
/// invocations.
@discardableResult
public func on(action: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
public func on(action: @escaping Observer<Value>.Action) -> Disposable? {
return self.add(observer: Observer(action))
}
@ -226,7 +217,7 @@ public extension SignalType {
/// callback. Disposing of the Disposable will have no effect on the Signal
/// itself.
@discardableResult
public func onFailed(error: @escaping (ErrorType) -> Void) -> Disposable? {
public func onFailed(error: @escaping (Error) -> Void) -> Disposable? {
return self.add(observer: Observer(failed: error))
}
@ -246,34 +237,34 @@ public extension SignalType {
public extension SignalType {
public var identity: Signal<Value, ErrorType> {
public var identity: Signal<Value> {
return self.map { $0 }
}
/// Maps each value in the signal to a new value.
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U, ErrorType> {
public func map<U>(_ transform: @escaping (Value) -> U) -> Signal<U> {
return Signal { observer in
return self.on { event -> Void in
observer.sendEvent(event.map(transform))
observer.send(event.map(transform))
}
}
}
/// Maps errors in the signal to a new error.
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> Signal<Value, F> {
public func mapError<F: Error>(_ transform: @escaping (Error) -> F) -> Signal<Value> {
return Signal { observer in
return self.on { event -> Void in
observer.sendEvent(event.mapError(transform))
observer.send(event.mapError(transform))
}
}
}
/// Preserves only the values of the signal that pass the given predicate.
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value, ErrorType> {
public func filter(_ predicate: @escaping (Value) -> Bool) -> Signal<Value> {
return Signal { observer in
return self.on { (event: Event<Value, ErrorType>) -> Void in
return self.on { (event: Event<Value>) -> Void in
guard let value = event.value else {
observer.sendEvent(event)
observer.send(event)
return
}
@ -286,40 +277,31 @@ public extension SignalType {
/// Splits the signal into two signals. The first signal in the tuple matches the
/// predicate, the second signal does not match the predicate
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value, ErrorType>, Signal<Value, ErrorType>) {
let left = Signal<Value, ErrorType> { observer in
return self.on { (event: Event<Value, ErrorType>) -> Void in
guard let value = event.value else {
observer.sendEvent(event)
return
}
if predicate(value) {
observer.sendNext(value)
}
public func partition(_ predicate: @escaping (Value) -> Bool) -> (Signal<Value>, Signal<Value>) {
let (left, leftObserver) = Signal<Value>.pipe()
let (right, rightObserver) = Signal<Value>.pipe()
self.on { (event: Event<Value>) -> Void in
guard let value = event.value else {
leftObserver.send(event)
rightObserver.send(event)
return
}
}
let right = Signal<Value, ErrorType> { observer in
return self.on { (event: Event<Value, ErrorType>) -> Void in
guard let value = event.value else {
observer.sendEvent(event)
return
}
if !predicate(value) {
observer.sendNext(value)
}
if predicate(value) {
leftObserver.sendNext(value)
} else {
rightObserver.sendNext(value)
}
}
return (left, right)
}
/// Aggregate values into a single combined value. Mirrors the Swift Collection
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T, ErrorType> {
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Signal<T> {
return Signal { observer in
var accumulator = initial
return self.on { event in
observer.action(event.map { value in
observer.send(event.map { value in
accumulator = combine(accumulator, value)
return accumulator
})
@ -327,14 +309,87 @@ public extension SignalType {
}
}
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U, ErrorType> {
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Signal<U> {
return Signal { observer in
return self.on { event -> Void in
if let e = event.flatMap(transform) {
observer.sendEvent(e)
observer.send(e)
}
}
}
}
public func flatMap<U>(_ transform: @escaping (Value) -> Signal<U>) -> Signal<U> {
return map(transform).joined()
}
}
extension SignalType where Value: SignalType {
/// Listens to every `Source` produced from the current `Signal`
/// Starts each `Source` and forwards on all values and errors onto
/// the `Signal` which is returned. In this way it joins each of the
/// `Source`s into a single `Signal`.
///
/// The joined `Signal` completes when the current `Signal` and all of
/// its produced `Source`s complete.
///
/// Note: This means that each `Source` will be started as it is received.
public func joined() -> Signal<Value.Value> {
// Start the number in flight at 1 for `self`
return Signal { observer in
var numberInFlight = 1
var disposables = [Disposable]()
func decrementInFlight() {
numberInFlight -= 1
if numberInFlight == 0 {
observer.sendCompleted()
}
}
func incrementInFlight() {
numberInFlight += 1
}
self.on { event in
switch event {
case .next(let source):
incrementInFlight()
source.on { event in
switch event {
case .completed, .interrupted:
decrementInFlight()
case .next, .failed:
observer.send(event)
}
}
source.cancelDisposable.map { disposables.append($0) }
(source as? Source<Value.Value>)?.start()
case .failed(let error):
observer.sendFailed(error)
case .completed:
decrementInFlight()
case .interrupted:
observer.sendInterrupted()
}
}
return ActionDisposable {
for disposable in disposables {
disposable.dispose()
}
}
}
}
}

View File

@ -8,38 +8,40 @@
import Foundation
public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType, SpecialSignalGenerator {
public final class Source<V>: SourceType, InternalSignalType, SpecialSignalGenerator {
public typealias Value = T
public typealias ErrorType = E
internal var observers = Bag<Observer<Value, ErrorType>>()
public var coldSignal: ColdSignal {
public typealias Value = V
internal var observers = Bag<Observer<Value>>()
public var source: Source {
return self
}
internal let startHandler: (Observer<Value, ErrorType>) -> Disposable?
internal let startHandler: (Observer<Value>) -> Disposable?
private var cancelDisposable: Disposable?
public var cancelDisposable: Disposable?
private var handlerDisposable: Disposable?
private var started: Bool {
if let disposable = cancelDisposable {
return !disposable.disposed
}
return false
}
private var started = false
/// Initializes a ColdSignal that will invoke the given closure at the
/// Initializes a Source that will invoke the given closure at the
/// invocation of `start()`.
///
/// The events that the closure puts into the given observer will become
/// the events sent to this ColdSignal.
/// the events sent to this `Source`.
///
/// In order to stop or dispose of the signal, invoke `stop()`. Calling this method
/// will dispose of the disposable returned by the given closure.
///
/// Invoking `start()` will have no effect until the signal is stopped. After
/// `stop()` is called this process may be repeated.
public init(_ generator: @escaping (Observer<Value, ErrorType>) -> Disposable?) {
self.startHandler = generator
public init(_ startHandler: @escaping (Observer<Value>) -> Disposable?) {
self.startHandler = startHandler
}
/// Creates a Signal from the producer, then attaches the given observer to
@ -47,128 +49,116 @@ public final class ColdSignal<T, E: Error>: ColdSignalType, InternalSignalType,
///
/// Returns a Disposable which can be used to interrupt the work associated
/// with the signal and immediately send an `Interrupted` event.
@discardableResult
public func start() {
if !started {
started = true
let observer = Observer<Value, ErrorType> { event in
// Pass event downstream
self.observers.forEach { (observer) in
observer.action(event)
}
// If event is terminating dispose of the handlerDisposable.
if event.isTerminating {
self.handlerDisposable?.dispose()
}
}
handlerDisposable = startHandler(observer)
let observer = Observer(with: CircuitBreaker(holding: self))
let handlerDisposable = startHandler(observer)
// The cancel disposable should send interrupted and then dispose of the
// disposable produced by the startHandler.
cancelDisposable = ActionDisposable { [weak self] in
cancelDisposable = ActionDisposable {
observer.sendInterrupted()
self?.handlerDisposable?.dispose()
handlerDisposable?.dispose()
}
}
}
public func stop() {
cancelDisposable?.dispose()
started = false
}
deinit {
self.stop()
}
}
extension ColdSignal: CustomDebugStringConvertible {
extension Source: CustomDebugStringConvertible {
public var debugDescription: String {
let obs = Array(self.observers.map { String(describing: $0) })
return "ColdSignal[\(obs.joined(separator: ", "))]"
return "Source[\(obs.joined(separator: ", "))]"
}
}
public protocol ColdSignalType: SignalType {
public protocol SourceType: SignalType {
/// The exposed raw signal that underlies the ColdSignalType
var coldSignal: ColdSignal<Value, ErrorType> { get }
/// The exposed raw signal that underlies the SourceType
var source: Source<Value> { get }
/// Invokes the closure provided upon initialization, and passes in a newly
/// created observer to which events can be sent.
func start()
/// Stops the ColdSignal by sending an interrupt to all of it's
/// Stops the `Source` by sending an interrupt to all of it's
/// observers and then invoking the disposable returned by the closure
/// that was provided upon initialization.
func stop()
}
public extension ColdSignalType {
public extension SourceType {
public var signal: Signal<Value, ErrorType> {
public var signal: Signal<Value> {
return Signal { observer in
self.coldSignal.add(observer: observer)
self.source.add(observer: observer)
}
}
/// Invokes the closure provided upon initialization, and passes in a newly
/// created observer to which events can be sent.
func start() {
coldSignal.start()
source.start()
}
/// Stops the ColdSignal by sending an interrupt to all of it's
/// Stops the `Source` by sending an interrupt to all of it's
/// observers and then invoking the disposable returned by the closure
/// that was provided upon initialization.
func stop() {
coldSignal.stop()
source.stop()
}
}
public extension ColdSignalType {
public extension SourceType {
/// Adds an observer to the ColdSignal which observes any future events from the
/// ColdSignal. If the Signal has already terminated, the observer will immediately
/// Adds an observer to the `Source` which observes any future events from the
/// `Source`. If the `Signal` has already terminated, the observer will immediately
/// receive an `Interrupted` event.
///
/// Returns a Disposable which can be used to disconnect the observer. Disposing
/// of the Disposable will have no effect on the Signal itself.
@discardableResult
public func add(observer: Observer<Value, ErrorType>) -> Disposable? {
let token = coldSignal.observers.insert(value: observer)
return ActionDisposable {
self.coldSignal.observers.removeValueForToken(token: token)
public func add(observer: Observer<Value>) -> Disposable? {
let token = source.observers.insert(observer)
return ActionDisposable { [weak source = source] in
source?.observers.remove(using: token)
}
}
/// Creates a ColdSignal, adds exactly one observer, and then immediately
/// invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer, and then immediately
/// invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func start(with observer: Observer<Value, ErrorType>) -> Disposable? {
let disposable = coldSignal.add(observer: observer)
self.coldSignal.start()
public func start(with observer: Observer<Value>) -> Disposable? {
let disposable = source.add(observer: observer)
source.start()
return disposable
}
/// Creates a ColdSignal, adds exactly one observer, and then immediately
/// invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer, and then immediately
/// invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func start(_ observerAction: @escaping Observer<Value, ErrorType>.Action) -> Disposable? {
public func start(_ observerAction: @escaping Observer<Value>.Action) -> Disposable? {
return start(with: Observer(observerAction))
}
/// Creates a ColdSignal, adds exactly one observer for next, and then immediately
/// invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer for next, and then immediately
/// invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
@ -176,8 +166,8 @@ public extension ColdSignalType {
return start(with: Observer(next: next))
}
/// Creates a ColdSignal, adds exactly one observer for completed events, and then
/// immediately invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer for completed events, and then
/// immediately invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
@ -185,17 +175,17 @@ public extension ColdSignalType {
return start(with: Observer(completed: completed))
}
/// Creates a ColdSignal, adds exactly one observer for errors, and then
/// immediately invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer for errors, and then
/// immediately invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
public func startWithFailed(failed: @escaping (ErrorType) -> Void) -> Disposable? {
public func startWithFailed(failed: @escaping (Error) -> Void) -> Disposable? {
return start(with: Observer(failed: failed))
}
/// Creates a ColdSignal, adds exactly one observer for interrupts, and then
/// immediately invokes start on the ColdSignal.
/// Creates a `Source`, adds exactly one observer for interrupts, and then
/// immediately invokes start on the `Source`.
///
/// Returns a Disposable which can be used to dispose of the added observer.
@discardableResult
@ -205,67 +195,76 @@ public extension ColdSignalType {
}
public extension ColdSignalType {
public extension SourceType {
/// Creates a new `ColdSignal` which will apply a unary operator directly to events
/// Creates a new `Source` which will apply a unary operator directly to events
/// produced by the `startHandler`.
///
/// The new `ColdSignal` is in no way related to the source `ColdSignal` except
/// The new `Source` is in no way related to the source `Source` except
/// that they share a reference to the same `startHandler`.
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> Signal<U, F>) -> ColdSignal<U, F> {
return ColdSignal { observer in
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
public func lift<U>(_ transform: @escaping (Signal<Value>) -> Signal<U>) -> Source<U> {
return Source { observer in
let (pipeSignal, pipeObserver) = Signal<Value>.pipe()
transform(pipeSignal).add(observer: observer)
return self.coldSignal.startHandler(pipeObserver)
return self.source.startHandler(pipeObserver)
}
}
public func lift<U, F>(_ transform: @escaping (Signal<Value, ErrorType>) -> (Signal<U, F>, Signal<U, F>))
-> (ColdSignal<U, F>, ColdSignal<U, F>)
public func lift<U>(_ transform: @escaping (Signal<Value>) -> (Signal<U>, Signal<U>))
-> (Source<U>, Source<U>)
{
let (pipeSignal, pipeObserver) = Signal<Value, ErrorType>.pipe()
let (pipeSignal, pipeObserver) = Signal<Value>.pipe()
let (left, right) = transform(pipeSignal)
let coldLeft = ColdSignal<U, F> { observer in
let sourceLeft = Source<U> { observer in
left.add(observer: observer)
return self.coldSignal.startHandler(pipeObserver)
return self.source.startHandler(pipeObserver)
}
let coldRight = ColdSignal<U, F> { observer in
let sourceRight = Source<U> { observer in
right.add(observer: observer)
return self.coldSignal.startHandler(pipeObserver)
return self.source.startHandler(pipeObserver)
}
return (coldLeft, coldRight)
return (sourceLeft, sourceRight)
}
public var identity: Source<Value> {
return lift { $0.identity }
}
/// Maps each value in the signal to a new value.
public func map<U>(_ transform: @escaping (Value) -> U) -> ColdSignal<U, ErrorType> {
public func map<U>(_ transform: @escaping (Value) -> U) -> Source<U> {
return lift { $0.map(transform) }
}
/// Maps errors in the signal to a new error.
public func mapError<F>(_ transform: @escaping (ErrorType) -> F) -> ColdSignal<Value, F> {
public func mapError<F: Error>(_ transform: @escaping (Error) -> F) -> Source<Value> {
return lift { $0.mapError(transform) }
}
/// Preserves only the values of the signal that pass the given predicate.
public func filter(_ predicate: @escaping (Value) -> Bool) -> ColdSignal<Value, ErrorType> {
public func filter(_ predicate: @escaping (Value) -> Bool) -> Source<Value> {
return lift { $0.filter(predicate) }
}
/// Splits the signal into two signals. The first signal in the tuple matches the
/// predicate, the second signal does not match the predicate
public func partition(_ predicate: @escaping (Value) -> Bool)
-> (ColdSignal<Value, ErrorType>, ColdSignal<Value, ErrorType>) {
-> (Source<Value>, Source<Value>) {
return lift { $0.partition(predicate) }
}
/// Aggregate values into a single combined value. Mirrors the Swift Collection
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> ColdSignal<T, ErrorType> {
public func reduce<T>(initial: T, _ combine: @escaping (T, Value) -> T) -> Source<T> {
return lift { $0.reduce(initial: initial, combine) }
}
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> ColdSignal<U, ErrorType> {
public func flatMap<U>(_ transform: @escaping (Value) -> U?) -> Source<U> {
return lift { $0.flatMap(transform) }
}
public func flatMap<U>(_ transform: @escaping (Value) -> Source<U>) -> Source<U> {
return lift { $0.map(transform).joined() }
}
}

View File

@ -1,4 +1,4 @@
@testable import Reflex
@testable import StreamKit
import XCTest
@ -6,7 +6,7 @@ class TestBasic: XCTestCase {
func testSignalPipe() {
let (signal, observer) = Signal<Int, NSError>.pipe()
let (signal, observer) = Signal<Int>.pipe()
var nextIndex = 0
let nextVals = [0, 3, 5, 2, -3]
var didComplete = false
@ -21,6 +21,10 @@ class TestBasic: XCTestCase {
didComplete = true
}
signal.onFailed { error in
XCTFail(error.localizedDescription)
}
for val in nextVals {
observer.sendNext(val)
}