refactor of operation queues

This commit is contained in:
Peter 2015-11-16 17:05:21 -05:00
parent efa97a5273
commit 00217e3aac
5 changed files with 156 additions and 83 deletions

View File

@ -51,35 +51,49 @@ public class DDPClient: NSObject {
// included for storing login id and token
internal let userData = NSUserDefaults.standardUserDefaults()
internal let incomingData:NSOperationQueue = {
let background: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP Incoming Data Queue"
queue.name = "DDP Background Data Queue"
queue.qualityOfService = .Background
return queue
}()
internal let callbackQueue: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP Callback Queue"
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .UserInitiated
return queue
}()
// Calling methods on the server + their callbacks
internal let outgoingData:NSOperationQueue = {
internal let documentQueue: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP Outgoing Data Queue"
// queue.maxConcurrentOperationCount = 1
return queue
}()
internal let operation:NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP Operation Queue"
queue.name = "DDP Background Queue"
queue.maxConcurrentOperationCount = 1
queue.qualityOfService = .Background
return queue
}()
internal let heartbeat:NSOperationQueue = {
internal let heartbeat: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP Heartbeat Queue"
queue.qualityOfService = .Utility
return queue
}()
internal let mainQueue = NSOperationQueue.mainQueue()
let userBackground: NSOperationQueue = {
let queue = NSOperationQueue()
queue.name = "DDP High Priority Background Queue"
queue.qualityOfService = .UserInitiated
return queue
}()
let userMainQueue: NSOperationQueue = {
let queue = NSOperationQueue.mainQueue()
queue.name = "DDP High Priorty Main Queue"
queue.qualityOfService = .UserInitiated
return queue
}()
private var socket:WebSocket!
private var server:(ping:NSDate?, pong:NSDate?) = (nil, nil)
@ -149,14 +163,13 @@ public class DDPClient: NSObject {
}
socket.event.message = { message in
self.operation.addOperationWithBlock() {
self.background.addOperationWithBlock() {
if let text = message as? String {
do { try self.ddpMessageHandler(DDPMessage(message: text)) }
catch { log.debug("Message handling error. Raw message: \(text)")}
}
}
}
}
/**
@ -179,7 +192,6 @@ public class DDPClient: NSObject {
private func pong(ping: DDPMessage) {
heartbeat.addOperationWithBlock() {
self.server.ping = NSDate()
// log.debug("Ping")
var response = ["msg":"pong"]
if let id = ping.id { response["id"] = id }
self.sendMessage(response)
@ -197,7 +209,7 @@ public class DDPClient: NSObject {
self.connection = (true, message.session!)
self.events.onConnected(session:message.session!)
case .Result: NSOperationQueue.mainQueue().addOperationWithBlock() {
case .Result: callbackQueue.addOperationWithBlock() {
if let id = message.id, // Message has id
let callback = self.resultCallbacks[id], // There is a callback registered for the message
let result = message.result {
@ -212,57 +224,56 @@ public class DDPClient: NSObject {
// Principal callbacks for managing data
// Document was added
case .Added: incomingData.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasAdded(collection, id: id, fields: message.fields)
}
case .Added: documentQueue.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasAdded(collection, id: id, fields: message.fields)
}
}
// Document was changed
case .Changed: incomingData.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasChanged(collection, id: id, fields: message.fields, cleared: message.cleared)
}
case .Changed: documentQueue.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasChanged(collection, id: id, fields: message.fields, cleared: message.cleared)
}
}
// Document was removed
case .Removed: incomingData.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasRemoved(collection, id: id)
}
case .Removed: documentQueue.addOperationWithBlock() {
if let collection = message.collection,
let id = message.id {
self.documentWasRemoved(collection, id: id)
}
}
// Notifies you when the result of a method changes
case .Updated: incomingData.addOperationWithBlock() {
if let methods = message.methods {
self.methodWasUpdated(methods)
}
case .Updated: documentQueue.addOperationWithBlock() {
if let methods = message.methods {
self.methodWasUpdated(methods)
}
}
// Callbacks for managing subscriptions
case .Ready: incomingData.addOperationWithBlock() {
if let subs = message.subs {
self.ready(subs)
}
case .Ready: background.addOperationWithBlock() {
if let subs = message.subs {
self.ready(subs)
}
}
// Callback that fires when subscription has been completely removed
//
case .Nosub: incomingData.addOperationWithBlock() {
case .Nosub: background.addOperationWithBlock() {
if let id = message.id {
self.nosub(id, error: message.error)
}
}
}
case .Ping: heartbeat.addOperationWithBlock() { self.pong(message) }
case .Pong: heartbeat.addOperationWithBlock() { self.server.pong = NSDate() }
case .Error: incomingData.addOperationWithBlock() {
case .Error: background.addOperationWithBlock() {
self.didReceiveErrorMessage(DDPError(json: message.json))
}
@ -290,10 +301,12 @@ public class DDPClient: NSObject {
public func method(name: String, params: AnyObject?, callback: DDPMethodCallback?) -> String {
let id = getId()
let message = ["msg":"method", "method":name, "id":id] as NSMutableDictionary
if let p = params { message["params"] = p }
if let c = callback { resultCallbacks[id] = c }
outgoingData.addOperationWithBlock() { self.sendMessage(message) }
let message = ["msg":"method", "method":name, "id":id] as NSMutableDictionary
if let p = params { message["params"] = p }
if let c = callback { self.resultCallbacks[id] = c }
userBackground.addOperationWithBlock() {
self.sendMessage(message)
}
return id
}
@ -303,11 +316,13 @@ public class DDPClient: NSObject {
internal func sub(id: String, name: String, params: [AnyObject]?, callback: DDPCallback?) -> String {
if let c = callback { subCallbacks[id] = c }
subscriptions[id] = (id, name, false)
if let c = callback { self.subCallbacks[id] = c }
self.subscriptions[id] = (id, name, false)
let message = ["msg":"sub", "name":name, "id":id] as NSMutableDictionary
if let p = params { message["params"] = p }
outgoingData.addOperationWithBlock() { self.sendMessage(message) }
userBackground.addOperationWithBlock() {
self.sendMessage(message)
}
return id
}
@ -374,7 +389,7 @@ public class DDPClient: NSObject {
public func unsub(name: String, callback: DDPCallback?) -> String? {
if let sub = findSubscription(name) {
unsub(withId: sub.id, callback: callback)
outgoingData.addOperationWithBlock() { self.sendMessage(["msg":"unsub", "id":sub.id]) }
background.addOperationWithBlock() { self.sendMessage(["msg":"unsub", "id":sub.id]) }
return sub.id
}
return nil
@ -382,7 +397,7 @@ public class DDPClient: NSObject {
internal func unsub(withId id: String, callback: DDPCallback?) {
if let c = callback { unsubCallbacks[id] = c }
outgoingData.addOperationWithBlock() { self.sendMessage(["msg":"unsub", "id":id]) }
background.addOperationWithBlock() { self.sendMessage(["msg":"unsub", "id":id]) }
}
//

View File

@ -238,7 +238,10 @@ extension DDPClient {
return serverResponse
}
// Callback runs on main thread
internal func login(params: NSDictionary, callback: ((result: AnyObject?, error: DDPError?) -> ())?) {
// method is run on the userBackground queue
method("login", params: NSArray(arrayLiteral: params)) { result, error in
guard let e = error where (e.isValid == true) else {
@ -259,13 +262,14 @@ extension DDPClient {
self.userData.setObject(expiration, forKey: DDP_TOKEN_EXPIRES)
}
if let c = callback { c(result:result, error:error) }
self.userData.setObject(true, forKey: DDP_LOGGED_IN)
NSOperationQueue.mainQueue().addOperationWithBlock() {
self.userMainQueue.addOperationWithBlock() {
if let c = callback { c(result:result, error:error) }
self.userData.setObject(true, forKey: DDP_LOGGED_IN)
NSNotificationCenter.defaultCenter().postNotificationName(DDP_USER_DID_LOGIN, object: nil)
if let _ = self.delegate {
NSNotificationCenter.defaultCenter().postNotificationName(DDP_USER_DID_LOGIN, object: nil)
self.delegate!.ddpUserDidLogin(self.user()!)
}
@ -390,28 +394,27 @@ extension DDPClient {
/**
Logs a user out and removes their account data from NSUserDefaults.
When it completes, it posts a notification: DDP_USER_DID_LOGOUT
When it completes, it posts a notification: DDP_USER_DID_LOGOUT on the main queue
- parameter callback: A closure with result and error parameters describing the outcome of the operation
*/
public func logout(callback:DDPMethodCallback?) {
method("logout", params: nil) { result, error in
if (error == nil) {
NSOperationQueue.mainQueue().addOperationWithBlock() {
if (error == nil) {
let user = self.user()!
NSOperationQueue.mainQueue().addOperationWithBlock() {
NSNotificationCenter.defaultCenter().postNotificationName(DDP_USER_DID_LOGOUT, object: nil)
if let _ = self.delegate {
self.delegate!.ddpUserDidLogout(user)
}
self.resetUserData()
} else {
log.error("\(error)")
}
self.resetUserData()
} else {
log.error("\(error)")
if let c = callback { c(result: result, error: error) }
}
if let c = callback { c(result: result, error: error) }
}
}

View File

@ -20,6 +20,46 @@
import Foundation
class DDPOperation: NSOperation {
typealias Action = () -> Void
var message: DDPMessage
var action: Action
init(message: DDPMessage, action: Action) {
self.message = message
self.action = action
super.init()
}
override func main() {
action()
}
}
class DDPPingOperation: DDPOperation {
override init(message: DDPMessage, action: Action) {
super.init(message: message, action: action)
self.queuePriority = .High
self.qualityOfService = .Utility
}
}
class DDPMessageOperation: DDPOperation {
init(message: DDPMessage, action: Action, completion: Action?) {
super.init(message: message, action: action)
self.queuePriority = .Low
self.qualityOfService = .Background
self.completionBlock = completion
}
}
/**
Enum value representing the types of DDP messages that the server can send
*/

View File

@ -106,6 +106,20 @@ public class Meteor {
*/
public static func unsubscribe(name:String, callback:DDPCallback?) -> String? { return client.unsub(name, callback: callback) }
/**
Calls a method on the server. If a callback is passed, the callback is asynchronously
executed when the method has completed. The callback takes two arguments: result and error. It
the method call is successful, result contains the return value of the method, if any. If the method fails,
error contains information about the error.
- parameter name: The name of the method
- parameter params: An array containing method arguments, if any
- parameter callback: The closure to be executed when the method has been executed
*/
public static func call(name:String, params:[AnyObject]?, callback:DDPMethodCallback?) -> String? {
return client.method(name, params: params, callback: callback)
}
/**
Call a single function to establish a DDP connection, and login with email and password
@ -169,9 +183,7 @@ public class Meteor {
public override func documentWasAdded(collection:String, id:String, fields:NSDictionary?) {
if let meteorCollection = Meteor.collections[collection] as? MeteorCollectionType {
NSOperationQueue.mainQueue().addOperationWithBlock() {
meteorCollection.documentWasAdded(collection, id: id, fields: fields)
}
meteorCollection.documentWasAdded(collection, id: id, fields: fields)
}
}
@ -187,9 +199,7 @@ public class Meteor {
public override func documentWasChanged(collection:String, id:String, fields:NSDictionary?, cleared:[String]?) {
if let meteorCollection = Meteor.collections[collection] as? MeteorCollectionType {
NSOperationQueue.mainQueue().addOperationWithBlock() {
meteorCollection.documentWasChanged(collection, id: id, fields: fields, cleared: cleared)
}
meteorCollection.documentWasChanged(collection, id: id, fields: fields, cleared: cleared)
}
}
@ -203,9 +213,7 @@ public class Meteor {
public override func documentWasRemoved(collection:String, id:String) {
if let meteorCollection = Meteor.collections[collection] as? MeteorCollectionType {
NSOperationQueue.mainQueue().addOperationWithBlock() {
meteorCollection.documentWasRemoved(collection, id: id)
}
meteorCollection.documentWasRemoved(collection, id: id)
}
}
}
@ -243,7 +251,7 @@ public class MeteorCollection: NSObject, MeteorCollectionType {
}
/**
Called when a document has been sent from the server. Always executes on the main queue
Called when a document has been sent from the server.
- parameter collection: the string name of the collection to which the document belongs
- parameter id: the string unique id that identifies the document on the server

View File

@ -72,6 +72,7 @@ public class MeteorCoreDataCollection:MeteorCollection {
return []
}
// Retrieves a single result by name
public func findOne(id:String) -> NSManagedObject? {
let fetchRequest = NSFetchRequest(entityName: self.entityName)
@ -84,20 +85,26 @@ public class MeteorCoreDataCollection:MeteorCollection {
}
public func exists(id:String) -> Bool {
let error = NSErrorPointer()
let fetchRequest = NSFetchRequest(entityName: self.entityName)
fetchRequest.predicate = NSPredicate(format: "id == '\(id)'")
let count = managedObjectContext.countForFetchRequest(fetchRequest, error: nil)
if count > 0 {
let count = managedObjectContext.countForFetchRequest(fetchRequest, error: error)
if error == nil && count > 0 {
return true
}
return false
}
public func exists(collection:String, id:String) -> Bool {
let error = NSErrorPointer()
let fetchRequest = NSFetchRequest(entityName: self.entityName)
fetchRequest.predicate = NSPredicate(format: "id == '\(id)' AND collection == '\(collection)'")
let count = managedObjectContext.countForFetchRequest(fetchRequest, error: nil)
if count > 0 {
print("Managed object context \(managedObjectContext)")
let count = managedObjectContext.countForFetchRequest(fetchRequest, error: error)
if error == nil && count > 0 {
return true
}
return false