diff --git a/Changelog.md b/Changelog.md index c1f445a4..c9219f6a 100644 --- a/Changelog.md +++ b/Changelog.md @@ -2,7 +2,8 @@ ### Master Branch -* Support pub/sub in Cluster mode. +* Continue monitoring after reconnection([#52](https://github.com/luin/ioredis/issues/52)). +* Support pub/sub in Cluster mode([#54](https://github.com/luin/ioredis/issues/54)). * Auto-reconnect when none of startup nodes is ready([#56](https://github.com/luin/ioredis/issues/56)). ### v1.3.6 - May 22, 2015 diff --git a/lib/command.js b/lib/command.js index 9d388422..a730311d 100644 --- a/lib/command.js +++ b/lib/command.js @@ -240,6 +240,8 @@ Command.FLAGS = { VALID_WHEN_LOADING: ['info', 'auth', 'select', 'subscribe', 'unsubscribe', 'psubscribe', 'pubsubscribe', 'publish', 'shutdown', 'replconf', 'role', 'pubsub', 'command', 'latency'], // Commands that can be processed when client is in the subscriber mode VALID_IN_SUBSCRIBER_MODE: ['subscribe', 'psubscribe', 'unsubscribe', 'punsubscribe', 'ping', 'quit'], + // Commands that are valid in monitor mode + VALID_IN_MONITOR_MODE: ['monitor', 'auth'], // Commands that will turn current connection into subscriber mode ENTER_SUBSCRIBER_MODE: ['subscribe', 'psubscribe'], // Commands that may make current connection quit subscriber mode diff --git a/lib/commander.js b/lib/commander.js index b1a9944a..dedf3121 100644 --- a/lib/commander.js +++ b/lib/commander.js @@ -108,9 +108,7 @@ function generateFunction (_commandName, _encoding) { options.errorStack = new Error().stack; } - var command = new Command(commandName, args, options, callback); - - return this.sendCommand(command); + return this.sendCommand(new Command(commandName, args, options, callback)); }; } diff --git a/lib/redis.js b/lib/redis.js index fd824cba..8ef6f2c4 100644 --- a/lib/redis.js +++ b/lib/redis.js @@ -221,10 +221,7 @@ Redis.prototype.connect = function (callback) { this.condition = { select: this.options.db, auth: this.options.password, - mode: { - subscriber: false, - monitor: false - } + subscriber: false }; var _this = this; @@ -306,7 +303,7 @@ Redis.prototype.end = function () { * @public */ Redis.prototype.duplicate = function (override) { - return new Redis(_.defaults(override || {}, this.options)); + return new Redis(_.assign(_.cloneDeep(this.options), override || {})); }; /** @@ -410,10 +407,11 @@ Redis.prototype.silentEmit = function (eventName) { * @public */ Redis.prototype.monitor = function (callback) { - var monitorInstance = this.duplicate({ lazyConnect: false }); - monitorInstance.options.enableReadyCheck = false; - monitorInstance.condition.mode.monitoring = true; - monitorInstance.prevCondition = monitorInstance.condition; + var monitorInstance = this.duplicate({ + monitor: true, + enableReadyCheck: false, + lazyConnect: false + }); return new Promise(function (resolve) { monitorInstance.once('monitoring', function () { @@ -463,7 +461,7 @@ Redis.prototype.sendCommand = function (command, stream) { command.reject(new Error('Connection is closed.')); return command.promise; } - if (this.condition.mode.subscriber && !_.includes(Command.FLAGS.VALID_IN_SUBSCRIBER_MODE, command.name)) { + if (this.condition.subscriber && !_.includes(Command.FLAGS.VALID_IN_SUBSCRIBER_MODE, command.name)) { command.reject(new Error('Connection in subscriber mode, only subscriber commands may be used')); return command.promise; } @@ -513,6 +511,6 @@ Redis.prototype.sendCommand = function (command, stream) { return command.promise; }; -_.assign(Redis.prototype, require('./redis/prototype/parser')); +_.assign(Redis.prototype, require('./redis/parser')); module.exports = Redis; diff --git a/lib/redis/event_handler.js b/lib/redis/event_handler.js index 46f51d36..9411cf6e 100644 --- a/lib/redis/event_handler.js +++ b/lib/redis/event_handler.js @@ -1,6 +1,8 @@ 'use strict'; var debug = require('debug')('ioredis:connection'); +var Command = require('../command'); +var _ = require('lodash'); exports.connectHandler = function (self) { return function () { @@ -99,61 +101,65 @@ exports.readyHandler = function (self) { self.setStatus('ready'); self.retryAttempts = 0; - var item; - var finalSelect = self.condition.select; - self.condition.select = 0; + if (self.options.monitor) { + self.call('monitor', function () { + var sendCommand = self.sendCommand; + self.sendCommand = function (command) { + if (_.includes(Command.FLAGS.VALID_IN_MONITOR_MODE, command.name)) { + return sendCommand.call(self, command); + } + command.reject(new Error('Connection is in monitoring mode, can\'t process commands.')); + return command.promise; + }; + self.emit('monitoring'); + }); + } else { + var item; + var finalSelect = self.condition.select; + self.condition.select = 0; + + if (self.prevCommandQueue) { + if (self.options.autoResendUnfulfilledCommands) { + debug('resend %d unfulfilled commands', self.prevCommandQueue.length); + while (self.prevCommandQueue.length > 0) { + item = self.prevCommandQueue.shift(); + if (item.select !== self.condition.select && item.command.name !== 'select') { + self.select(item.select); + } + self.sendCommand(item.command, item.stream); + } + } else { + self.prevCommandQueue = null; + } + } - if (self.prevCommandQueue) { - if (self.options.autoResendUnfulfilledCommands) { - debug('resend %d unfulfilled commands', self.prevCommandQueue.length); - while (self.prevCommandQueue.length > 0) { - item = self.prevCommandQueue.shift(); + if (self.offlineQueue.length) { + debug('send %d commands in offline queue', self.offlineQueue.length); + var offlineQueue = self.offlineQueue; + self.offlineQueue = []; + while (offlineQueue.length > 0) { + item = offlineQueue.shift(); if (item.select !== self.condition.select && item.command.name !== 'select') { self.select(item.select); } self.sendCommand(item.command, item.stream); } - } else { - self.prevCommandQueue = null; } - } - if (self.offlineQueue.length) { - debug('send %d commands in offline queue', self.offlineQueue.length); - var offlineQueue = self.offlineQueue; - self.offlineQueue = []; - while (offlineQueue.length > 0) { - item = offlineQueue.shift(); - if (item.select !== self.condition.select && item.command.name !== 'select') { - self.select(item.select); - } - self.sendCommand(item.command, item.stream); + if (self.condition.select !== finalSelect) { + debug('connect to db [%d]', finalSelect); + self.selectBuffer(finalSelect); } - } - - if (self.condition.select !== finalSelect) { - debug('connect to db [%d]', finalSelect); - self.selectBuffer(finalSelect); - } - if (self.prevCondition) { - var condition = self.prevCondition; - if (condition.mode.monitoring) { - self.call('monitor', function () { - self.sendCommand = function (command) { - command.reject(new Error('Connection is in monitoring mode, can\'t process commands.')); - return command.promise; - }; - self.emit('monitoring'); - }); - } else { - if (condition.mode.subscriber && self.options.autoResubscribe) { - var subscribeChannels = condition.mode.subscriber.channels('subscribe'); + if (self.prevCondition) { + var condition = self.prevCondition; + if (condition.subscriber && self.options.autoResubscribe) { + var subscribeChannels = condition.subscriber.channels('subscribe'); if (subscribeChannels.length) { debug('subscribe %d channels', subscribeChannels.length); self.subscribe(subscribeChannels); } - var psubscribeChannels = condition.mode.subscriber.channels('psubscribe'); + var psubscribeChannels = condition.subscriber.channels('psubscribe'); if (psubscribeChannels.length) { debug('psubscribe %d channels', psubscribeChannels.length); self.psubscribe(psubscribeChannels); diff --git a/lib/redis/prototype/parser.js b/lib/redis/parser.js similarity index 89% rename from lib/redis/prototype/parser.js rename to lib/redis/parser.js index 0796ee68..39eb1911 100644 --- a/lib/redis/prototype/parser.js +++ b/lib/redis/parser.js @@ -1,8 +1,8 @@ 'use strict'; var _ = require('lodash'); -var Command = require('../../command'); -var SubscriptionSet = require('../../subscription_set'); +var Command = require('../command'); +var SubscriptionSet = require('../subscription_set'); var debug = require('debug')('ioredis:reply'); /** @@ -47,7 +47,7 @@ _.forEach(['message', 'pmessage', 'subscribe', 'psubscribe', 'unsubscribe', 'pun sharedBuffers[str] = new Buffer(str); }); exports.returnReply = function (reply) { - if (this.condition.mode.monitoring) { + if (this.options.monitor) { // Valid commands in the monitoring mode are AUTH and MONITOR, // both of which always reply with 'OK'. var replyStr = reply.toString(); @@ -69,7 +69,7 @@ exports.returnReply = function (reply) { } var item, channel, count; - if (this.condition.mode.subscriber) { + if (this.condition.subscriber) { var replyType = Array.isArray(reply) ? reply[0].toString() : null; debug('receive reply "%s" in subscriber mode', replyType); @@ -94,7 +94,7 @@ exports.returnReply = function (reply) { case 'subscribe': case 'psubscribe': channel = reply[1].toString(); - this.condition.mode.subscriber.add(replyType, channel); + this.condition.subscriber.add(replyType, channel); item = this.commandQueue.shift(); if (!fillSubCommand(item.command, reply[2])) { this.commandQueue.unshift(item); @@ -104,11 +104,11 @@ exports.returnReply = function (reply) { case 'punsubscribe': channel = reply[1] ? reply[1].toString() : null; if (channel) { - this.condition.mode.subscriber.del(replyType, channel); + this.condition.subscriber.del(replyType, channel); } count = reply[2]; if (count === 0) { - this.condition.mode.subscriber = false; + this.condition.subscriber = false; } item = this.commandQueue.shift(); if (!fillUnsubCommand(item.command, count)) { @@ -122,11 +122,12 @@ exports.returnReply = function (reply) { } else { item = this.commandQueue.shift(); if (!item) { + console.log('error', this.condition, reply.toString(), this.options.type); return this.emit('error', new Error('Command queue state error. If you can reproduce this, please report it.')); } if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, item.command.name)) { - this.condition.mode.subscriber = new SubscriptionSet(); - this.condition.mode.subscriber.add(item.command.name, reply[1].toString()); + this.condition.subscriber = new SubscriptionSet(); + this.condition.subscriber.add(item.command.name, reply[1].toString()); if (!fillSubCommand(item.command, reply[2])) { this.commandQueue.unshift(item); diff --git a/test/functional/monitor.js b/test/functional/monitor.js index c1525db8..10d24e34 100644 --- a/test/functional/monitor.js +++ b/test/functional/monitor.js @@ -26,4 +26,21 @@ describe('monitor', function () { }); }); }); + + it('should continue monitoring after reconnection', function (done) { + var redis = new Redis(); + redis.monitor(function (err, monitor) { + monitor.on('monitor', function (time, args) { + if (args[0] === 'set') { + redis.disconnect(); + monitor.disconnect(); + done(); + } + }); + monitor.disconnect(true); + monitor.on('ready', function () { + redis.set('foo', 'bar'); + }); + }); + }); });