Skip to content

Commit

Permalink
Continue monitoring after reconnection. Close #52
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Jun 3, 2015
1 parent 1bcbc0c commit 109a6ec
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 65 deletions.
3 changes: 2 additions & 1 deletion Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

### Master Branch

* Support pub/sub in Cluster mode.
* Continue monitoring after reconnection([#52](https://github.com/luin/ioredis/issues/52)).
* Support pub/sub in Cluster mode([#54](https://github.com/luin/ioredis/issues/54)).
* Auto-reconnect when none of startup nodes is ready([#56](https://github.com/luin/ioredis/issues/56)).

### v1.3.6 - May 22, 2015
Expand Down
2 changes: 2 additions & 0 deletions lib/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ Command.FLAGS = {
VALID_WHEN_LOADING: ['info', 'auth', 'select', 'subscribe', 'unsubscribe', 'psubscribe', 'pubsubscribe', 'publish', 'shutdown', 'replconf', 'role', 'pubsub', 'command', 'latency'],
// Commands that can be processed when client is in the subscriber mode
VALID_IN_SUBSCRIBER_MODE: ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'ping', 'quit'],
// Commands that are valid in monitor mode
VALID_IN_MONITOR_MODE: ['monitor', 'auth'],
// Commands that will turn current connection into subscriber mode
ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe'],
// Commands that may make current connection quit subscriber mode
Expand Down
4 changes: 1 addition & 3 deletions lib/commander.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,7 @@ function generateFunction (_commandName, _encoding) {
options.errorStack = new Error().stack;
}

var command = new Command(commandName, args, options, callback);

return this.sendCommand(command);
return this.sendCommand(new Command(commandName, args, options, callback));
};
}

Expand Down
20 changes: 9 additions & 11 deletions lib/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,7 @@ Redis.prototype.connect = function (callback) {
this.condition = {
select: this.options.db,
auth: this.options.password,
mode: {
subscriber: false,
monitor: false
}
subscriber: false
};

var _this = this;
Expand Down Expand Up @@ -306,7 +303,7 @@ Redis.prototype.end = function () {
* @public
*/
Redis.prototype.duplicate = function (override) {
return new Redis(_.defaults(override || {}, this.options));
return new Redis(_.assign(_.cloneDeep(this.options), override || {}));
};

/**
Expand Down Expand Up @@ -410,10 +407,11 @@ Redis.prototype.silentEmit = function (eventName) {
* @public
*/
Redis.prototype.monitor = function (callback) {
var monitorInstance = this.duplicate({ lazyConnect: false });
monitorInstance.options.enableReadyCheck = false;
monitorInstance.condition.mode.monitoring = true;
monitorInstance.prevCondition = monitorInstance.condition;
var monitorInstance = this.duplicate({
monitor: true,
enableReadyCheck: false,
lazyConnect: false
});

return new Promise(function (resolve) {
monitorInstance.once('monitoring', function () {
Expand Down Expand Up @@ -463,7 +461,7 @@ Redis.prototype.sendCommand = function (command, stream) {
command.reject(new Error('Connection is closed.'));
return command.promise;
}
if (this.condition.mode.subscriber && !_.includes(Command.FLAGS.VALID_IN_SUBSCRIBER_MODE, command.name)) {
if (this.condition.subscriber && !_.includes(Command.FLAGS.VALID_IN_SUBSCRIBER_MODE, command.name)) {
command.reject(new Error('Connection in subscriber mode, only subscriber commands may be used'));
return command.promise;
}
Expand Down Expand Up @@ -513,6 +511,6 @@ Redis.prototype.sendCommand = function (command, stream) {
return command.promise;
};

_.assign(Redis.prototype, require('./redis/prototype/parser'));
_.assign(Redis.prototype, require('./redis/parser'));

module.exports = Redis;
88 changes: 47 additions & 41 deletions lib/redis/event_handler.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict';

var debug = require('debug')('ioredis:connection');
var Command = require('../command');
var _ = require('lodash');

exports.connectHandler = function (self) {
return function () {
Expand Down Expand Up @@ -99,61 +101,65 @@ exports.readyHandler = function (self) {
self.setStatus('ready');
self.retryAttempts = 0;

var item;
var finalSelect = self.condition.select;
self.condition.select = 0;
if (self.options.monitor) {
self.call('monitor', function () {
var sendCommand = self.sendCommand;
self.sendCommand = function (command) {
if (_.includes(Command.FLAGS.VALID_IN_MONITOR_MODE, command.name)) {
return sendCommand.call(self, command);
}
command.reject(new Error('Connection is in monitoring mode, can\'t process commands.'));
return command.promise;
};
self.emit('monitoring');
});
} else {
var item;
var finalSelect = self.condition.select;
self.condition.select = 0;

if (self.prevCommandQueue) {
if (self.options.autoResendUnfulfilledCommands) {
debug('resend %d unfulfilled commands', self.prevCommandQueue.length);
while (self.prevCommandQueue.length > 0) {
item = self.prevCommandQueue.shift();
if (item.select !== self.condition.select && item.command.name !== 'select') {
self.select(item.select);
}
self.sendCommand(item.command, item.stream);
}
} else {
self.prevCommandQueue = null;
}
}

if (self.prevCommandQueue) {
if (self.options.autoResendUnfulfilledCommands) {
debug('resend %d unfulfilled commands', self.prevCommandQueue.length);
while (self.prevCommandQueue.length > 0) {
item = self.prevCommandQueue.shift();
if (self.offlineQueue.length) {
debug('send %d commands in offline queue', self.offlineQueue.length);
var offlineQueue = self.offlineQueue;
self.offlineQueue = [];
while (offlineQueue.length > 0) {
item = offlineQueue.shift();
if (item.select !== self.condition.select && item.command.name !== 'select') {
self.select(item.select);
}
self.sendCommand(item.command, item.stream);
}
} else {
self.prevCommandQueue = null;
}
}

if (self.offlineQueue.length) {
debug('send %d commands in offline queue', self.offlineQueue.length);
var offlineQueue = self.offlineQueue;
self.offlineQueue = [];
while (offlineQueue.length > 0) {
item = offlineQueue.shift();
if (item.select !== self.condition.select && item.command.name !== 'select') {
self.select(item.select);
}
self.sendCommand(item.command, item.stream);
if (self.condition.select !== finalSelect) {
debug('connect to db [%d]', finalSelect);
self.selectBuffer(finalSelect);
}
}

if (self.condition.select !== finalSelect) {
debug('connect to db [%d]', finalSelect);
self.selectBuffer(finalSelect);
}

if (self.prevCondition) {
var condition = self.prevCondition;
if (condition.mode.monitoring) {
self.call('monitor', function () {
self.sendCommand = function (command) {
command.reject(new Error('Connection is in monitoring mode, can\'t process commands.'));
return command.promise;
};
self.emit('monitoring');
});
} else {
if (condition.mode.subscriber && self.options.autoResubscribe) {
var subscribeChannels = condition.mode.subscriber.channels('subscribe');
if (self.prevCondition) {
var condition = self.prevCondition;
if (condition.subscriber && self.options.autoResubscribe) {
var subscribeChannels = condition.subscriber.channels('subscribe');
if (subscribeChannels.length) {
debug('subscribe %d channels', subscribeChannels.length);
self.subscribe(subscribeChannels);
}
var psubscribeChannels = condition.mode.subscriber.channels('psubscribe');
var psubscribeChannels = condition.subscriber.channels('psubscribe');
if (psubscribeChannels.length) {
debug('psubscribe %d channels', psubscribeChannels.length);
self.psubscribe(psubscribeChannels);
Expand Down
19 changes: 10 additions & 9 deletions lib/redis/prototype/parser.js → lib/redis/parser.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
'use strict';

var _ = require('lodash');
var Command = require('../../command');
var SubscriptionSet = require('../../subscription_set');
var Command = require('../command');
var SubscriptionSet = require('../subscription_set');
var debug = require('debug')('ioredis:reply');

/**
Expand Down Expand Up @@ -47,7 +47,7 @@ _.forEach(['message', 'pmessage', 'subscribe', 'psubscribe', 'unsubscribe', 'pun
sharedBuffers[str] = new Buffer(str);
});
exports.returnReply = function (reply) {
if (this.condition.mode.monitoring) {
if (this.options.monitor) {
// Valid commands in the monitoring mode are AUTH and MONITOR,
// both of which always reply with 'OK'.
var replyStr = reply.toString();
Expand All @@ -69,7 +69,7 @@ exports.returnReply = function (reply) {
}

var item, channel, count;
if (this.condition.mode.subscriber) {
if (this.condition.subscriber) {
var replyType = Array.isArray(reply) ? reply[0].toString() : null;
debug('receive reply "%s" in subscriber mode', replyType);

Expand All @@ -94,7 +94,7 @@ exports.returnReply = function (reply) {
case 'subscribe':
case 'psubscribe':
channel = reply[1].toString();
this.condition.mode.subscriber.add(replyType, channel);
this.condition.subscriber.add(replyType, channel);
item = this.commandQueue.shift();
if (!fillSubCommand(item.command, reply[2])) {
this.commandQueue.unshift(item);
Expand All @@ -104,11 +104,11 @@ exports.returnReply = function (reply) {
case 'punsubscribe':
channel = reply[1] ? reply[1].toString() : null;
if (channel) {
this.condition.mode.subscriber.del(replyType, channel);
this.condition.subscriber.del(replyType, channel);
}
count = reply[2];
if (count === 0) {
this.condition.mode.subscriber = false;
this.condition.subscriber = false;
}
item = this.commandQueue.shift();
if (!fillUnsubCommand(item.command, count)) {
Expand All @@ -122,11 +122,12 @@ exports.returnReply = function (reply) {
} else {
item = this.commandQueue.shift();
if (!item) {
console.log('error', this.condition, reply.toString(), this.options.type);
return this.emit('error', new Error('Command queue state error. If you can reproduce this, please report it.'));
}
if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, item.command.name)) {
this.condition.mode.subscriber = new SubscriptionSet();
this.condition.mode.subscriber.add(item.command.name, reply[1].toString());
this.condition.subscriber = new SubscriptionSet();
this.condition.subscriber.add(item.command.name, reply[1].toString());

if (!fillSubCommand(item.command, reply[2])) {
this.commandQueue.unshift(item);
Expand Down
17 changes: 17 additions & 0 deletions test/functional/monitor.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,21 @@ describe('monitor', function () {
});
});
});

it('should continue monitoring after reconnection', function (done) {
var redis = new Redis();
redis.monitor(function (err, monitor) {
monitor.on('monitor', function (time, args) {
if (args[0] === 'set') {
redis.disconnect();
monitor.disconnect();
done();
}
});
monitor.disconnect(true);
monitor.on('ready', function () {
redis.set('foo', 'bar');
});
});
});
});

0 comments on commit 109a6ec

Please sign in to comment.