Skip to content

Commit

Permalink
Refactor subscription's logic (#6210)
Browse files Browse the repository at this point in the history
* refactor subscription's logic
 - Moved subscribing to provider events from  Web3Subscription to Web3SubscriptionManager
  - subscribe and un subscribe called at Web3Subscription now is the same as calling them on Web3SubscriptionManager
   - Web3Subscription is lined now to Web3SubscriptionManager instead of directly to Web3RequestManager
 - update test cases

* add `SimpleProvider` the base of `EIP1193Provider`

* update CHANGELOG.md

* enable backward compatibility for subscriptions
 + mark the obsolete as deprecated

* add `removeListener` events to `EIP1193Provider`
* Fix some events types at `SocketProvider`

* add and fix old test cases for multiple subscriptions
  • Loading branch information
Muhammad-Altabba committed Jun 24, 2023
1 parent 51a59f9 commit e8d579c
Show file tree
Hide file tree
Showing 21 changed files with 645 additions and 174 deletions.
11 changes: 11 additions & 0 deletions packages/web3-core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ Documentation:

## [Unreleased]

### Added

- Web3Subscription constructor accept a Subscription Manager (as an alternative to accepting Request Manager that is now marked marked as deprecated) (#6210)

### Changed

- Web3Subscription constructor overloading that accept a Request Manager is marked as deprecated (#6210)

### Fixed

- Fixed Batch requests erroring out on one request (#6164)
- Fixed the issue: Subscribing to multiple blockchain events causes every listener to be fired for every registered event (#6210)
- Fixed the issue: Unsubscribe at a Web3Subscription class will still have the id of the subscription at the Web3SubscriptionManager (#6210)
- Fixed the issue: A call to the provider is made for every subscription object (#6210)
3 changes: 2 additions & 1 deletion packages/web3-core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ export const isLegacySendAsyncProvider = <API extends Web3APISpec>(
export const isSupportedProvider = <API extends Web3APISpec>(
provider: SupportedProviders<API>,
): provider is SupportedProviders<API> =>
Web3BaseProvider.isWeb3Provider(provider) ||
isWeb3Provider(provider) ||
isEIP1193Provider(provider) ||
isLegacyRequestProvider(provider) ||
isLegacySendAsyncProvider(provider) ||
isLegacySendProvider(provider);
Expand Down
10 changes: 5 additions & 5 deletions packages/web3-core/src/web3_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ export class Web3Context<
public static givenProvider?: SupportedProviders<never>;
public readonly providers = Web3RequestManager.providers;
protected _requestManager: Web3RequestManager<API>;
protected _subscriptionManager?: Web3SubscriptionManager<API, RegisteredSubs>;
protected _subscriptionManager: Web3SubscriptionManager<API, RegisteredSubs>;
protected _accountProvider?: Web3AccountProvider<Web3BaseWalletAccount>;
protected _wallet?: Web3BaseWallet<Web3BaseWalletAccount>;

Expand Down Expand Up @@ -146,10 +146,10 @@ export class Web3Context<

if (subscriptionManager) {
this._subscriptionManager = subscriptionManager;
} else if (registeredSubscriptions) {
} else {
this._subscriptionManager = new Web3SubscriptionManager(
this.requestManager,
registeredSubscriptions,
registeredSubscriptions ?? ({} as RegisteredSubs),
);
}

Expand Down Expand Up @@ -195,8 +195,7 @@ export class Web3Context<
provider: this.provider,
requestManager: this.requestManager,
subscriptionManager: this.subscriptionManager,
registeredSubscriptions: this.subscriptionManager
?.registeredSubscriptions as RegisteredSubs,
registeredSubscriptions: this.subscriptionManager?.registeredSubscriptions,
providers: this.providers,
wallet: this.wallet,
accountProvider: this.accountProvider,
Expand Down Expand Up @@ -231,6 +230,7 @@ export class Web3Context<
this.setConfig(parentContext.config);
this._requestManager = parentContext.requestManager;
this.provider = parentContext.provider;
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this._subscriptionManager = parentContext.subscriptionManager;
this._wallet = parentContext.wallet;
this._accountProvider = parentContext._accountProvider;
Expand Down
123 changes: 101 additions & 22 deletions packages/web3-core/src/web3_subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,22 @@ You should have received a copy of the GNU Lesser General Public License
along with web3.js. If not, see <http://www.gnu.org/licenses/>.
*/

import { DataFormat, DEFAULT_RETURN_FORMAT, Web3APISpec } from 'web3-types';
import {
DataFormat,
DEFAULT_RETURN_FORMAT,
EIP1193Provider,
JsonRpcNotification,
JsonRpcSubscriptionResult,
JsonRpcSubscriptionResultOld,
Log,
Web3APISpec,
Web3BaseProvider,
} from 'web3-types';
import { ProviderError, SubscriptionError } from 'web3-errors';
import { isNullish } from 'web3-utils';
import { isSupportSubscriptions } from './utils.js';
import { Web3RequestManager, Web3RequestManagerEvent } from './web3_request_manager.js';
// eslint-disable-next-line import/no-cycle
import { Web3SubscriptionConstructor } from './web3_subscriptions.js';

type ShouldUnsubscribeCondition = ({
Expand All @@ -31,8 +42,10 @@ type ShouldUnsubscribeCondition = ({
}) => boolean | undefined;

export class Web3SubscriptionManager<
API extends Web3APISpec,
RegisteredSubs extends { [key: string]: Web3SubscriptionConstructor<API> },
API extends Web3APISpec = Web3APISpec,
RegisteredSubs extends { [key: string]: Web3SubscriptionConstructor<API> } = {
[key: string]: Web3SubscriptionConstructor<API>;
},
> {
private readonly _subscriptions: Map<
string,
Expand All @@ -41,34 +54,97 @@ export class Web3SubscriptionManager<

/**
*
* @param requestManager
* @param registeredSubscriptions
* @param - requestManager
* @param - registeredSubscriptions
*
* @example
* ```ts
* const requestManager = new Web3RequestManager("ws://localhost:8545");
* const subscriptionManager = new Web3SubscriptionManager(requestManager, {});
* ```
*/
public constructor(
requestManager: Web3RequestManager<API>,
registeredSubscriptions: RegisteredSubs,
);
/**
* @deprecated This constructor overloading should not be used
*/
public constructor(
requestManager: Web3RequestManager<API>,
registeredSubscriptions: RegisteredSubs,
tolerateUnlinkedSubscription: boolean,
);
public constructor(
public readonly requestManager: Web3RequestManager<API>,
public readonly registeredSubscriptions: RegisteredSubs,
private readonly tolerateUnlinkedSubscription: boolean = false,
) {
this.requestManager.on(Web3RequestManagerEvent.BEFORE_PROVIDER_CHANGE, async () => {
await this.unsubscribe();
});

this.requestManager.on(Web3RequestManagerEvent.PROVIDER_CHANGED, () => {
this.clear();
this.listenToProviderEvents();
});

this.listenToProviderEvents();
}

private listenToProviderEvents() {
const providerAsWebProvider = this.requestManager.provider as Web3BaseProvider;
if (
!this.requestManager.provider ||
(typeof providerAsWebProvider?.supportsSubscriptions === 'function' &&
!providerAsWebProvider?.supportsSubscriptions())
) {
return;
}

if (typeof (this.requestManager.provider as EIP1193Provider<API>).on === 'function') {
if (
typeof (this.requestManager.provider as EIP1193Provider<API>).request === 'function'
) {
// Listen to provider messages and data
(this.requestManager.provider as EIP1193Provider<API>).on(
'message',
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
(message: any) => this.messageListener(message),
);
} else {
// eslint-disable-next-line @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-argument
providerAsWebProvider.on<Log>('data', (data: any) => this.messageListener(data));
}
}
}

protected messageListener(
data?:
| JsonRpcSubscriptionResult
| JsonRpcSubscriptionResultOld<Log>
| JsonRpcNotification<Log>,
) {
if (!data) {
throw new SubscriptionError('Should not call messageListener with no data. Type was');
}
const subscriptionId =
(data as JsonRpcNotification).params?.subscription ||
(data as JsonRpcSubscriptionResultOld).data?.subscription ||
(data as JsonRpcSubscriptionResult).id?.toString(16);

// Process if the received data is related to a subscription
if (subscriptionId) {
const sub = this._subscriptions.get(subscriptionId);
sub?.processSubscriptionData(data);
}
}
/**
* Will create a new subscription
*
* @param name - The subscription you want to subscribe to
* @param args (optional) - Optional additional parameters, depending on the subscription type
* @param returnFormat ({@link DataFormat} defaults to {@link DEFAULT_RETURN_FORMAT}) - Specifies how the return data from the call should be formatted.
* @param args - Optional additional parameters, depending on the subscription type
* @param returnFormat- ({@link DataFormat} defaults to {@link DEFAULT_RETURN_FORMAT}) - Specifies how the return data from the call should be formatted.
*
* Will subscribe to a specific topic (note: name)
* @returns The subscription object
Expand All @@ -78,19 +154,16 @@ export class Web3SubscriptionManager<
args?: ConstructorParameters<RegisteredSubs[T]>[0],
returnFormat: DataFormat = DEFAULT_RETURN_FORMAT,
): Promise<InstanceType<RegisteredSubs[T]>> {
if (!this.requestManager.provider) {
throw new ProviderError('Provider not available');
}

const Klass: RegisteredSubs[T] = this.registeredSubscriptions[name];
if (!Klass) {
throw new SubscriptionError('Invalid subscription type');
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const subscription = new Klass(args ?? undefined, {
requestManager: this.requestManager,
subscriptionManager: this as Web3SubscriptionManager<API, RegisteredSubs>,
returnFormat,
}) as InstanceType<RegisteredSubs[T]>;
} as any) as InstanceType<RegisteredSubs[T]>;

await this.addSubscription(subscription);

Expand All @@ -111,6 +184,10 @@ export class Web3SubscriptionManager<
* @param sub - A {@link Web3Subscription} object
*/
public async addSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>) {
if (!this.requestManager.provider) {
throw new ProviderError('Provider not available');
}

if (!this.supportsSubscriptions()) {
throw new SubscriptionError('The current provider does not support subscriptions');
}
Expand All @@ -119,34 +196,36 @@ export class Web3SubscriptionManager<
throw new SubscriptionError(`Subscription with id "${sub.id}" already exists`);
}

await sub.subscribe();
await sub.sendSubscriptionRequest();

if (isNullish(sub.id)) {
throw new SubscriptionError('Subscription is not subscribed yet.');
}

this._subscriptions.set(sub.id, sub);

return sub.id;
}

/**
* Will clear a subscription
*
* @param id - The subscription of type {@link Web3Subscription} to remove
*/

public async removeSubscription(sub: InstanceType<RegisteredSubs[keyof RegisteredSubs]>) {
if (isNullish(sub.id)) {
const { id } = sub;

if (isNullish(id)) {
throw new SubscriptionError(
'Subscription is not subscribed yet. Or, had already been unsubscribed but not through the Subscription Manager.',
);
}

if (!this._subscriptions.has(sub.id)) {
throw new SubscriptionError(
`Subscription with id "${sub.id.toString()}" does not exists`,
);
if (!this._subscriptions.has(id) && !this.tolerateUnlinkedSubscription) {
throw new SubscriptionError(`Subscription with id "${id.toString()}" does not exists`);
}
const { id } = sub;
await sub.unsubscribe();

await sub.sendUnsubscribeRequest();
this._subscriptions.delete(id);
return id;
}
Expand Down
Loading

0 comments on commit e8d579c

Please sign in to comment.