From 4fde8b78f17b8b227a4bc9dd1f790035df224a2c Mon Sep 17 00:00:00 2001 From: Brian C Date: Mon, 30 Nov 2020 09:25:01 -0600 Subject: [PATCH] Fix double readyForQuery (#2420) This is fixing a double readyForQuery message being sent from the backend (because we were calling sync after an error, which I already fixed in the main driver). Also closes #2333 --- packages/pg-cursor/index.js | 27 +++++++--- packages/pg-query-stream/test/error.ts | 69 ++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/packages/pg-cursor/index.js b/packages/pg-cursor/index.js index 9d672dbff..d26e77bdc 100644 --- a/packages/pg-cursor/index.js +++ b/packages/pg-cursor/index.js @@ -37,6 +37,7 @@ Cursor.prototype._rowDescription = function () { } Cursor.prototype.submit = function (connection) { + this.state = 'submitted' this.connection = connection this._portal = 'C_' + nextUniqueID++ @@ -87,7 +88,12 @@ Cursor.prototype._closePortal = function () { // open can lock tables for modification if inside a transaction. // see https://github.com/brianc/node-pg-cursor/issues/56 this.connection.close({ type: 'P', name: this._portal }) - this.connection.sync() + + // If we've received an error we already sent a sync message. + // do not send another sync as it triggers another readyForQuery message. + if (this.state !== 'error') { + this.connection.sync() + } } Cursor.prototype.handleRowDescription = function (msg) { @@ -138,8 +144,18 @@ Cursor.prototype.handleEmptyQuery = function () { } Cursor.prototype.handleError = function (msg) { - this.connection.removeListener('noData', this._ifNoData) - this.connection.removeListener('rowDescription', this._rowDescription) + // If we're in an initialized state we've never been submitted + // and don't have a connection instance reference yet. + // This can happen if you queue a stream and close the client before + // the client has submitted the stream. In this scenario we don't have + // a connection so there's nothing to unsubscribe from. + if (this.state !== 'initialized') { + this.connection.removeListener('noData', this._ifNoData) + this.connection.removeListener('rowDescription', this._rowDescription) + // call sync to trigger a readyForQuery + this.connection.sync() + } + this.state = 'error' this._error = msg // satisfy any waiting callback @@ -155,8 +171,6 @@ Cursor.prototype.handleError = function (msg) { // only dispatch error events if we have a listener this.emit('error', msg) } - // call sync to keep this connection from hanging - this.connection.sync() } Cursor.prototype._getRows = function (rows, cb) { @@ -189,6 +203,7 @@ Cursor.prototype.close = function (cb) { return } } + this._closePortal() this.state = 'done' if (cb) { @@ -199,7 +214,7 @@ Cursor.prototype.close = function (cb) { } Cursor.prototype.read = function (rows, cb) { - if (this.state === 'idle') { + if (this.state === 'idle' || this.state === 'submitted') { return this._getRows(rows, cb) } if (this.state === 'busy' || this.state === 'initialized') { diff --git a/packages/pg-query-stream/test/error.ts b/packages/pg-query-stream/test/error.ts index c92cd0091..220a52485 100644 --- a/packages/pg-query-stream/test/error.ts +++ b/packages/pg-query-stream/test/error.ts @@ -1,6 +1,7 @@ import assert from 'assert' import helper from './helper' import QueryStream from '../src' +import { Pool, Client } from 'pg' helper('error', function (client) { it('receives error on stream', function (done) { @@ -21,3 +22,71 @@ helper('error', function (client) { client.query('SELECT NOW()', done) }) }) + +describe('error recovery', () => { + // created from https://github.com/chrisdickinson/pg-test-case + it('recovers from a streaming error in a transaction', async () => { + const pool = new Pool() + const client = await pool.connect() + await client.query(`CREATE TEMP TABLE frobnicators ( + id serial primary key, + updated timestamp + )`) + await client.query(`BEGIN;`) + const query = new QueryStream(`INSERT INTO frobnicators ("updated") VALUES ($1) RETURNING "id"`, [Date.now()]) + let error: Error | undefined = undefined + query.on('data', console.log).on('error', (e) => { + error = e + }) + client.query(query) // useless callback necessitated by an older version of honeycomb-beeline + + await client.query(`ROLLBACK`) + assert(error, 'Error should not be undefined') + const { rows } = await client.query('SELECT NOW()') + assert.strictEqual(rows.length, 1) + client.release() + const client2 = await pool.connect() + await client2.query(`BEGIN`) + client2.release() + pool.end() + }) + + // created from https://github.com/brianc/node-postgres/pull/2333 + it('handles an error on a stream after a plain text non-stream error', async () => { + const client = new Client() + const stmt = 'SELECT * FROM goose;' + await client.connect() + return new Promise((resolve, reject) => { + client.query(stmt).catch((e) => { + assert(e, 'Query should have rejected with an error') + const stream = new QueryStream('SELECT * FROM duck') + client.query(stream) + stream.on('data', () => {}) + stream.on('error', () => { + client.end((err) => { + err ? reject(err) : resolve() + }) + }) + }) + }) + }) + + it('does not crash when closing a connection with a queued stream', async () => { + const client = new Client() + const stmt = 'SELECT * FROM goose;' + await client.connect() + return new Promise(async (resolve) => { + let queryError: Error | undefined + client.query(stmt).catch((e) => { + queryError = e + }) + const stream = client.query(new QueryStream(stmt)) + stream.on('data', () => {}) + stream.on('error', () => { + assert(queryError, 'query should have errored due to client ending') + resolve() + }) + await client.end() + }) + }) +})