Skip to content

Commit

Permalink
feat: Add support ssubscribe (#1690)
Browse files Browse the repository at this point in the history
  • Loading branch information
gogogo1024 committed Jan 25, 2023
1 parent 7effb62 commit 6285e80
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 12 deletions.
12 changes: 8 additions & 4 deletions lib/Command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ export interface CommandNameFlags {
"psubscribe",
"unsubscribe",
"punsubscribe",
"ssubscribe",
"sunsubscribe",
"ping",
"quit"
];
// Commands that are valid in monitor mode
VALID_IN_MONITOR_MODE: ["monitor", "auth"];
// Commands that will turn current connection into subscriber mode
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"];
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"];
// Commands that may make current connection quit subscriber mode
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"];
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"];
// Commands that will make client disconnect from server TODO shutdown?
WILL_DISCONNECT: ["quit"];
}
Expand Down Expand Up @@ -84,12 +86,14 @@ export default class Command implements Respondable {
"psubscribe",
"unsubscribe",
"punsubscribe",
"ssubscribe",
"sunsubscribe",
"ping",
"quit",
],
VALID_IN_MONITOR_MODE: ["monitor", "auth"],
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe"],
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe"],
ENTER_SUBSCRIBER_MODE: ["subscribe", "psubscribe", "ssubscribe"],
EXIT_SUBSCRIBER_MODE: ["unsubscribe", "punsubscribe", "sunsubscribe"],
WILL_DISCONNECT: ["quit"],
};

Expand Down
13 changes: 13 additions & 0 deletions lib/DataHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ export default class DataHandler {
this.redis.emit("pmessageBuffer", pattern, reply[2], reply[3]);
break;
}
case "smessage": {
if (this.redis.listeners("smessage").length > 0) {
this.redis.emit(
"smessage",
reply[1].toString(),
reply[2] ? reply[2].toString() : ""
);
}
this.redis.emit("smessageBuffer", reply[1], reply[2]);
break;
}
case "ssubscribe":
case "subscribe":
case "psubscribe": {
const channel = reply[1].toString();
Expand All @@ -156,6 +168,7 @@ export default class DataHandler {
}
break;
}
case "sunsubscribe":
case "unsubscribe":
case "punsubscribe": {
const channel = reply[1] ? reply[1].toString() : null;
Expand Down
7 changes: 6 additions & 1 deletion lib/SubscriptionSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export default class SubscriptionSet {
private set: { [key: string]: { [channel: string]: boolean } } = {
subscribe: {},
psubscribe: {},
ssubscribe: {},
};

add(set: AddSet, channel: string) {
Expand All @@ -27,7 +28,8 @@ export default class SubscriptionSet {
isEmpty(): boolean {
return (
this.channels("subscribe").length === 0 &&
this.channels("psubscribe").length === 0
this.channels("psubscribe").length === 0 &&
this.channels("ssubscribe").length === 0
);
}
}
Expand All @@ -39,5 +41,8 @@ function mapSet(set: AddSet | DelSet): AddSet {
if (set === "punsubscribe") {
return "psubscribe";
}
if (set === "sunsubscribe") {
return "ssubscribe";
}
return set;
}
24 changes: 17 additions & 7 deletions lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,17 @@ export default class ClusterSubscriber {

private onSubscriberEnd = () => {
if (!this.started) {
debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting.");
debug(
"subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting."
);
return;
}
// If the subscriber closes whilst it's still the active connection,
// we might as well try to connecting to a new node if possible to
// minimise the number of missed publishes.
debug("subscriber has disconnected, selecting a new one...");
this.selectSubscriber();
}
};

private selectSubscriber() {
const lastActiveSubscriber = this.lastActiveSubscriber;
Expand Down Expand Up @@ -122,7 +124,7 @@ export default class ClusterSubscriber {
// Don't try to reconnect the subscriber connection. If the connection fails
// we will get an end event (handled below), at which point we'll pick a new
// node from the pool and try to connect to that as the subscriber connection.
retryStrategy: null
retryStrategy: null,
});

// Ignore the errors since they're handled in the connection pool.
Expand All @@ -136,22 +138,25 @@ export default class ClusterSubscriber {
this.subscriber.once("end", this.onSubscriberEnd);

// Re-subscribe previous channels
const previousChannels = { subscribe: [], psubscribe: [] };
const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
if (lastActiveSubscriber) {
const condition =
lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
if (condition && condition.subscriber) {
previousChannels.subscribe = condition.subscriber.channels("subscribe");
previousChannels.psubscribe =
condition.subscriber.channels("psubscribe");
previousChannels.ssubscribe =
condition.subscriber.channels("ssubscribe");
}
}
if (
previousChannels.subscribe.length ||
previousChannels.psubscribe.length
previousChannels.psubscribe.length ||
previousChannels.ssubscribe.length
) {
let pending = 0;
for (const type of ["subscribe", "psubscribe"]) {
for (const type of ["subscribe", "psubscribe", "ssubscribe"]) {
const channels = previousChannels[type];
if (channels.length) {
pending += 1;
Expand All @@ -171,7 +176,12 @@ export default class ClusterSubscriber {
} else {
this.lastActiveSubscriber = this.subscriber;
}
for (const event of ["message", "messageBuffer"]) {
for (const event of [
"message",
"messageBuffer",
"smessage",
"smessageBuffer",
]) {
this.subscriber.on(event, (arg1, arg2) => {
this.emitter.emit(event, arg1, arg2);
});
Expand Down
5 changes: 5 additions & 0 deletions lib/redis/event_handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ export function readyHandler(self) {
debug("psubscribe %d channels", psubscribeChannels.length);
self.psubscribe(psubscribeChannels);
}
const ssubscribeChannels = condition.subscriber.channels("ssubscribe");
if (ssubscribeChannels.length) {
debug("ssubscribe %d channels", ssubscribeChannels.length);
self.ssubscribe(ssubscribeChannels);
}
}
}

Expand Down
108 changes: 108 additions & 0 deletions test/functional/cluster/spub_ssub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import MockServer, { getConnectionName } from "../../helpers/mock_server";
import { expect } from "chai";
import { Cluster } from "../../../lib";
import * as sinon from "sinon";
import Redis from "../../../lib/Redis";
import { noop } from "../../../lib/utils";

describe("cluster:spub/ssub", function () {
it("should receive messages", (done) => {
const handler = function (argv) {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return [
[0, 1, ["127.0.0.1", 30001]],
[2, 16383, ["127.0.0.1", 30002]],
];
}
};
const node1 = new MockServer(30001, handler);
new MockServer(30002, handler);

const options = [{ host: "127.0.0.1", port: "30001" }];
const ssub = new Cluster(options);

ssub.ssubscribe("test cluster", function () {
node1.write(node1.findClientByName("ioredis-cluster(subscriber)"), [
"smessage",
"test shard channel",
"hi",
]);
});
ssub.on("smessage", function (channel, message) {
expect(channel).to.eql("test shard channel");
expect(message).to.eql("hi");
ssub.disconnect();
done();
});
});

it("should works when sending regular commands", (done) => {
const handler = function (argv) {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return [[0, 16383, ["127.0.0.1", 30001]]];
}
};
new MockServer(30001, handler);

const ssub = new Cluster([{ port: "30001" }]);

ssub.ssubscribe("test cluster", function () {
ssub.set("foo", "bar").then((res) => {
expect(res).to.eql("OK");
ssub.disconnect();
done();
});
});
});

it("supports password", (done) => {
const handler = function (argv, c) {
if (argv[0] === "auth") {
c.password = argv[1];
return;
}
if (argv[0] === "ssubscribe") {
expect(c.password).to.eql("abc");
expect(getConnectionName(c)).to.eql("ioredis-cluster(subscriber)");
}
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return [[0, 16383, ["127.0.0.1", 30001]]];
}
};
new MockServer(30001, handler);

const ssub = new Redis.Cluster([{ port: "30001", password: "abc" }]);

ssub.ssubscribe("test cluster", function () {
ssub.disconnect();
done();
});
});

it("should re-ssubscribe after reconnection", (done) => {
new MockServer(30001, function (argv) {
if (argv[0] === "cluster" && argv[1] === "SLOTS") {
return [[0, 16383, ["127.0.0.1", 30001]]];
} else if (argv[0] === "ssubscribe" || argv[0] === "psubscribe") {
return [argv[0], argv[1]];
}
});
const client = new Cluster([{ host: "127.0.0.1", port: "30001" }]);

client.ssubscribe("test cluster", function () {
const stub = sinon
.stub(Redis.prototype, "ssubscribe")
.callsFake((channels) => {
expect(channels).to.eql(["test cluster"]);
stub.restore();
client.disconnect();
done();
return Redis.prototype.ssubscribe.apply(this, arguments);
});
client.once("end", function () {
client.connect().catch(noop);
});
client.disconnect();
});
});
});
Loading

0 comments on commit 6285e80

Please sign in to comment.