Skip to content

Commit

Permalink
Use 'EventEmitter' with generics
Browse files Browse the repository at this point in the history
  • Loading branch information
malthe committed Feb 28, 2024
1 parent b67261c commit 57cef65
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 119 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ In next release ...
connected when the promise returns. The `Client` symbol is now a
type instead of a value.

- Add `off` method to disable event listening.

- Remove dependency on
[ts-typed-events](https://www.npmjs.com/package/ts-typed-events),
which has been supplanted by updated typings for the built-in
`EventEmitter` class that's now generic.

## v1.9.0 (2024-01-23)

- Add support for ESM modules.
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ $ npm install ts-postgres
- Rows and column names
- Streaming data directly into a socket
- Supports CommonJS and ESM modules
- No dependencies

See the [documentation](https://malthe.github.io/ts-postgres/) for a complete reference.

Expand Down
34 changes: 17 additions & 17 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
],
"homepage": "https://github.com/malthe/ts-postgres",
"author": "Malthe Borch <mborch@gmail.com>",
"dependencies": {
"ts-typed-events": "^3.0.0"
},
"main": "./dist/module/index.js",
"types": "./dist/module/index.d.ts",
"exports": {
Expand Down Expand Up @@ -57,20 +54,23 @@
]
},
"devDependencies": {
"@types/node": "^20.10.6",
"@typescript-eslint/eslint-plugin": "^6.10.0",
"@typescript-eslint/parser": "^6.10.0",
"colors": "^1.4.0",
"eslint": "^8.56.0",
"eslint-config-standard": "^17.1.0",
"eslint-plugin-import": "^2.29.0",
"eslint-plugin-node": "^11.1.0",
"eslint-plugin-prettier": "^5.0.1",
"eslint-plugin-promise": "^6.1.1",
"@types/node": "^20.11.21",
"@typescript-eslint/eslint-plugin": "^6",
"@typescript-eslint/parser": "^6",
"colors": "^1",
"eslint": "^8",
"eslint-config-standard": "^17",
"eslint-plugin-import": "^2",
"eslint-plugin-node": "^11",
"eslint-plugin-prettier": "^5",
"eslint-plugin-promise": "^6",
"lint-staged": "^15.0.2",
"rimraf": "^3.0.2",
"ts-node": "^10.9.2",
"typedoc": "^0.25.4",
"typescript": "^5.2.2"
"rimraf": "~3.0",
"ts-node": "~10.9",
"typedoc": "~0.25",
"typescript": "~5.2"
},
"peerDependencies": {
"@types/node": "^20"
}
}
150 changes: 74 additions & 76 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import {
connect as tls,
createSecureContext,
} from 'node:tls';

import { Event as TypedEvent } from 'ts-typed-events';
import { EventEmitter } from 'node:events';

import { Defaults, Environment } from './defaults.js';
import * as logger from './logging.js';
Expand Down Expand Up @@ -94,7 +93,7 @@ export interface Configuration
export interface Notification {
processId: number;
channel: string;
payload?: string;
payload: string;
}

export interface PreparedStatement<T = ResultRecord> {
Expand All @@ -107,17 +106,6 @@ export interface PreparedStatement<T = ResultRecord> {
) => ResultIterator<T>;
}

export type Callback<T> = (data: T) => void;

/* eslint-disable @typescript-eslint/no-explicit-any */
type CallbackOf<U> = U extends any ? Callback<U> : never;

type Event = ClientNotice | DatabaseError | Notification;

type Connect = Error | null;

type End = NodeJS.ErrnoException | null;

type CloseHandler = () => void;

interface RowDataHandler {
Expand Down Expand Up @@ -161,20 +149,31 @@ interface PreFlightQueue {

const DEFAULTS = new Defaults(env as unknown as Environment);

export type EventMap<
T = {
error: DatabaseError;
notice: ClientNotice;
notification: Notification;
},
> = {
[K in keyof T]: [T[K]];
};

type Resolve<T> = (value?: T) => void;

export type EventListener<K> = K extends keyof EventMap ? (
(...args: EventMap[K]) => void
) : never;

export class ClientImpl {
private readonly events = {
connect: new TypedEvent<Connect>(),
end: new TypedEvent<End>(),
error: new TypedEvent<DatabaseError>(),
notice: new TypedEvent<ClientNotice>(),
notification: new TypedEvent<Notification>(),
};

private ending = false;
private readonly events = new EventEmitter<EventMap>();

private connected = false;
private connecting = false;
private error = false;

private ending?: Resolve<NodeJS.ErrnoException>;
private connecting?: Resolve<Error>;

private readonly encoding: BufferEncoding;
private readonly writer: Writer;

Expand Down Expand Up @@ -217,7 +216,7 @@ export class ClientImpl {

this.stream.on('close', () => {
this.closed = true;
this.events.end.emit(null);
this.ending?.();
});

this.stream.on('connect', () => {
Expand All @@ -237,14 +236,14 @@ export class ClientImpl {
/* istanbul ignore next */
this.stream.on('error', (error: NodeJS.ErrnoException) => {
if (this.connecting) {
this.events.connect.emit(error);
this.connecting(error);
} else {
// Don't raise ECONNRESET errors - they can & should be
// ignored during disconnect.
if (this.ending && error.errno === constants.errno.ECONNRESET)
return;

this.events.end.emit(error);
if (this.ending) {
if (error.errno === constants.errno.ECONNRESET) return;
this.ending();
}
}
});

Expand Down Expand Up @@ -294,7 +293,7 @@ export class ClientImpl {

const abort = (error: Error) => {
this.handleError(error);
this.events.connect.emit(error);
this.connecting?.(error);
};

const startup = (stream?: Socket) => {
Expand Down Expand Up @@ -384,12 +383,14 @@ export class ClientImpl {
const read = this.handle(buffer, offset, size);
offset += read;
remaining = size - read;
} catch (error) {
} catch (error: unknown) {
logger.warn(error);
if (this.connecting) {
this.events.connect.emit(error as Error);
this.connecting(error as Error);
} else {
try {
// In normal operation (including regular handling of errors),
// there's nothing further to clean up at this point.
while (this.handleError(error as Error)) {
logger.info(
'Cancelled query due to an internal error',
Expand Down Expand Up @@ -420,20 +421,25 @@ export class ClientImpl {
if (this.error) {
throw new Error("Can't connect in error state");
}
this.connecting = true;

const timeout =
this.config.connectionTimeout ?? DEFAULTS.connectionTimeout;

let p = this.events.connect.once().then((error: Connect) => {
if (error) {
this.connecting = false;
this.stream.destroy();
throw error;
}
return {
encrypted: this.stream instanceof TLSSocket,
parameters: this.parameters as ReadonlyMap<string, string>,
let p = new Promise<ConnectionInfo>((resolve, reject) => {
this.connecting = (error?: Error) => {
this.connecting = undefined;
if (error) {
this.stream.destroy();
reject(error);
} else {
resolve({
encrypted: this.stream instanceof TLSSocket,
parameters: this.parameters as ReadonlyMap<
string,
string
>,
});
}
};
});

Expand Down Expand Up @@ -476,8 +482,6 @@ export class ClientImpl {
throw new Error('Connection unexpectedly destroyed');
}

this.ending = true;

if (this.connected) {
this.writer.end();
this.send();
Expand All @@ -486,32 +490,21 @@ export class ClientImpl {
} else {
this.stream.destroy();
}
return new Promise<void>((resolve, reject) =>
this.events.end.once().then((value) => {
if (value === null) resolve();
reject(value);
}),
);
return new Promise<void>((resolve, reject) => {
this.ending = (error?: NodeJS.ErrnoException) => {
this.ending = undefined;
if (!error) resolve();
reject(error);
};
});
}

on(event: 'notification', callback: Callback<Notification>): void;
on(event: 'error', callback: Callback<DatabaseError>): void;
on(event: 'notice', callback: Callback<ClientNotice>): void;
on(event: string, callback: CallbackOf<Event>): void {
switch (event) {
case 'error': {
this.events.error.on(callback as Callback<DatabaseError>);
break;
}
case 'notice': {
this.events.notice.on(callback as Callback<ClientNotice>);
break;
}
case 'notification': {
this.events.notification.on(callback as Callback<Notification>);
break;
}
}
on<K extends keyof EventMap>(event: K, listener: EventListener<K>): void {
this.events.on(event, listener);
}

off<K extends keyof EventMap>(event: K, listener: EventListener<K>): void {
this.events.off(event, listener);
}

/** Prepare a statement for later execution.
Expand Down Expand Up @@ -954,7 +947,7 @@ export class ClientImpl {
outer: switch (code) {
case 0: {
nextTick(() => {
this.events.connect.emit(null);
this.connecting?.();
});
break;
}
Expand Down Expand Up @@ -1094,10 +1087,16 @@ export class ClientImpl {

if (this.connecting) throw error;

this.events.error.emit(error);
loop: if (!this.handleError(error)) {
try {
this.events.emit('error', error);
} catch {
// If there are no subscribers for the event, an error
// is raised. We're not interesting in this behavior.
}

if (!this.handleError(error)) {
throw new Error(
'Internal error occurred while processing database error',
'An error occurred without an active query',
);
}
break;
Expand All @@ -1106,15 +1105,15 @@ export class ClientImpl {
const notice = this.parseError(
buffer.subarray(start, start + length),
);
this.events.notice.emit(notice);
this.events.emit('notice', notice);
break;
}
case Message.NotificationResponse: {
const reader = new Reader(buffer, start);
const processId = reader.readInt32BE();
const channel = reader.readCString(this.encoding);
const payload = reader.readCString(this.encoding);
this.events.notification.emit({
this.events.emit('notification', {
processId: processId,
channel: channel,
payload: payload,
Expand Down Expand Up @@ -1150,7 +1149,6 @@ export class ClientImpl {
this.errorHandlerQueue.shift();
this.cleanupQueue.expect(Cleanup.ErrorHandler);
} else {
this.connecting = false;
this.connected = true;
}
const status = buffer.readInt8(start);
Expand Down
35 changes: 19 additions & 16 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
import { ClientImpl, SSLMode } from './client.js';
import type { Configuration, ConnectionInfo } from './client.js';
export type {
Callback,
ClientNotice,
DataTypeError,
Notification,
PreparedStatement,
SSL,
import { ClientImpl, type ConnectionInfo } from './client.js';
export {
SSLMode,
type ClientNotice,
type Configuration,
type DataTypeError,
type EventListener,
type EventMap,
type Notification,
type PreparedStatement,
type SSL,
} from './client.js';
export type { BufferEncoding, Point, ValueTypeReader } from './types.js';
import type { Configuration } from './client.js';
export {
DataFormat,
DataType,
type BufferEncoding,
type Point,
type ValueTypeReader,
} from './types.js';
export type { Query, QueryOptions } from './query.js';
export type {
Result,
Expand All @@ -25,12 +34,6 @@ export type {
TransactionStatus,
} from './protocol.js';

export type { Configuration };

export { DataFormat, DataType } from './types.js';

export { SSLMode };

interface _Client extends ClientImpl {}

/** A database client, encapsulating a single connection to the database.
Expand Down
Loading

0 comments on commit 57cef65

Please sign in to comment.