From 57cef65ae2633b952cfe6f36595ab9b96df50d72 Mon Sep 17 00:00:00 2001 From: Malthe Borch Date: Wed, 28 Feb 2024 16:53:23 +0100 Subject: [PATCH] Use 'EventEmitter' with generics --- CHANGES.md | 7 +++ README.md | 1 + package.json | 34 +++++----- src/client.ts | 150 ++++++++++++++++++++++---------------------- src/index.ts | 35 ++++++----- src/result.ts | 10 ++- test/client.test.ts | 24 ++++--- 7 files changed, 142 insertions(+), 119 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index db3727c..78294c0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,13 @@ In next release ... connected when the promise returns. The `Client` symbol is now a type instead of a value. +- Add `off` method to disable event listening. + +- Remove dependency on + [ts-typed-events](https://www.npmjs.com/package/ts-typed-events), + which has been supplanted by updated typings for the built-in + `EventEmitter` class that's now generic. + ## v1.9.0 (2024-01-23) - Add support for ESM modules. diff --git a/README.md b/README.md index 5899b65..887f683 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ $ npm install ts-postgres - Rows and column names - Streaming data directly into a socket - Supports CommonJS and ESM modules +- No dependencies See the [documentation](https://malthe.github.io/ts-postgres/) for a complete reference. diff --git a/package.json b/package.json index f39b514..b839294 100644 --- a/package.json +++ b/package.json @@ -12,9 +12,6 @@ ], "homepage": "https://github.com/malthe/ts-postgres", "author": "Malthe Borch ", - "dependencies": { - "ts-typed-events": "^3.0.0" - }, "main": "./dist/module/index.js", "types": "./dist/module/index.d.ts", "exports": { @@ -57,20 +54,23 @@ ] }, "devDependencies": { - "@types/node": "^20.10.6", - "@typescript-eslint/eslint-plugin": "^6.10.0", - "@typescript-eslint/parser": "^6.10.0", - "colors": "^1.4.0", - "eslint": "^8.56.0", - "eslint-config-standard": "^17.1.0", - "eslint-plugin-import": "^2.29.0", - "eslint-plugin-node": "^11.1.0", - "eslint-plugin-prettier": "^5.0.1", - "eslint-plugin-promise": "^6.1.1", + "@types/node": "^20.11.21", + "@typescript-eslint/eslint-plugin": "^6", + "@typescript-eslint/parser": "^6", + "colors": "^1", + "eslint": "^8", + "eslint-config-standard": "^17", + "eslint-plugin-import": "^2", + "eslint-plugin-node": "^11", + "eslint-plugin-prettier": "^5", + "eslint-plugin-promise": "^6", "lint-staged": "^15.0.2", - "rimraf": "^3.0.2", - "ts-node": "^10.9.2", - "typedoc": "^0.25.4", - "typescript": "^5.2.2" + "rimraf": "~3.0", + "ts-node": "~10.9", + "typedoc": "~0.25", + "typescript": "~5.2" + }, + "peerDependencies": { + "@types/node": "^20" } } diff --git a/src/client.ts b/src/client.ts index 2192205..e54d80f 100644 --- a/src/client.ts +++ b/src/client.ts @@ -10,8 +10,7 @@ import { connect as tls, createSecureContext, } from 'node:tls'; - -import { Event as TypedEvent } from 'ts-typed-events'; +import { EventEmitter } from 'node:events'; import { Defaults, Environment } from './defaults.js'; import * as logger from './logging.js'; @@ -94,7 +93,7 @@ export interface Configuration export interface Notification { processId: number; channel: string; - payload?: string; + payload: string; } export interface PreparedStatement { @@ -107,17 +106,6 @@ export interface PreparedStatement { ) => ResultIterator; } -export type Callback = (data: T) => void; - -/* eslint-disable @typescript-eslint/no-explicit-any */ -type CallbackOf = U extends any ? Callback : never; - -type Event = ClientNotice | DatabaseError | Notification; - -type Connect = Error | null; - -type End = NodeJS.ErrnoException | null; - type CloseHandler = () => void; interface RowDataHandler { @@ -161,20 +149,31 @@ interface PreFlightQueue { const DEFAULTS = new Defaults(env as unknown as Environment); +export type EventMap< + T = { + error: DatabaseError; + notice: ClientNotice; + notification: Notification; + }, +> = { + [K in keyof T]: [T[K]]; +}; + +type Resolve = (value?: T) => void; + +export type EventListener = K extends keyof EventMap ? ( + (...args: EventMap[K]) => void +) : never; + export class ClientImpl { - private readonly events = { - connect: new TypedEvent(), - end: new TypedEvent(), - error: new TypedEvent(), - notice: new TypedEvent(), - notification: new TypedEvent(), - }; - - private ending = false; + private readonly events = new EventEmitter(); + private connected = false; - private connecting = false; private error = false; + private ending?: Resolve; + private connecting?: Resolve; + private readonly encoding: BufferEncoding; private readonly writer: Writer; @@ -217,7 +216,7 @@ export class ClientImpl { this.stream.on('close', () => { this.closed = true; - this.events.end.emit(null); + this.ending?.(); }); this.stream.on('connect', () => { @@ -237,14 +236,14 @@ export class ClientImpl { /* istanbul ignore next */ this.stream.on('error', (error: NodeJS.ErrnoException) => { if (this.connecting) { - this.events.connect.emit(error); + this.connecting(error); } else { // Don't raise ECONNRESET errors - they can & should be // ignored during disconnect. - if (this.ending && error.errno === constants.errno.ECONNRESET) - return; - - this.events.end.emit(error); + if (this.ending) { + if (error.errno === constants.errno.ECONNRESET) return; + this.ending(); + } } }); @@ -294,7 +293,7 @@ export class ClientImpl { const abort = (error: Error) => { this.handleError(error); - this.events.connect.emit(error); + this.connecting?.(error); }; const startup = (stream?: Socket) => { @@ -384,12 +383,14 @@ export class ClientImpl { const read = this.handle(buffer, offset, size); offset += read; remaining = size - read; - } catch (error) { + } catch (error: unknown) { logger.warn(error); if (this.connecting) { - this.events.connect.emit(error as Error); + this.connecting(error as Error); } else { try { + // In normal operation (including regular handling of errors), + // there's nothing further to clean up at this point. while (this.handleError(error as Error)) { logger.info( 'Cancelled query due to an internal error', @@ -420,20 +421,25 @@ export class ClientImpl { if (this.error) { throw new Error("Can't connect in error state"); } - this.connecting = true; const timeout = this.config.connectionTimeout ?? DEFAULTS.connectionTimeout; - let p = this.events.connect.once().then((error: Connect) => { - if (error) { - this.connecting = false; - this.stream.destroy(); - throw error; - } - return { - encrypted: this.stream instanceof TLSSocket, - parameters: this.parameters as ReadonlyMap, + let p = new Promise((resolve, reject) => { + this.connecting = (error?: Error) => { + this.connecting = undefined; + if (error) { + this.stream.destroy(); + reject(error); + } else { + resolve({ + encrypted: this.stream instanceof TLSSocket, + parameters: this.parameters as ReadonlyMap< + string, + string + >, + }); + } }; }); @@ -476,8 +482,6 @@ export class ClientImpl { throw new Error('Connection unexpectedly destroyed'); } - this.ending = true; - if (this.connected) { this.writer.end(); this.send(); @@ -486,32 +490,21 @@ export class ClientImpl { } else { this.stream.destroy(); } - return new Promise((resolve, reject) => - this.events.end.once().then((value) => { - if (value === null) resolve(); - reject(value); - }), - ); + return new Promise((resolve, reject) => { + this.ending = (error?: NodeJS.ErrnoException) => { + this.ending = undefined; + if (!error) resolve(); + reject(error); + }; + }); } - on(event: 'notification', callback: Callback): void; - on(event: 'error', callback: Callback): void; - on(event: 'notice', callback: Callback): void; - on(event: string, callback: CallbackOf): void { - switch (event) { - case 'error': { - this.events.error.on(callback as Callback); - break; - } - case 'notice': { - this.events.notice.on(callback as Callback); - break; - } - case 'notification': { - this.events.notification.on(callback as Callback); - break; - } - } + on(event: K, listener: EventListener): void { + this.events.on(event, listener); + } + + off(event: K, listener: EventListener): void { + this.events.off(event, listener); } /** Prepare a statement for later execution. @@ -954,7 +947,7 @@ export class ClientImpl { outer: switch (code) { case 0: { nextTick(() => { - this.events.connect.emit(null); + this.connecting?.(); }); break; } @@ -1094,10 +1087,16 @@ export class ClientImpl { if (this.connecting) throw error; - this.events.error.emit(error); - loop: if (!this.handleError(error)) { + try { + this.events.emit('error', error); + } catch { + // If there are no subscribers for the event, an error + // is raised. We're not interesting in this behavior. + } + + if (!this.handleError(error)) { throw new Error( - 'Internal error occurred while processing database error', + 'An error occurred without an active query', ); } break; @@ -1106,7 +1105,7 @@ export class ClientImpl { const notice = this.parseError( buffer.subarray(start, start + length), ); - this.events.notice.emit(notice); + this.events.emit('notice', notice); break; } case Message.NotificationResponse: { @@ -1114,7 +1113,7 @@ export class ClientImpl { const processId = reader.readInt32BE(); const channel = reader.readCString(this.encoding); const payload = reader.readCString(this.encoding); - this.events.notification.emit({ + this.events.emit('notification', { processId: processId, channel: channel, payload: payload, @@ -1150,7 +1149,6 @@ export class ClientImpl { this.errorHandlerQueue.shift(); this.cleanupQueue.expect(Cleanup.ErrorHandler); } else { - this.connecting = false; this.connected = true; } const status = buffer.readInt8(start); diff --git a/src/index.ts b/src/index.ts index e4bd1c3..44a5488 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,14 +1,23 @@ -import { ClientImpl, SSLMode } from './client.js'; -import type { Configuration, ConnectionInfo } from './client.js'; -export type { - Callback, - ClientNotice, - DataTypeError, - Notification, - PreparedStatement, - SSL, +import { ClientImpl, type ConnectionInfo } from './client.js'; +export { + SSLMode, + type ClientNotice, + type Configuration, + type DataTypeError, + type EventListener, + type EventMap, + type Notification, + type PreparedStatement, + type SSL, } from './client.js'; -export type { BufferEncoding, Point, ValueTypeReader } from './types.js'; +import type { Configuration } from './client.js'; +export { + DataFormat, + DataType, + type BufferEncoding, + type Point, + type ValueTypeReader, +} from './types.js'; export type { Query, QueryOptions } from './query.js'; export type { Result, @@ -25,12 +34,6 @@ export type { TransactionStatus, } from './protocol.js'; -export type { Configuration }; - -export { DataFormat, DataType } from './types.js'; - -export { SSLMode }; - interface _Client extends ClientImpl {} /** A database client, encapsulating a single connection to the database. diff --git a/src/result.ts b/src/result.ts index 62ded14..42a5150 100644 --- a/src/result.ts +++ b/src/result.ts @@ -97,7 +97,7 @@ export class Result { * * Iterating asynchronously yields objects of the generic type parameter. */ -export class ResultIterator extends Promise> { +class ResultIteratorImpl extends Promise> { private subscribers: (( done: boolean, error?: string | DatabaseError | Error, @@ -221,7 +221,11 @@ export type DataHandler = Callback; export type NameHandler = Callback; -ResultIterator.prototype.constructor = Promise; +ResultIteratorImpl.prototype.constructor = Promise; + +export interface ResultIterator extends ResultIteratorImpl { + +} export function makeResult(transform?: (name: string) => string) { let dataHandler: DataHandler | null = null; @@ -229,7 +233,7 @@ export function makeResult(transform?: (name: string) => string) { const names: string[] = []; const rows: any[][] = []; - const p = new ResultIterator(names, rows, (resolve, reject) => { + const p = new ResultIteratorImpl(names, rows, (resolve, reject) => { dataHandler = (row: any[] | Resolution | Error) => { if (row === null || typeof row === 'string') { resolve(row); diff --git a/test/client.test.ts b/test/client.test.ts index 27625ff..0630e7d 100644 --- a/test/client.test.ts +++ b/test/client.test.ts @@ -9,6 +9,7 @@ import { Client, DataFormat, DataType, + Notification, PreparedStatement, Result, ResultIterator, @@ -238,13 +239,22 @@ describe('Query', () => { deepEqual(result.names, ['FOO']); }); - test('Listen/notify', async ({ client }) => { - await client.query('listen foo'); - client.on('notification', (msg) => { - equal(msg.channel, 'foo'); - equal(msg.payload, 'bar'); - }); - await client.query("notify foo, 'bar'"); + test('Listen/notify', async ({ client, connect }) => { + const notifies: Omit[] = []; + const listener = ({ channel, payload }: Notification) => { + notifies.push({ channel, payload }); + }; + client.on('notification', listener); + const p = client.query('listen foo'); + const other = await connect(); + await other.query("notify foo, 'bar'"); + await other.end(); + deepEqual(notifies, [{ channel: 'foo', payload: 'bar' }]); + await p; + await client.query("notify foo, 'baz'"); + client.off('notification', listener); + await client.query("notify foo, 'boo'"); + deepEqual(notifies, [{ channel: 'foo', payload: 'bar' }, { channel: 'foo', payload: 'baz' }]); }); test('Session timeout', async ({ connect }) => {