From ecc13ad2ca6a76454d487664ab646e4858a03730 Mon Sep 17 00:00:00 2001 From: luin Date: Tue, 16 Jul 2019 22:55:18 +0800 Subject: [PATCH] fix(cluster): remove node immediately when slots are redistributed Close: #930 --- lib/cluster/ConnectionPool.ts | 18 +++++++++++++++--- test/unit/clusters/ConnectionPool.ts | 9 +++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/lib/cluster/ConnectionPool.ts b/lib/cluster/ConnectionPool.ts index 8db158c3..18282bb5 100644 --- a/lib/cluster/ConnectionPool.ts +++ b/lib/cluster/ConnectionPool.ts @@ -93,9 +93,7 @@ export default class ConnectionPool extends EventEmitter { this.nodes[readOnly ? "slave" : "master"][key] = redis; redis.once("end", () => { - delete this.nodes.all[key]; - delete this.nodes.master[key]; - delete this.nodes.slave[key]; + this.removeNode(key); this.emit("-node", redis, key); if (!Object.keys(this.nodes.all).length) { this.emit("drain"); @@ -112,6 +110,19 @@ export default class ConnectionPool extends EventEmitter { return redis; } + /** + * Remove a node from the pool. + */ + private removeNode(key: string): void { + const { nodes } = this; + if (nodes.all[key]) { + debug("Remove %s from the pool", key); + delete nodes.all[key]; + } + delete nodes.master[key]; + delete nodes.slave[key]; + } + /** * Reset the pool with a set of nodes. * The old node will be removed. @@ -136,6 +147,7 @@ export default class ConnectionPool extends EventEmitter { if (!newNodes[key]) { debug("Disconnect %s because the node does not hold any slot", key); this.nodes.all[key].disconnect(); + this.removeNode(key); } }); Object.keys(newNodes).forEach(key => { diff --git a/test/unit/clusters/ConnectionPool.ts b/test/unit/clusters/ConnectionPool.ts index 3ebbdd24..aff8e014 100644 --- a/test/unit/clusters/ConnectionPool.ts +++ b/test/unit/clusters/ConnectionPool.ts @@ -24,5 +24,14 @@ describe("ConnectionPool", () => { expect(stub.callCount).to.eql(2); expect(stub.firstCall.args[1]).to.eql(false); }); + + it("remove the node immediately instead of waiting for 'end' event", () => { + const pool = new ConnectionPool({}); + pool.reset([{ host: "127.0.0.1", port: 300001 }]); + expect(pool.getNodes().length).to.eql(1); + + pool.reset([]); + expect(pool.getNodes().length).to.eql(0); + }); }); });