From 28ff4c95968fef5e56b7b1a81c454187e0e7dc62 Mon Sep 17 00:00:00 2001 From: Francisco Rodriguez Date: Thu, 12 Mar 2020 21:48:00 -0700 Subject: [PATCH] fix(@aws-amplify/datastore) Adding socket disconnection detection (#5086) --- .../storage/adapter/AsyncStorageDatabase.ts | 2 +- .../src/sync/datastoreConnectivity.ts | 56 +++++++++++++++++++ packages/datastore/src/sync/index.ts | 41 +++++++++++--- .../Providers/AWSAppSyncRealTimeProvider.ts | 31 ++++++---- packages/pubsub/src/index.ts | 8 ++- 5 files changed, 119 insertions(+), 19 deletions(-) create mode 100644 packages/datastore/src/sync/datastoreConnectivity.ts diff --git a/packages/datastore/src/storage/adapter/AsyncStorageDatabase.ts b/packages/datastore/src/storage/adapter/AsyncStorageDatabase.ts index 7d7aeaee1a4..dc255229702 100644 --- a/packages/datastore/src/storage/adapter/AsyncStorageDatabase.ts +++ b/packages/datastore/src/storage/adapter/AsyncStorageDatabase.ts @@ -5,7 +5,7 @@ const DB_NAME = '@AmplifyDatastore'; const COLLECTION = 'Collection'; const DATA = 'Data'; -//TODO: Consider refactoring to a batch save operation. +// TODO: Consider refactoring to a batch save operation. class AsyncStorageDatabase { async save(item: T, storeName: string) { const itemKey = this.getKeyForItem(storeName, item.id); diff --git a/packages/datastore/src/sync/datastoreConnectivity.ts b/packages/datastore/src/sync/datastoreConnectivity.ts new file mode 100644 index 00000000000..5b6ca81d37a --- /dev/null +++ b/packages/datastore/src/sync/datastoreConnectivity.ts @@ -0,0 +1,56 @@ +import * as Observable from 'zen-observable'; +import { ConsoleLogger as Logger, Reachability } from '@aws-amplify/core'; + +const logger = new Logger('DataStore'); + +const RECONNECTING_IN = 5000; // 5s this may be configurable in the future + +type ConnectionStatus = { + // Might add other params in the future + online: boolean; +}; + +export default class DataStoreConnectivity { + private connectionStatus: ConnectionStatus; + private observer: ZenObservable.SubscriptionObserver; + constructor() { + this.connectionStatus = { + online: false, + }; + } + + status(): Observable { + if (this.observer) { + throw new Error('Subscriber already exists'); + } + return new Observable(observer => { + this.observer = observer; + // Will be used to forward socket connection changes, enhancing Reachability + + const subs = new Reachability() + .networkMonitor() + .subscribe(({ online }) => { + this.connectionStatus.online = online; + + const observerResult = { ...this.connectionStatus }; // copyOf status + + observer.next(observerResult); + }); + + return () => { + subs.unsubscribe(); + }; + }); + } + + socketDisconnected() { + if (this.observer && typeof this.observer.next === 'function') { + this.observer.next({ online: false }); // Notify network issue from the socket + + setTimeout(() => { + const observerResult = { ...this.connectionStatus }; // copyOf status + this.observer.next(observerResult); + }, RECONNECTING_IN); // giving time for socket cleanup and network status stabilization + } + } +} diff --git a/packages/datastore/src/sync/index.ts b/packages/datastore/src/sync/index.ts index 526fced2a6a..a1bafb5c8d4 100644 --- a/packages/datastore/src/sync/index.ts +++ b/packages/datastore/src/sync/index.ts @@ -10,7 +10,6 @@ import { ModelInit, MutableModel, NamespaceResolver, - PersistentModel, PersistentModelConstructor, SchemaModel, SchemaNamespace, @@ -26,6 +25,8 @@ import { predicateToGraphQLCondition, TransformerMutationType, } from './utils'; +import DataStoreConnectivity from './datastoreConnectivity'; +import { CONTROL_MSG as PUBSUB_CONTROL_MSG } from '@aws-amplify/pubsub'; const logger = new Logger('DataStore'); @@ -134,17 +135,26 @@ export class SyncEngine { return; } - new Reachability().networkMonitor().subscribe(async ({ online }) => { - this.online = online; - if (online) { + const datastoreConnectivity = new DataStoreConnectivity(); + + datastoreConnectivity.status().subscribe(async ({ online }) => { + if (online && !this.online) { + // From offline to online //#region GraphQL Subscriptions const [ ctlSubsObservable, dataSubsObservable, ] = this.subscriptionsProcessor.start(); + + const errorHandler = this.disconnectionHandler( + datastoreConnectivity + ); try { subscriptions.push( - await this.waitForSubscriptionsReady(ctlSubsObservable) + await this.waitForSubscriptionsReady( + ctlSubsObservable, + errorHandler + ) ); } catch (err) { observer.error(err); @@ -225,10 +235,11 @@ export class SyncEngine { } ) ); - } else { + } else if (!online) { subscriptions.forEach(sub => sub.unsubscribe()); subscriptions = []; } + this.online = online; }); this.storage @@ -397,8 +408,23 @@ export class SyncEngine { }); } + private disconnectionHandler( + datastoreConnectivity: DataStoreConnectivity + ): (msg: string) => void { + return (msg: string) => { + // This implementation is tight to AWSAppSyncRealTimeProvider 'Connection closed', 'Timeout disconnect' msg + if ( + PUBSUB_CONTROL_MSG.CONNECTION_CLOSED === msg || + PUBSUB_CONTROL_MSG.TIMEOUT_DISCONNECT === msg + ) { + datastoreConnectivity.socketDisconnected(); + } + }; + } + private async waitForSubscriptionsReady( - ctlSubsObservable: Observable + ctlSubsObservable: Observable, + errorHandler: (msg: string) => void ): Promise { return new Promise((resolve, reject) => { const subscription = ctlSubsObservable.subscribe({ @@ -409,6 +435,7 @@ export class SyncEngine { }, error: err => { reject(`subscription failed ${err}`); + errorHandler(err); }, }); }); diff --git a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts index da148207404..de7d0c26b88 100644 --- a/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts +++ b/packages/pubsub/src/Providers/AWSAppSyncRealTimeProvider.ts @@ -29,6 +29,7 @@ import { import Cache from '@aws-amplify/cache'; import Auth from '@aws-amplify/auth'; import { AbstractPubSubProvider } from './PubSubProvider'; +import { CONTROL_MSG } from '@aws-amplify/pubsub'; const logger = new Logger('AWSAppSyncRealTimeProvider'); @@ -148,6 +149,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { private keepAliveTimeout = DEFAULT_KEEP_ALIVE_TIMEOUT; private subscriptionObserverMap: Map = new Map(); private promiseArray: Array<{ res: Function; rej: Function }> = []; + getProviderName() { return 'AWSAppSyncRealTimeProvider'; } @@ -191,17 +193,24 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { try { // Waiting that subscription has been connected before trying to unsubscribe await this._waitForSubscriptionToBeConnected(subscriptionId); - const { subscriptionState } = this.subscriptionObserverMap.get( - subscriptionId - ); + + const { subscriptionState } = + this.subscriptionObserverMap.get(subscriptionId) || {}; + + if (!subscriptionState) { + // subscription already unsubscribed + return; + } + if (subscriptionState === SUBSCRIPTION_STATUS.CONNECTED) { this._sendUnsubscriptionMessage(subscriptionId); } else { - throw new Error('Subscription fail, start removing subscription'); + throw new Error('Subscription never connected'); } } catch (err) { + logger.debug(`Error while unsubscribing ${err}`); + } finally { this._removeSubscriptionObserver(subscriptionId); - return; } }; } @@ -361,8 +370,6 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { }; const stringToAWSRealTime = JSON.stringify(unsubscribeMessage); this.awsRealTimeSocket.send(stringToAWSRealTime); - - this._removeSubscriptionObserver(subscriptionId); } } catch (err) { // If GQL_STOP is not sent because of disconnection issue, then there is nothing the client can do @@ -394,6 +401,9 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { logger.debug('closing WebSocket...'); clearTimeout(this.keepAliveTimeoutId); const tempSocket = this.awsRealTimeSocket; + // Cleaning callbacks to avoid race condition, socket still exists + tempSocket.onclose = undefined; + tempSocket.onerror = undefined; tempSocket.close(1000); this.awsRealTimeSocket = null; this.socketStatus = SOCKET_STATUS.CLOSED; @@ -456,7 +466,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { if (type === MESSAGE_TYPES.GQL_CONNECTION_KEEP_ALIVE) { clearTimeout(this.keepAliveTimeoutId); this.keepAliveTimeoutId = setTimeout( - this._errorDisconnect.bind(this, 'Timeout disconnect'), + this._errorDisconnect.bind(this, CONTROL_MSG.TIMEOUT_DISCONNECT), this.keepAliveTimeout ); return; @@ -493,6 +503,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { } private _errorDisconnect(msg: string) { + logger.debug(`Disconnect error: ${msg}`); this.subscriptionObserverMap.forEach(({ observer }) => { if (!observer.closed) { observer.error({ @@ -636,7 +647,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { return new Promise((res, rej) => { let ackOk = false; this.awsRealTimeSocket.onerror = error => { - logger.debug(`WebSocket closed ${JSON.stringify(error)}`); + logger.debug(`WebSocket error ${JSON.stringify(error)}`); }; this.awsRealTimeSocket.onclose = event => { logger.debug(`WebSocket closed ${event.reason}`); @@ -662,7 +673,7 @@ export class AWSAppSyncRealTimeProvider extends AbstractPubSubProvider { ); this.awsRealTimeSocket.onerror = err => { logger.debug(err); - this._errorDisconnect('Connection closed'); + this._errorDisconnect(CONTROL_MSG.CONNECTION_CLOSED); }; res('Cool, connected to AWS AppSyncRealTime'); return; diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 0d2175311ef..c9164d5ad3a 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -16,6 +16,11 @@ import Amplify, { ConsoleLogger as Logger } from '@aws-amplify/core'; const logger = new Logger('PubSub'); +enum CONTROL_MSG { + CONNECTION_CLOSED = 'Connection closed', + TIMEOUT_DISCONNECT = 'Timeout disconnect', +} + let _instance: PubSubClass = null; if (!_instance) { @@ -29,4 +34,5 @@ Amplify.register(PubSub); export default PubSub; export * from './Providers/AWSIotProvider'; -export { PubSubClass }; + +export { PubSubClass, CONTROL_MSG };