feat: disconnect on last subscription, fix data races (#46)
* feat: disconnect on last subscription, fix data races * add more testing, and add changelog
This commit is contained in:
parent
151e0233c3
commit
b80fa4d265
|
@ -0,0 +1,7 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<Workspace
|
||||
version = "1.0">
|
||||
<FileRef
|
||||
location = "self:">
|
||||
</FileRef>
|
||||
</Workspace>
|
|
@ -8,6 +8,8 @@
|
|||
|
||||
/* Begin PBXBuildFile section */
|
||||
0B59161BB37D32073E4FD61B /* Pods_AppSyncRTCSample.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 7CF486070B34EFD15B4DB8FC /* Pods_AppSyncRTCSample.framework */; };
|
||||
2164E65D2639AD5600385027 /* StarscreamAdapterTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2164E65C2639AD5600385027 /* StarscreamAdapterTests.swift */; };
|
||||
2164E674263C58CE00385027 /* AppSyncRealTimeClientTestBase.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2164E673263C58CD00385027 /* AppSyncRealTimeClientTestBase.swift */; };
|
||||
217F39992405D9D500F1A0B3 /* AppSyncRealTimeClient.framework in Frameworks */ = {isa = PBXBuildFile; fileRef = 217F398F2405D9D500F1A0B3 /* AppSyncRealTimeClient.framework */; };
|
||||
217F39CC2406E98400F1A0B3 /* AppSyncResponse.swift in Sources */ = {isa = PBXBuildFile; fileRef = 217F39AA2406E98300F1A0B3 /* AppSyncResponse.swift */; };
|
||||
217F39CD2406E98400F1A0B3 /* InterceptableConnection.swift in Sources */ = {isa = PBXBuildFile; fileRef = 217F39AB2406E98300F1A0B3 /* InterceptableConnection.swift */; };
|
||||
|
@ -117,6 +119,8 @@
|
|||
|
||||
/* Begin PBXFileReference section */
|
||||
18D6E56CE03BAC33493CC19B /* Pods-HostApp.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-HostApp.debug.xcconfig"; path = "Target Support Files/Pods-HostApp/Pods-HostApp.debug.xcconfig"; sourceTree = "<group>"; };
|
||||
2164E65C2639AD5600385027 /* StarscreamAdapterTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = StarscreamAdapterTests.swift; sourceTree = "<group>"; };
|
||||
2164E673263C58CD00385027 /* AppSyncRealTimeClientTestBase.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = AppSyncRealTimeClientTestBase.swift; sourceTree = "<group>"; };
|
||||
217F398F2405D9D500F1A0B3 /* AppSyncRealTimeClient.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = AppSyncRealTimeClient.framework; sourceTree = BUILT_PRODUCTS_DIR; };
|
||||
217F39932405D9D500F1A0B3 /* Info.plist */ = {isa = PBXFileReference; lastKnownFileType = text.plist.xml; path = Info.plist; sourceTree = "<group>"; };
|
||||
217F39982405D9D500F1A0B3 /* AppSyncRealTimeClientTests.xctest */ = {isa = PBXFileReference; explicitFileType = wrapper.cfbundle; includeInIndex = 0; path = AppSyncRealTimeClientTests.xctest; sourceTree = BUILT_PRODUCTS_DIR; };
|
||||
|
@ -297,9 +301,9 @@
|
|||
217F399C2405D9D500F1A0B3 /* AppSyncRealTimeClientTests */ = {
|
||||
isa = PBXGroup;
|
||||
children = (
|
||||
217F399F2405D9D500F1A0B3 /* Info.plist */,
|
||||
217F39E82406EA3F00F1A0B3 /* Connection */,
|
||||
217F39EC2406EA4000F1A0B3 /* ConnectionProvider */,
|
||||
217F399F2405D9D500F1A0B3 /* Info.plist */,
|
||||
21D38B84240A39E400EC2A8D /* Interceptor */,
|
||||
217F39EA2406EA3F00F1A0B3 /* Mocks */,
|
||||
217F39EE2406EA4000F1A0B3 /* Support */,
|
||||
|
@ -432,9 +436,11 @@
|
|||
isa = PBXGroup;
|
||||
children = (
|
||||
21D38B4B2409B6C000EC2A8D /* amplifyconfiguration.json */,
|
||||
21D38B4D2409B8B200EC2A8D /* README.md */,
|
||||
21D38B422409AFBD00EC2A8D /* Info.plist */,
|
||||
21D38B402409AFBD00EC2A8D /* AppSyncRealTimeClientIntegrationTests.swift */,
|
||||
2164E673263C58CD00385027 /* AppSyncRealTimeClientTestBase.swift */,
|
||||
21D38B422409AFBD00EC2A8D /* Info.plist */,
|
||||
21D38B4D2409B8B200EC2A8D /* README.md */,
|
||||
2164E65C2639AD5600385027 /* StarscreamAdapterTests.swift */,
|
||||
21D38B95240C4DC200EC2A8D /* Support */,
|
||||
);
|
||||
path = AppSyncRealTimeClientIntegrationTests;
|
||||
|
@ -1144,9 +1150,11 @@
|
|||
buildActionMask = 2147483647;
|
||||
files = (
|
||||
21D38B99240C4E1C00EC2A8D /* ConfigurationHelper.swift in Sources */,
|
||||
2164E674263C58CE00385027 /* AppSyncRealTimeClientTestBase.swift in Sources */,
|
||||
21D38B97240C4DCF00EC2A8D /* Error+Extension.swift in Sources */,
|
||||
21D38B9D240C540D00EC2A8D /* TestCommonConstants.swift in Sources */,
|
||||
21D38B412409AFBD00EC2A8D /* AppSyncRealTimeClientIntegrationTests.swift in Sources */,
|
||||
2164E65D2639AD5600385027 /* StarscreamAdapterTests.swift in Sources */,
|
||||
);
|
||||
runOnlyForDeploymentPostprocessing = 0;
|
||||
};
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
buildConfiguration = "Debug"
|
||||
selectedDebuggerIdentifier = "Xcode.DebuggerFoundation.Debugger.LLDB"
|
||||
selectedLauncherIdentifier = "Xcode.DebuggerFoundation.Launcher.LLDB"
|
||||
enableThreadSanitizer = "YES"
|
||||
launchStyle = "0"
|
||||
useCustomWorkingDirectory = "NO"
|
||||
ignoresPersistentStateOnLaunch = "NO"
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -18,7 +18,7 @@ extension AppSyncSubscriptionConnection {
|
|||
return
|
||||
}
|
||||
if connectionState == .connected {
|
||||
AppSyncLogger.debug("Start subscription")
|
||||
AppSyncLogger.debug("[AppSyncSubscriptionConnection] \(#function): connection is connected, start subscription.")
|
||||
startSubscription()
|
||||
}
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ extension AppSyncSubscriptionConnection {
|
|||
|
||||
private func convertToPayload(for query: String, variables: [String: Any?]?) -> AppSyncMessage.Payload? {
|
||||
guard let subscriptionItem = subscriptionItem else {
|
||||
AppSyncLogger.debug("\(#function): no subscription item")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): missing subscription item")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -11,12 +11,14 @@ extension AppSyncSubscriptionConnection {
|
|||
|
||||
func handleDataEvent(response: AppSyncResponse) {
|
||||
guard let subscriptionItem = subscriptionItem else {
|
||||
AppSyncLogger.debug("\(#function): no subscription item")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): missing subscription item")
|
||||
return
|
||||
}
|
||||
|
||||
guard response.id == subscriptionItem.identifier else {
|
||||
AppSyncLogger.verbose("\(#function): ignoring data event for \(response.id ?? "(null)")")
|
||||
AppSyncLogger.verbose("""
|
||||
[AppSyncSubscriptionConnection] \(#function): \(subscriptionItem.identifier). Ignoring data event for \(response.id ?? "(null)")
|
||||
""")
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import Starscream
|
|||
extension AppSyncSubscriptionConnection {
|
||||
func handleError(error: Error) {
|
||||
guard let subscriptionItem = subscriptionItem else {
|
||||
AppSyncLogger.debug("\(#function): no subscription item")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): missing subscription item")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,7 @@ extension AppSyncSubscriptionConnection {
|
|||
|
||||
let retryAdvice = retryHandler.shouldRetryRequest(for: connectionError)
|
||||
if retryAdvice.shouldRetry, let retryInterval = retryAdvice.retryInterval {
|
||||
AppSyncLogger.debug("Retrying subscription \(subscriptionItem.identifier) after \(retryInterval)")
|
||||
AppSyncLogger.debug("[AppSyncSubscriptionConnection] Retrying subscription \(subscriptionItem.identifier) after \(retryInterval)")
|
||||
DispatchQueue.global().asyncAfter(deadline: .now() + retryInterval) {
|
||||
self.connectionProvider?.connect()
|
||||
}
|
||||
|
|
|
@ -50,17 +50,17 @@ public class AppSyncSubscriptionConnection: SubscriptionConnection, RetryableCon
|
|||
}
|
||||
|
||||
public func unsubscribe(item: SubscriptionItem) {
|
||||
AppSyncLogger.debug("Unsubscribe - \(item.identifier)")
|
||||
AppSyncLogger.debug("[AppSyncSubscriptionConnection] Unsubscribe \(item.identifier)")
|
||||
|
||||
let message = AppSyncMessage(id: item.identifier, type: .unsubscribe("stop"))
|
||||
|
||||
guard let connectionProvider = connectionProvider else {
|
||||
AppSyncLogger.debug("\(#function): no connection provider")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): missing connection provider")
|
||||
return
|
||||
}
|
||||
|
||||
guard let subscriptionItem = subscriptionItem else {
|
||||
AppSyncLogger.debug("\(#function): no subscription item")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): missing subscription item")
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -70,18 +70,18 @@ public class AppSyncSubscriptionConnection: SubscriptionConnection, RetryableCon
|
|||
|
||||
private func addListener() {
|
||||
guard let connectionProvider = connectionProvider else {
|
||||
AppSyncLogger.debug("\(#function): no connection provider")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): no connection provider")
|
||||
return
|
||||
}
|
||||
|
||||
guard let subscriptionItem = subscriptionItem else {
|
||||
AppSyncLogger.debug("\(#function): no subscription item")
|
||||
AppSyncLogger.warn("[AppSyncSubscriptionConnection] \(#function): no subscription item")
|
||||
return
|
||||
}
|
||||
|
||||
connectionProvider.addListener(identifier: subscriptionItem.identifier) { [weak self] event in
|
||||
guard let self = self else {
|
||||
AppSyncLogger.debug("Self is nil, listener is not called.")
|
||||
AppSyncLogger.debug("[AppSyncSubscriptionConnection] \(#function): Self is nil, listener is not called.")
|
||||
return
|
||||
}
|
||||
switch event {
|
||||
|
|
|
@ -11,7 +11,7 @@ extension RealtimeConnectionProvider {
|
|||
|
||||
/// Start a stale connection timer, first invalidating and destroying any existing timer
|
||||
func startStaleConnectionTimer() {
|
||||
AppSyncLogger.debug("Starting stale connection timer for \(staleConnectionTimeout.get())s")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] Starting stale connection timer for \(staleConnectionTimeout.get())s")
|
||||
if staleConnectionTimer != nil {
|
||||
stopStaleConnectionTimer()
|
||||
}
|
||||
|
@ -22,14 +22,14 @@ extension RealtimeConnectionProvider {
|
|||
|
||||
/// Stop and destroy any existing stale connection timer
|
||||
func stopStaleConnectionTimer() {
|
||||
AppSyncLogger.debug("Stopping and destroying stale connection timer")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] Stopping and destroying stale connection timer")
|
||||
staleConnectionTimer?.invalidate()
|
||||
staleConnectionTimer = nil
|
||||
}
|
||||
|
||||
/// Reset the stale connection timer in response to receiving a message
|
||||
func resetStaleConnectionTimer() {
|
||||
AppSyncLogger.debug("Resetting stale connection timer")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] Resetting stale connection timer")
|
||||
staleConnectionTimer?.resetCountdown()
|
||||
}
|
||||
|
||||
|
@ -39,9 +39,9 @@ extension RealtimeConnectionProvider {
|
|||
guard let self = self else {
|
||||
return
|
||||
}
|
||||
AppSyncLogger.error("[RealtimeConnectionProvider] Realtime connection is stale, disconnecting.")
|
||||
self.status = .notConnected
|
||||
self.websocket.disconnect()
|
||||
AppSyncLogger.error("Realtime connection is stale, disconnected.")
|
||||
self.updateCallback(event: .error(ConnectionProviderError.connection))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ extension RealtimeConnectionProvider: AppSyncWebsocketDelegate {
|
|||
public func websocketDidConnect(provider: AppSyncWebsocketProvider) {
|
||||
// Call the ack to finish the connection handshake
|
||||
// Inform the callback when ack gives back a response.
|
||||
AppSyncLogger.debug("WebsocketDidConnect, sending init message...")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] WebsocketDidConnect, sending init message")
|
||||
sendConnectionInitMessage()
|
||||
startStaleConnectionTimer()
|
||||
}
|
||||
|
@ -48,10 +48,12 @@ extension RealtimeConnectionProvider: AppSyncWebsocketDelegate {
|
|||
|
||||
switch response.responseType {
|
||||
case .connectionAck:
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] received connectionAck")
|
||||
connectionQueue.async { [weak self] in
|
||||
self?.handleConnectionAck(response: response)
|
||||
}
|
||||
case .error:
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] received error")
|
||||
connectionQueue.async { [weak self] in
|
||||
self?.handleError(response: response)
|
||||
}
|
||||
|
@ -60,7 +62,7 @@ extension RealtimeConnectionProvider: AppSyncWebsocketDelegate {
|
|||
updateCallback(event: .data(appSyncResponse))
|
||||
}
|
||||
case .keepAlive:
|
||||
AppSyncLogger.debug("\(self) received keepAlive")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] received keepAlive")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import Foundation
|
|||
/// through websocket.
|
||||
public class RealtimeConnectionProvider: ConnectionProvider {
|
||||
private let url: URL
|
||||
private var listeners: [String: ConnectionProviderCallback]
|
||||
var listeners: [String: ConnectionProviderCallback]
|
||||
|
||||
let websocket: AppSyncWebsocketProvider
|
||||
|
||||
|
@ -131,9 +131,11 @@ public class RealtimeConnectionProvider: ConnectionProvider {
|
|||
self.listeners.removeValue(forKey: identifier)
|
||||
|
||||
if self.listeners.isEmpty {
|
||||
AppSyncLogger.debug("All listeners removed, disconnecting")
|
||||
AppSyncLogger.debug("[RealtimeConnectionProvider] all subscriptions removed, disconnecting websocket connection.")
|
||||
self.status = .notConnected
|
||||
self.disconnect()
|
||||
self.websocket.disconnect()
|
||||
self.staleConnectionTimer?.invalidate()
|
||||
self.staleConnectionTimer = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -63,7 +63,7 @@ public class APIKeyAuthInterceptor: AuthInterceptor {
|
|||
)
|
||||
return signedMessage
|
||||
default:
|
||||
AppSyncLogger.debug("Message type does not need signing - \(message.messageType)")
|
||||
break
|
||||
}
|
||||
return message
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -37,7 +37,7 @@ public class OIDCAuthInterceptor: AuthInterceptor {
|
|||
)
|
||||
return signedMessage
|
||||
default:
|
||||
AppSyncLogger.debug("Message type does not need signing - \(message.messageType)")
|
||||
break
|
||||
}
|
||||
return message
|
||||
}
|
||||
|
|
|
@ -13,7 +13,7 @@ public struct AppSyncJSONHelper {
|
|||
let jsonEncoder = JSONEncoder()
|
||||
do {
|
||||
let jsonHeader = try jsonEncoder.encode(header)
|
||||
AppSyncLogger.verbose("Header - \(String(describing: String(data: jsonHeader, encoding: .utf8)))")
|
||||
AppSyncLogger.verbose("Generated Header for request - \(String(describing: String(data: jsonHeader, encoding: .utf8)))")
|
||||
return jsonHeader.base64EncodedString()
|
||||
} catch {
|
||||
AppSyncLogger.error(error.localizedDescription)
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -12,23 +12,23 @@ import Starscream
|
|||
extension StarscreamAdapter: Starscream.WebSocketDelegate {
|
||||
|
||||
public func websocketDidConnect(socket: WebSocketClient) {
|
||||
AppSyncLogger.verbose("WebsocketDidConnect")
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] websocketDidConnect: websocket has been connected.")
|
||||
delegate?.websocketDidConnect(provider: self)
|
||||
}
|
||||
|
||||
public func websocketDidDisconnect(socket: WebSocketClient, error: Error?) {
|
||||
AppSyncLogger.verbose("WebsocketDidDisconnect - \(error?.localizedDescription ?? "No error")")
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] websocketDidDisconnect: \(error?.localizedDescription ?? "No error")")
|
||||
delegate?.websocketDidDisconnect(provider: self, error: error)
|
||||
}
|
||||
|
||||
public func websocketDidReceiveMessage(socket: WebSocketClient, text: String) {
|
||||
AppSyncLogger.verbose("WebsocketDidReceiveMessage - \(text)")
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] websocketDidReceiveMessage: - \(text)")
|
||||
let data = text.data(using: .utf8) ?? Data()
|
||||
delegate?.websocketDidReceiveData(provider: self, data: data)
|
||||
}
|
||||
|
||||
public func websocketDidReceiveData(socket: WebSocketClient, data: Data) {
|
||||
AppSyncLogger.verbose("WebsocketDidReceiveData - \(data)")
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] WebsocketDidReceiveData - \(data)")
|
||||
delegate?.websocketDidReceiveData(provider: self, data: data)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,29 +13,40 @@ public class StarscreamAdapter: AppSyncWebsocketProvider {
|
|||
// Do nothing
|
||||
}
|
||||
|
||||
private let serialQueue = DispatchQueue(label: "com.amazonaws.StarscreamAdapter.serialQueue")
|
||||
|
||||
var socket: WebSocket?
|
||||
weak var delegate: AppSyncWebsocketDelegate?
|
||||
|
||||
public func connect(url: URL, protocols: [String], delegate: AppSyncWebsocketDelegate?) {
|
||||
AppSyncLogger.verbose("Connecting to url ...")
|
||||
socket = WebSocket(url: url, protocols: protocols)
|
||||
self.delegate = delegate
|
||||
socket?.delegate = self
|
||||
socket?.callbackQueue = DispatchQueue(label: "com.amazonaws.StarscreamAdapter.callBack")
|
||||
socket?.connect()
|
||||
serialQueue.async {
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] connect. Connecting to url")
|
||||
self.socket = WebSocket(url: url, protocols: protocols)
|
||||
self.delegate = delegate
|
||||
self.socket?.delegate = self
|
||||
self.socket?.callbackQueue = DispatchQueue(label: "com.amazonaws.StarscreamAdapter.callBack")
|
||||
self.socket?.connect()
|
||||
}
|
||||
}
|
||||
|
||||
public func disconnect() {
|
||||
socket?.disconnect()
|
||||
socket = nil
|
||||
serialQueue.async {
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] socket.disconnect")
|
||||
self.socket?.disconnect()
|
||||
self.socket = nil
|
||||
}
|
||||
}
|
||||
|
||||
public func write(message: String) {
|
||||
AppSyncLogger.verbose("Websocket write - \(message)")
|
||||
socket?.write(string: message)
|
||||
serialQueue.async {
|
||||
AppSyncLogger.verbose("[StarscreamAdapter] socket.write - \(message)")
|
||||
self.socket?.write(string: message)
|
||||
}
|
||||
}
|
||||
|
||||
public var isConnected: Bool {
|
||||
return socket?.isConnected ?? false
|
||||
serialQueue.sync {
|
||||
return socket?.isConnected ?? false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,42 +8,7 @@
|
|||
import XCTest
|
||||
@testable import AppSyncRealTimeClient
|
||||
|
||||
class AppSyncRealTimeClientIntegrationTests: XCTestCase {
|
||||
|
||||
var url: URL!
|
||||
var apiKey: String!
|
||||
let requestString = """
|
||||
subscription onCreate {
|
||||
onCreateTodo{
|
||||
id
|
||||
description
|
||||
name
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
let json = try ConfigurationHelper.retrieve(forResource: "amplifyconfiguration")
|
||||
if let data = json as? [String: Any],
|
||||
let api = data["api"] as? [String: Any],
|
||||
let plugins = api["plugins"] as? [String: Any],
|
||||
let awsAPIPlugin = plugins["awsAPIPlugin"] as? [String: Any],
|
||||
let apiNameOptional = awsAPIPlugin.first,
|
||||
let apiName = apiNameOptional.value as? [String: Any],
|
||||
let endpoint = apiName["endpoint"] as? String,
|
||||
let apiKey = apiName["apiKey"] as? String {
|
||||
|
||||
url = URL(string: endpoint)
|
||||
self.apiKey = apiKey
|
||||
} else {
|
||||
throw "Could not retrieve endpoint"
|
||||
}
|
||||
|
||||
} catch {
|
||||
print("Error \(error)")
|
||||
}
|
||||
}
|
||||
class AppSyncRealTimeClientIntegrationTests: AppSyncRealTimeClientTestBase {
|
||||
|
||||
/// Simple integration test against an AppSync service provisioned with a simple
|
||||
/// Todo model generated by the GraphQL Transform on the `model` directive.
|
||||
|
@ -96,16 +61,15 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
|
|||
/// 1. Create a new connection provider
|
||||
/// 2. Create multiple subscriptions
|
||||
/// 3. Unsubscribe the subscriptions
|
||||
/// 4. Sleep to make sure the asynchronous process to disconnect the socket is executed
|
||||
/// 5. Ensure the socket is disconnected
|
||||
/// 6. Repeat Steps 2-5 with the existing connection provider.
|
||||
/// 4. Ensure the socket is disconnected
|
||||
/// 5. Repeat Steps 2-4 with the existing connection provider.
|
||||
///
|
||||
/// - Given: Connected subscriptions
|
||||
/// - When:
|
||||
/// - All subscription items are unsubscribed
|
||||
/// - Then:
|
||||
/// - Underlying websocket is disconnected
|
||||
func testAllSubscriptionsCancelledShouldDisconnectTheWebsocket() {
|
||||
func testAllSubscriptionsCancelledShouldDisconnectTheWebsocket2() {
|
||||
let connectedInvoked = expectation(description: "Connection established")
|
||||
connectedInvoked.expectedFulfillmentCount = 3
|
||||
|
||||
|
@ -162,14 +126,11 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
|
|||
assertStatus(of: realTimeConnectionProvider, equals: .connected)
|
||||
|
||||
subscriptionConnection1.unsubscribe(item: item1)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .connected)
|
||||
subscriptionConnection2.unsubscribe(item: item2)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .connected)
|
||||
subscriptionConnection3.unsubscribe(item: item3)
|
||||
|
||||
// Sleep is required here as disconnecting the connection provider is done
|
||||
// asynchronously on the connection queue for the very last unsubscribe. This
|
||||
// means we need to "pull" for the status to ensure the system is operating
|
||||
// correctly by sleeping and checking that the status is .notConnected
|
||||
sleep(5)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
|
||||
|
||||
let newConnectedInvoked = expectation(description: "Connection established")
|
||||
|
@ -191,6 +152,102 @@ class AppSyncRealTimeClientIntegrationTests: XCTestCase {
|
|||
assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
|
||||
}
|
||||
|
||||
/// The purpose of this test is to ensure that a signifcant number of subscriptions can be created on a websocket, then unsubscribed, and repeated.
|
||||
///
|
||||
/// Specifically, the following test exercises the following:
|
||||
/// 1. Create a new connection provider
|
||||
/// 2. Create multiple subscriptions
|
||||
/// 3. Unsubscribe the subscriptions
|
||||
/// 4. Ensure the socket is disconnected
|
||||
/// 5. Repeat Steps 2-4 with the existing connection provider.
|
||||
///
|
||||
/// - Given: Connected subscriptions
|
||||
/// - When:
|
||||
/// - All subscription items are unsubscribed
|
||||
/// - Then:
|
||||
/// - Underlying websocket is disconnected
|
||||
func testSubscribeUnsubscribeRepeat() {
|
||||
let authInterceptor = APIKeyAuthInterceptor(apiKey)
|
||||
let connectionProvider = ConnectionProviderFactory.createConnectionProvider(
|
||||
for: url,
|
||||
authInterceptor: authInterceptor,
|
||||
connectionType: .appSyncRealtime
|
||||
)
|
||||
guard let realTimeConnectionProvider = connectionProvider as? RealtimeConnectionProvider else {
|
||||
XCTFail("Could not retrieve concrete connection provider")
|
||||
return
|
||||
}
|
||||
|
||||
let count = 30
|
||||
let subscriptions = subscribe(connectionProvider, count: count)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .connected)
|
||||
for index in 0 ..< count {
|
||||
subscriptions[index].1.unsubscribe(item: subscriptions[index].0)
|
||||
}
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
|
||||
let subscriptions2 = subscribe(connectionProvider, count: count)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .connected)
|
||||
for index in 0 ..< count {
|
||||
subscriptions2[index].1.unsubscribe(item: subscriptions2[index].0)
|
||||
}
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
|
||||
}
|
||||
|
||||
func testMultipleThreadsSubscribeUnsubscribe() {
|
||||
let authInterceptor = APIKeyAuthInterceptor(apiKey)
|
||||
let connectionProvider = ConnectionProviderFactory.createConnectionProvider(
|
||||
for: url,
|
||||
authInterceptor: authInterceptor,
|
||||
connectionType: .appSyncRealtime
|
||||
)
|
||||
guard let realTimeConnectionProvider = connectionProvider as? RealtimeConnectionProvider else {
|
||||
XCTFail("Could not retrieve concrete connection provider")
|
||||
return
|
||||
}
|
||||
let expectedPerforms = expectation(description: "total performs")
|
||||
expectedPerforms.expectedFulfillmentCount = 1_000
|
||||
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
|
||||
let subscriptionConnection = AppSyncSubscriptionConnection(provider: connectionProvider)
|
||||
let item = subscriptionConnection.subscribe(
|
||||
requestString: requestString,
|
||||
variables: nil
|
||||
) { event, _ in }
|
||||
subscriptionConnection.unsubscribe(item: item)
|
||||
expectedPerforms.fulfill()
|
||||
}
|
||||
wait(for: [expectedPerforms], timeout: 1)
|
||||
assertStatus(of: realTimeConnectionProvider, equals: .notConnected)
|
||||
}
|
||||
|
||||
// MARK: - Helpers
|
||||
|
||||
private func subscribe(
|
||||
_ connectionProvider: ConnectionProvider,
|
||||
count: Int
|
||||
) -> [(SubscriptionItem, AppSyncSubscriptionConnection)] {
|
||||
let connectedInvoked = expectation(description: "Connection established")
|
||||
connectedInvoked.expectedFulfillmentCount = count
|
||||
var subscriptions = [(SubscriptionItem, AppSyncSubscriptionConnection)]()
|
||||
|
||||
for _ in 1 ... count {
|
||||
let subscriptionConnection = AppSyncSubscriptionConnection(provider: connectionProvider)
|
||||
let item = subscriptionConnection.subscribe(
|
||||
requestString: requestString,
|
||||
variables: nil
|
||||
) { event, _ in
|
||||
if case let .connection(state) = event {
|
||||
if case .connected = state {
|
||||
connectedInvoked.fulfill()
|
||||
}
|
||||
}
|
||||
}
|
||||
subscriptions.append((item, subscriptionConnection))
|
||||
}
|
||||
|
||||
wait(for: [connectedInvoked], timeout: TestCommonConstants.networkTimeout)
|
||||
return subscriptions
|
||||
}
|
||||
|
||||
/// Checks the status of the provider in a thread-safe way. This is only needed for tests; real-world
|
||||
/// call sites wouldn't be able to access the `status` as it has `internal` access.
|
||||
private func assertStatus(
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
//
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
import XCTest
|
||||
@testable import AppSyncRealTimeClient
|
||||
|
||||
class AppSyncRealTimeClientTestBase: XCTestCase {
|
||||
|
||||
var url: URL!
|
||||
var apiKey: String!
|
||||
let requestString = """
|
||||
subscription onCreate {
|
||||
onCreateTodo{
|
||||
id
|
||||
description
|
||||
name
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
override func setUp() {
|
||||
do {
|
||||
let json = try ConfigurationHelper.retrieve(forResource: "amplifyconfiguration")
|
||||
if let data = json as? [String: Any],
|
||||
let api = data["api"] as? [String: Any],
|
||||
let plugins = api["plugins"] as? [String: Any],
|
||||
let awsAPIPlugin = plugins["awsAPIPlugin"] as? [String: Any],
|
||||
let apiNameOptional = awsAPIPlugin.first,
|
||||
let apiName = apiNameOptional.value as? [String: Any],
|
||||
let endpoint = apiName["endpoint"] as? String,
|
||||
let apiKey = apiName["apiKey"] as? String {
|
||||
|
||||
url = URL(string: endpoint)
|
||||
self.apiKey = apiKey
|
||||
} else {
|
||||
throw "Could not retrieve endpoint"
|
||||
}
|
||||
|
||||
} catch {
|
||||
print("Error \(error)")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,33 @@
|
|||
//
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
import XCTest
|
||||
@testable import AppSyncRealTimeClient
|
||||
import Starscream
|
||||
|
||||
class StarscreamAdapterTests: AppSyncRealTimeClientTestBase {
|
||||
|
||||
func testConnectDisconnect() throws {
|
||||
let starscreamAdapter = StarscreamAdapter()
|
||||
let apiKeyAuthInterceptor = APIKeyAuthInterceptor(apiKey)
|
||||
let request = AppSyncConnectionRequest(url: url)
|
||||
let signedRequest = apiKeyAuthInterceptor.interceptConnection(request, for: url)
|
||||
let expectedPerforms = expectation(description: "total performs")
|
||||
expectedPerforms.expectedFulfillmentCount = 1_000
|
||||
DispatchQueue.concurrentPerform(iterations: 1_000) { _ in
|
||||
starscreamAdapter.connect(
|
||||
url: signedRequest.url,
|
||||
protocols: ["graphql-ws"],
|
||||
delegate: nil
|
||||
)
|
||||
starscreamAdapter.disconnect()
|
||||
expectedPerforms.fulfill()
|
||||
}
|
||||
wait(for: [expectedPerforms], timeout: 1)
|
||||
XCTAssertFalse(starscreamAdapter.isConnected)
|
||||
}
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -59,6 +59,71 @@ class ConnectionProviderTests: RealtimeConnectionProviderTestBase {
|
|||
waitForExpectations(timeout: 0.05)
|
||||
}
|
||||
|
||||
/// Provider add and remove listeners tests
|
||||
///
|
||||
/// Given:
|
||||
/// - A connected websocket with a listener
|
||||
/// When:
|
||||
/// - remove all listeners
|
||||
/// Then:
|
||||
/// - The listeners are removed and the connection is disconnected
|
||||
func testAddRemoveListeners() {
|
||||
receivedNotConnected.isInverted = true
|
||||
receivedError.isInverted = true
|
||||
|
||||
let onConnect: MockWebsocketProvider.OnConnect = { _, _, delegate in
|
||||
self.websocketDelegate = delegate
|
||||
DispatchQueue.global().async {
|
||||
delegate?.websocketDidConnect(provider: self.websocket)
|
||||
}
|
||||
}
|
||||
|
||||
let receivedDisconnect = expectation(description: "receivedDisconnect")
|
||||
let onDisconnect: MockWebsocketProvider.OnDisconnect = {
|
||||
receivedDisconnect.fulfill()
|
||||
}
|
||||
|
||||
let onWrite: MockWebsocketProvider.OnWrite = { message in
|
||||
guard RealtimeConnectionProviderTestBase.messageType(of: message, equals: "connection_init") else {
|
||||
XCTFail("Incoming message did not have 'connection_init' type")
|
||||
return
|
||||
}
|
||||
|
||||
self.websocketDelegate.websocketDidReceiveData(
|
||||
provider: self.websocket,
|
||||
data: RealtimeConnectionProviderTestBase.makeConnectionAckMessage()
|
||||
)
|
||||
}
|
||||
|
||||
websocket = MockWebsocketProvider(
|
||||
onConnect: onConnect,
|
||||
onDisconnect: onDisconnect,
|
||||
onWrite: onWrite
|
||||
)
|
||||
|
||||
// Retain the provider so it doesn't release prior to executing callbacks
|
||||
let provider = createProviderAndConnect(listeners: ["1", "2", "3", "4"])
|
||||
|
||||
wait(
|
||||
for: [receivedInProgress, receivedConnected, receivedNotConnected, receivedError],
|
||||
timeout: 1
|
||||
)
|
||||
|
||||
XCTAssertFalse(provider.listeners.isEmpty)
|
||||
|
||||
let listenersToRemove = provider.listeners.map { $0.key }
|
||||
|
||||
// Removing all the listeners will disconnect the websocket connection
|
||||
for identifier in listenersToRemove {
|
||||
provider.removeListener(identifier: identifier)
|
||||
}
|
||||
|
||||
// Since removing listeners is asynchronous, we have to wait for the disconnect
|
||||
wait(for: [receivedDisconnect], timeout: 1)
|
||||
XCTAssertTrue(provider.listeners.isEmpty)
|
||||
XCTAssertEqual(provider.status, .notConnected)
|
||||
}
|
||||
|
||||
/// Provider test
|
||||
///
|
||||
/// Given:
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
//
|
||||
// Copyright 2018-2020 Amazon.com,
|
||||
// Copyright 2018-2021 Amazon.com,
|
||||
// Inc. or its affiliates. All Rights Reserved.
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
@ -39,7 +39,7 @@ class RealtimeConnectionProviderTestBase: XCTestCase {
|
|||
///
|
||||
/// Preconditions:
|
||||
/// - `self.websocket` must be initialized in the mock provider's `onConnect`
|
||||
func createProviderAndConnect() -> RealtimeConnectionProvider {
|
||||
func createProviderAndConnect(listeners: [String]? = nil) -> RealtimeConnectionProvider {
|
||||
let provider = RealtimeConnectionProvider(for: url, websocket: websocket)
|
||||
provider.addListener(identifier: "testListener") { event in
|
||||
switch event {
|
||||
|
@ -58,6 +58,11 @@ class RealtimeConnectionProviderTestBase: XCTestCase {
|
|||
break
|
||||
}
|
||||
}
|
||||
if let listeners = listeners {
|
||||
listeners.forEach { identifier in
|
||||
provider.addListener(identifier: identifier) { _ in }
|
||||
}
|
||||
}
|
||||
provider.connect()
|
||||
return provider
|
||||
}
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
|
||||
## Unreleased
|
||||
|
||||
- *Changes on `main` branch that have not yet been released*
|
||||
### Feature
|
||||
|
||||
- feat: disconnect on last subscription, fix data races (See [PR #46](https://github.com/aws-amplify/aws-appsync-realtime-client-ios/pull/46))
|
||||
|
||||
## 1.4.4
|
||||
|
||||
|
|
Loading…
Reference in New Issue