diff --git a/packages/ethers/src.ts/index.ts b/packages/ethers/src.ts/index.ts index 64771aac56..aa4d22fd3d 100644 --- a/packages/ethers/src.ts/index.ts +++ b/packages/ethers/src.ts/index.ts @@ -5,7 +5,7 @@ import * as ethers from "./ethers"; try { - const anyGlobal = (window as any); + const anyGlobal = ((window || { }) as any); if (anyGlobal._ethers == null) { anyGlobal._ethers = ethers; diff --git a/packages/providers/package.json b/packages/providers/package.json index c2529546d5..7ea0241a55 100644 --- a/packages/providers/package.json +++ b/packages/providers/package.json @@ -2,6 +2,7 @@ "author": "Richard Moore ", "browser": { "./ipc-provider": "./lib/browser-ipc-provider.js", + "ws": "./lib/browser-ws.js", "net": "./lib/browser-net.js" }, "dependencies": { @@ -19,7 +20,8 @@ "@ethersproject/rlp": ">=5.0.0-beta.126", "@ethersproject/strings": ">=5.0.0-beta.130", "@ethersproject/transactions": ">=5.0.0-beta.128", - "@ethersproject/web": ">=5.0.0-beta.129" + "@ethersproject/web": ">=5.0.0-beta.129", + "ws": "7.2.3" }, "description": "Ethereum Providers for ethers.", "devDependencies": { diff --git a/packages/providers/src.ts/base-provider.ts b/packages/providers/src.ts/base-provider.ts index fd75a0bd55..63096013a4 100644 --- a/packages/providers/src.ts/base-provider.ts +++ b/packages/providers/src.ts/base-provider.ts @@ -112,7 +112,7 @@ function getTime() { * - transaction hash */ -class Event { +export class Event { readonly listener: Listener; readonly once: boolean; readonly tag: string; @@ -123,6 +123,27 @@ class Event { defineReadOnly(this, "once", once); } + get type(): string { + return this.tag.split(":")[0] + } + + get hash(): string { + const comps = this.tag.split(":"); + if (comps[0] !== "tx") { return null; } + return comps[1]; + } + + get filter(): Filter { + const comps = this.tag.split(":"); + if (comps[0] !== "filter") { return null; } + const filter = { + address: comps[1], + topics: deserializeTopics(comps[2]) + } + if (!filter.address || filter.address === "*") { delete filter.address; } + return filter; + } + pollable(): boolean { return (this.tag.indexOf(":") >= 0 || this.tag === "block" || this.tag === "pending"); } @@ -154,7 +175,7 @@ export class BaseProvider extends Provider { _emitted: { [ eventName: string ]: number | "pending" }; _pollingInterval: number; - _poller: any; // @TODO: what does TypeScript think setInterval returns? + _poller: NodeJS.Timer; _lastBlockNumber: number; @@ -309,10 +330,9 @@ export class BaseProvider extends Provider { // Find all transaction hashes we are waiting on this._events.forEach((event) => { - const comps = event.tag.split(":"); - switch (comps[0]) { + switch (event.type) { case "tx": { - const hash = comps[1]; + const hash = event.hash; let runner = this.getTransactionReceipt(hash).then((receipt) => { if (!receipt || receipt.blockNumber == null) { return null; } this._emitted["t:" + hash] = receipt.blockNumber; @@ -326,14 +346,9 @@ export class BaseProvider extends Provider { } case "filter": { - const topics = deserializeTopics(comps[2]); - const filter = { - address: comps[1], - fromBlock: this._lastBlockNumber + 1, - toBlock: blockNumber, - topics: topics - } - if (!filter.address || filter.address === "*") { delete filter.address; } + const filter = event.filter; + filter.fromBlock = this._lastBlockNumber + 1; + filter.toBlock = blockNumber; const runner = this.getLogs(filter).then((logs) => { if (logs.length === 0) { return; } @@ -342,7 +357,6 @@ export class BaseProvider extends Provider { this._emitted["t:" + log.transactionHash] = log.blockNumber; this.emit(filter, log); }); - return null; }).catch((error: Error) => { this.emit("error", error); }); runners.push(runner); @@ -937,25 +951,19 @@ export class BaseProvider extends Provider { return logger.throwError(method + " not implemented", Logger.errors.NOT_IMPLEMENTED, { operation: method }); } - _startPending(): void { - console.log("WARNING: this provider does not support pending events"); - } - - _stopPending(): void { + _startEvent(event: Event): void { + this.polling = (this._events.filter((e) => e.pollable()).length > 0); } - // Returns true if there are events that still require polling - _checkPolling(): void { + _stopEvent(event: Event): void { this.polling = (this._events.filter((e) => e.pollable()).length > 0); } _addEventListener(eventName: EventType, listener: Listener, once: boolean): this { - this._events.push(new Event(getEventTag(eventName), listener, once)); - - if (eventName === "pending") { this._startPending(); } + const event = new Event(getEventTag(eventName), listener, once) + this._events.push(event); - // Do we still now have any events that require polling? - this._checkPolling(); + this._startEvent(event); return this; } @@ -972,18 +980,27 @@ export class BaseProvider extends Provider { emit(eventName: EventType, ...args: Array): boolean { let result = false; + let stopped: Array = [ ]; + let eventTag = getEventTag(eventName); this._events = this._events.filter((event) => { if (event.tag !== eventTag) { return true; } + setTimeout(() => { event.listener.apply(this, args); }, 0); + result = true; - return !(event.once); + + if (event.once) { + stopped.push(event); + return false; + } + + return true; }); - // Do we still have any events that require polling? ("once" events remove themselves) - this._checkPolling(); + stopped.forEach((event) => { this._stopEvent(event); }); return result; } @@ -1013,6 +1030,8 @@ export class BaseProvider extends Provider { return this.removeAllListeners(eventName); } + const stopped: Array = [ ]; + let found = false; let eventTag = getEventTag(eventName); @@ -1020,31 +1039,31 @@ export class BaseProvider extends Provider { if (event.tag !== eventTag || event.listener != listener) { return true; } if (found) { return true; } found = true; + stopped.push(event); return false; }); - if (eventName === "pending" && this.listenerCount("pending") === 0) { this._stopPending(); } - - // Do we still have any events that require polling? - this._checkPolling(); + stopped.forEach((event) => { this._stopEvent(event); }); return this; } removeAllListeners(eventName?: EventType): this { + let stopped: Array = [ ]; if (eventName == null) { + stopped = this._events; + this._events = [ ]; - this._stopPending(); } else { - let eventTag = getEventTag(eventName); + const eventTag = getEventTag(eventName); this._events = this._events.filter((event) => { - return (event.tag !== eventTag); + if (event.tag !== eventTag) { return true; } + stopped.push(event); + return false; }); - if (eventName === "pending") { this._stopPending(); } } - // Do we still have any events that require polling? - this._checkPolling(); + stopped.forEach((event) => { this._stopEvent(event); }); return this; } diff --git a/packages/providers/src.ts/browser-ws.ts b/packages/providers/src.ts/browser-ws.ts new file mode 100644 index 0000000000..90956d9c05 --- /dev/null +++ b/packages/providers/src.ts/browser-ws.ts @@ -0,0 +1,17 @@ +"use strict"; + +import { Logger } from "@ethersproject/logger"; +import { version } from "./_version"; + +let WS = (WebSocket as any); + +if (WS == null) { + const logger = new Logger(version); + WS = function() { + logger.throwError("WebSockets not supported in this environment", Logger.errors.UNSUPPORTED_OPERATION, { + operation: "new WebSocket()" + }); + } +} + +module.exports = WS; diff --git a/packages/providers/src.ts/index.ts b/packages/providers/src.ts/index.ts index f875da4c35..872bb3a5bc 100644 --- a/packages/providers/src.ts/index.ts +++ b/packages/providers/src.ts/index.ts @@ -27,6 +27,7 @@ import { InfuraProvider } from "./infura-provider"; import { JsonRpcProvider, JsonRpcSigner } from "./json-rpc-provider"; import { NodesmithProvider } from "./nodesmith-provider"; import { Web3Provider } from "./web3-provider"; +import { WebSocketProvider } from "./websocket-provider"; import { AsyncSendable } from "./web3-provider"; @@ -86,6 +87,7 @@ export { JsonRpcProvider, NodesmithProvider, Web3Provider, + WebSocketProvider, IpcProvider, diff --git a/packages/providers/src.ts/json-rpc-provider.ts b/packages/providers/src.ts/json-rpc-provider.ts index d0a7e5258c..f5bd2fe3c3 100644 --- a/packages/providers/src.ts/json-rpc-provider.ts +++ b/packages/providers/src.ts/json-rpc-provider.ts @@ -15,7 +15,7 @@ import { Logger } from "@ethersproject/logger"; import { version } from "./_version"; const logger = new Logger(version); -import { BaseProvider } from "./base-provider"; +import { BaseProvider, Event } from "./base-provider"; function timer(timeout: number): Promise { @@ -263,7 +263,7 @@ export class JsonRpcProvider extends BaseProvider { } // Default URL - if (!url) { url = "http:/" + "/localhost:8545"; } + if (!url) { url = getStatic<() => string>(this.constructor, "defaultUrl")(); } if (typeof(url) === "string") { this.connection = Object.freeze({ @@ -276,6 +276,10 @@ export class JsonRpcProvider extends BaseProvider { this._nextId = 42; } + static defaultUrl(): string { + return "http:/" + "/localhost:8545"; + } + getSigner(addressOrIndex?: string | number): JsonRpcSigner { return new JsonRpcSigner(_constructorGuard, this, addressOrIndex); } @@ -402,6 +406,11 @@ export class JsonRpcProvider extends BaseProvider { return logger.throwError(method + " not implemented", Logger.errors.NOT_IMPLEMENTED, { operation: method }); } + _startEvent(event: Event): void { + if (event.tag === "pending") { this._startPending(); } + super._startEvent(event); + } + _startPending(): void { if (this._pendingFilter != null) { return; } let self = this; @@ -445,10 +454,14 @@ export class JsonRpcProvider extends BaseProvider { }).catch((error: Error) => { }); } - _stopPending(): void { - this._pendingFilter = null; + _stopEvent(event: Event): void { + if (event.tag === "pending" && this.listenerCount("pending") === 0) { + this._pendingFilter = null; + } + super._stopEvent(event); } + // Convert an ethers.js transaction into a JSON-RPC transaction // - gasLimit => gas // - All values hexlified diff --git a/packages/providers/src.ts/websocket-provider.ts b/packages/providers/src.ts/websocket-provider.ts new file mode 100644 index 0000000000..3cc962e537 --- /dev/null +++ b/packages/providers/src.ts/websocket-provider.ts @@ -0,0 +1,256 @@ +"use strict"; + +import WebSocket from "ws"; + +import { BigNumber } from "@ethersproject/bignumber"; +import { Networkish } from "@ethersproject/networks"; +import { defineReadOnly } from "@ethersproject/properties"; + +import { Event } from "./base-provider"; +import { JsonRpcProvider } from "./json-rpc-provider"; + +import { Logger } from "@ethersproject/logger"; +import { version } from "./_version"; +const logger = new Logger(version); + + +/** + * Notes: + * + * This provider differs a bit from the polling providers. One main + * difference is how it handles consistency. The polling providers + * will stall responses to ensure a consistent state, while this + * WebSocket provider assumes the connected backend will manage this. + * + * For example, if a polling provider emits an event which indicats + * the event occurred in blockhash XXX, a call to fetch that block by + * its hash XXX, if not present will retry until it is present. This + * can occur when querying a pool of nodes that are mildly out of sync + * with each other. + */ + +let NextId = 1; + +export type InflightRequest = { + callback: (error: Error, result: any) => void; + payload: string; +}; + +export type Subscription = { + tag: string; + processFunc: (payload: any) => void; +}; +/* +function subscribable(tag: string): boolean { + return (tag === "block" || tag === "pending"); +} +*/ +export class WebSocketProvider extends JsonRpcProvider { + readonly _websocket: any; + readonly _requests: { [ name: string ]: InflightRequest }; + + // Maps event tag to subscription ID (we dedupe identical events) + readonly _subIds: { [ tag: string ]: Promise }; + + // Maps Subscription ID to Subscription + readonly _subs: { [ name: string ]: Subscription }; + + _wsReady: boolean; + + constructor(url: string, network: Networkish) { + super(url, network); + this._pollingInterval = -1; + + defineReadOnly(this, "_websocket", new WebSocket(this.connection.url)); + defineReadOnly(this, "_requests", { }); + defineReadOnly(this, "_subs", { }); + defineReadOnly(this, "_subIds", { }); + + // Stall sending requests until the socket is open... + this._wsReady = false; + this._websocket.onopen = () => { + this._wsReady = true; + Object.keys(this._requests).forEach((id) => { + this._websocket.send(this._requests[id].payload); + }); + }; + + this._websocket.onmessage = (messageEvent: { data: string }) => { + const data = messageEvent.data; + const result = JSON.parse(data); + if (result.id) { + const id = String(result.id); + const request = this._requests[id]; + delete this._requests[id]; + + if (result.result) { + request.callback(null, result.result); + + } else { + if (result.error) { + const error: any = new Error(result.error.message || "unknown error"); + defineReadOnly(error, "code", result.error.code || null); + defineReadOnly(error, "response", data); + request.callback(error, undefined); + } else { + request.callback(new Error("unknown error"), undefined); + } + } + + } else if (result.method === "eth_subscription") { + // Subscription... + const sub = this._subs[result.params.subscription]; + if (sub) { + //this.emit.apply(this, ); + sub.processFunc(result.params.result) + } + + } else { + console.warn("this should not happen"); + } + }; + } + + get pollingInterval(): number { + return 0; + } + + resetEventsBlock(blockNumber: number): void { + logger.throwError("cannot reset events block on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, { + operation: "resetEventBlock" + }); + } + + set pollingInterval(value: number) { + logger.throwError("cannot set polling interval on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, { + operation: "setPollingInterval" + }); + } + + async poll(): Promise { + return null; + } + + set polling(value: boolean) { + if (!value) { return; } + + logger.throwError("cannot set polling on WebSocketProvider", Logger.errors.UNSUPPORTED_OPERATION, { + operation: "setPolling" + }); + } + + send(method: string, params?: Array): Promise { + const rid = NextId++; + + return new Promise((resolve, reject) => { + function callback(error: Error, result: any) { + if (error) { return reject(error); } + return resolve(result); + } + + const payload = JSON.stringify({ + method: method, + params: params, + id: rid, + jsonrpc: "2.0" + }); + + this._requests[String(rid)] = { callback, payload }; + + if (this._wsReady) { this._websocket.send(payload); } + }); + } + + static defaultUrl(): string { + return "ws:/" + "/localhost:8546"; + } + + async _subscribe(tag: string, param: Array, processFunc: (result: any) => void): Promise { + let subIdPromise = this._subIds[tag]; + if (subIdPromise == null) { + subIdPromise = this.send("eth_subscribe", param); + this._subIds[tag] = subIdPromise; + } + const subId = await subIdPromise; + this._subs[subId] = { tag, processFunc }; + } + + _startEvent(event: Event): void { + switch (event.type) { + case "block": + this._subscribe("block", [ "newHeads", { } ], (result: any) => { + this.emit("block", BigNumber.from(result.number).toNumber()); + }); + break; + + case "pending": + this._subscribe("pending", [ "newPendingTransactions" ], (result: any) => { + this.emit("pending", result); + }); + break; + + case "filter": + this._subscribe(event.tag, [ "logs", event.filter ], (result: any) => { + this.emit(event.filter, result); + }); + break; + + case "tx": { + const emitReceipt = (event: Event) => { + const hash = event.hash; + this.getTransactionReceipt(hash).then((receipt) => { + if (!receipt) { return; } + this.emit(hash, receipt); + }); + }; + + // In case it is already mined + emitReceipt(event); + + // To keep things simple, we start up a single newHeads subscription + // to keep an eye out for transactions we are watching for. + // Starting a subscription for an event (i.e. "tx") that is already + // running is (basically) a nop. + this._subscribe("tx", [ "newHeads", { } ], (result: any) => { + this._events.filter((e) => (e.type === "tx")).forEach(emitReceipt); + }); + break; + } + + // Nothing is needed + case "debug": + case "error": + break; + + default: + console.log("unhandled:", event); + break; + } + } + + _stopEvent(event: Event): void { + let tag = event.tag; + + if (event.type === "tx") { + // There are remaining transaction event listeners + if (this._events.filter((e) => (e.type === "tx")).length) { + return; + } + tag = "tx"; + } else if (this.listenerCount(event.tag)) { + // There are remaining event listeners + return; + } + + const subId = this._subIds[tag]; + if (!subId) { return; } + + delete this._subIds[tag]; + subId.then((subId) => { + if (!this._subs[subId]) { return; } + delete this._subs[subId]; + this.send("eth_unsubscribe", [ subId ]); + }); + } + +} diff --git a/packages/providers/thirdparty.d.ts b/packages/providers/thirdparty.d.ts new file mode 100644 index 0000000000..89a51550f6 --- /dev/null +++ b/packages/providers/thirdparty.d.ts @@ -0,0 +1,10 @@ +declare module "ws" { + export interface WebSocker { + send(): void; + onopen: () => void; + onmessage: (messageEvent: { target: any, type: string, data: string }) => void + } + + export default WebSocket; +} + diff --git a/packages/providers/tsconfig.json b/packages/providers/tsconfig.json index 92930025f4..cf84c4c52a 100644 --- a/packages/providers/tsconfig.json +++ b/packages/providers/tsconfig.json @@ -5,7 +5,8 @@ "outDir": "./lib/" }, "include": [ - "./src.ts/*.ts" + "./src.ts/*.ts", + "./thirdparty.d.ts" ], "exclude": [] } diff --git a/rollup.config.js b/rollup.config.js index 7d2655c666..a8c96ffe30 100644 --- a/rollup.config.js +++ b/rollup.config.js @@ -98,7 +98,7 @@ export default commandLineArgs => { namedExports: { "bn.js": [ "BN" ], "elliptic": [ "ec" ], - "scrypt-js": [ "scrypt" ], + "scrypt-js": [ "scrypt", "syncScrypt" ], }, }), ];