diff --git a/lib/connectors/SentinelConnector.ts b/lib/connectors/SentinelConnector.ts deleted file mode 100644 index e944a7ea..00000000 --- a/lib/connectors/SentinelConnector.ts +++ /dev/null @@ -1,269 +0,0 @@ -import {createConnection, Socket} from 'net' -import {bind, sample} from '../utils/lodash' -import {CONNECTION_CLOSED_ERROR_MSG, packObject} from '../utils/index' -import {ITcpConnectionOptions, IIpcConnectionOptions, isIIpcConnectionOptions} from './StandaloneConnector' -import { TLSSocket } from 'tls' -import AbstractConnector, {ErrorEmitter} from './AbstractConnector' -const debug = require('../utils/debug')('ioredis:SentinelConnector') - -let Redis - -interface ISentinelSlavesResponse { - port: string, - ip: string, - flags?: string -} - -interface ISentinelOptions { - role: 'master' | 'slave' - name: 'string' - sentinels: any[] - sentinelRetryStrategy?: (retryAttempts: number) => number - preferredSlaves?: - ((slaves: Array) => ISentinelSlavesResponse) | - Array<{port: string, ip: string, prio?: number}> | - {port: string, ip: string, prio?: number} - connectTimeout?: number -} - -type NodeCallback = (err: Error | null, result?: T) => void - -interface ISentinelTcpConnectionOptions extends ITcpConnectionOptions, ISentinelOptions {} -interface ISentinelIpcConnectionOptions extends IIpcConnectionOptions, ISentinelOptions {} - -export default class SentinelConnector extends AbstractConnector { - private retryAttempts: number - private currentPoint: number = -1 - private sentinels: any[] - - constructor (protected options: ISentinelTcpConnectionOptions | ISentinelIpcConnectionOptions) { - super() - - if (this.options.sentinels.length === 0) { - throw new Error('Requires at least one sentinel to connect to.') - } - if (!this.options.name) { - throw new Error('Requires the name of master.') - } - - this.sentinels = this.options.sentinels - } - - public check (info: {role?: string}): boolean { - const roleMatches: boolean = !info.role || this.options.role === info.role - if (!roleMatches) { - debug('role invalid, expected %s, but got %s', this.options.role, info.role) - } - return roleMatches - } - - public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { - this.connecting = true - this.retryAttempts = 0 - - let lastError - const _this = this - connectToNext() - - function connectToNext() { - _this.currentPoint += 1 - if (_this.currentPoint === _this.sentinels.length) { - _this.currentPoint = -1 - - const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function' - ? _this.options.sentinelRetryStrategy(++_this.retryAttempts) - : null - - let errorMsg = typeof retryDelay !== 'number' - ? 'All sentinels are unreachable and retry is disabled.' - : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` - - if (lastError) { - errorMsg += ` Last error: ${lastError.message}` - } - - debug(errorMsg) - - const error = new Error(errorMsg) - if (typeof retryDelay === 'number') { - setTimeout(connectToNext, retryDelay) - eventEmitter('error', error) - } else { - callback(error) - } - return - } - - const endpoint = _this.sentinels[_this.currentPoint] - _this.resolve(endpoint, function (err, resolved) { - if (!_this.connecting) { - callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) - return - } - if (resolved) { - debug('resolved: %s:%s', resolved.host, resolved.port) - _this.stream = createConnection(resolved) - callback(null, _this.stream) - } else { - var endpointAddress = endpoint.host + ':' + endpoint.port - var errorMsg = err - ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message - : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved - - debug(errorMsg) - - eventEmitter('sentinelError', new Error(errorMsg)) - - if (err) { - lastError = err - } - connectToNext() - } - }) - } - } - - private updateSentinels (client, callback: NodeCallback): void { - var _this = this - client.sentinel('sentinels', this.options.name, function (err, result) { - if (err) { - client.disconnect() - return callback(err) - } - if (Array.isArray(result)) { - for (var i = 0; i < result.length; ++i) { - var sentinel = packObject(result[i]) - var flags = sentinel.flags ? sentinel.flags.split(',') : [] - if (flags.indexOf('disconnected') === -1 && sentinel.ip && sentinel.port) { - var endpoint = { host: sentinel.ip, port: parseInt(sentinel.port, 10) } - var isDuplicate = _this.sentinels.some(bind(isSentinelEql, null, endpoint)) - if (!isDuplicate) { - debug('adding sentinel %s:%s', endpoint.host, endpoint.port) - _this.sentinels.push(endpoint) - } - } - } - debug('sentinels', _this.sentinels) - } - callback(null) - }) - } - - private resolveMaster (client, callback: NodeCallback): void { - var _this = this - client.sentinel('get-master-addr-by-name', this.options.name, function (err, result) { - if (err) { - client.disconnect() - return callback(err) - } - _this.updateSentinels(client, function (err) { - client.disconnect() - if (err) { - return callback(err) - } - callback(null, Array.isArray(result) ? { host: result[0], port: result[1] } : null) - }) - }) - } - - private resolveSlave (client, callback: NodeCallback): void { - client.sentinel('slaves', this.options.name, (err, result) => { - client.disconnect() - if (err) { - return callback(err) - } - let selectedSlave: ISentinelSlavesResponse - if (Array.isArray(result)) { - const availableSlaves: Array<{port: string, ip: string, flags?: string}> = [] - for (var i = 0; i < result.length; ++i) { - const slave: ISentinelSlavesResponse = packObject(result[i]) - if (slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/)) { - availableSlaves.push(slave) - } - } - // allow the options to prefer particular slave(s) - let {preferredSlaves} = this.options - if (typeof preferredSlaves === 'function') { - selectedSlave = preferredSlaves(availableSlaves) - } else if (preferredSlaves !== null && typeof preferredSlaves === 'object') { - const preferredSlavesArray = Array.isArray(preferredSlaves) - ? preferredSlaves - : [preferredSlaves] - - // sort by priority - preferredSlavesArray.sort((a, b) => { - // default the priority to 1 - if (!a.prio) { - a.prio = 1 - } - if (!b.prio) { - b.prio = 1 - } - - // lowest priority first - if (a.prio < b.prio) { - return -1 - } - if (a.prio > b.prio) { - return 1 - } - return 0 - }) - - // loop over preferred slaves and return the first match - for (let p = 0; p < preferredSlavesArray.length; p++) { - for (let a = 0; a < availableSlaves.length; a++) { - const slave = availableSlaves[a] - if (slave.ip === preferredSlavesArray[p].ip) { - if (slave.port === preferredSlavesArray[p].port) { - selectedSlave = slave - break - } - } - } - if (selectedSlave) { - break - } - } - // if none of the preferred slaves are available, a random available slave is returned - } - if (!selectedSlave) { - // get a random available slave - selectedSlave = sample(availableSlaves) - } - } - callback(null, selectedSlave ? {host: selectedSlave.ip, port: Number(selectedSlave.port)} : null) - }) - } - - private resolve (endpoint, callback: NodeCallback): void { - if (typeof Redis === 'undefined') { - Redis = require('../redis') - } - var client = new Redis({ - port: endpoint.port || 26379, - host: endpoint.host, - family: endpoint.family || (isIIpcConnectionOptions(this.options) ? undefined : this.options.family), - retryStrategy: null, - enableReadyCheck: false, - connectTimeout: this.options.connectTimeout, - dropBufferSupport: true - }) - - // ignore the errors since resolve* methods will handle them - client.on('error', noop) - - if (this.options.role === 'slave') { - this.resolveSlave(client, callback) - } else { - this.resolveMaster(client, callback) - } - } -} - -function noop (): void {} - -function isSentinelEql (a, b): boolean { - return ((a.host || '127.0.0.1') === (b.host || '127.0.0.1')) && - ((a.port || 26379) === (b.port || 26379)) -} \ No newline at end of file diff --git a/lib/connectors/SentinelConnector/SentinelIterator.ts b/lib/connectors/SentinelConnector/SentinelIterator.ts new file mode 100644 index 00000000..e7ad26bb --- /dev/null +++ b/lib/connectors/SentinelConnector/SentinelIterator.ts @@ -0,0 +1,43 @@ +import {ISentinelAddress} from './types' + +function isSentinelEql (a: ISentinelAddress, b: ISentinelAddress): boolean { + return ((a.host || '127.0.0.1') === (b.host || '127.0.0.1')) && + ((a.port || 26379) === (b.port || 26379)) +} + +export default class SentinelIterator { + private cursor: number = 0 + + constructor (private sentinels: ISentinelAddress[]) {} + + hasNext (): boolean { + return this.cursor < this.sentinels.length + } + + next (): ISentinelAddress | null { + return this.hasNext() ? this.sentinels[this.cursor++] : null + } + + reset (moveCurrentEndpointToFirst: boolean): void { + if (moveCurrentEndpointToFirst && this.sentinels.length > 1 && this.cursor !== 1) { + const remains = this.sentinels.slice(this.cursor - 1) + this.sentinels = remains.concat(this.sentinels.slice(0, this.cursor - 1)) + } + this.cursor = 0 + } + + add (sentinel: ISentinelAddress): boolean { + for (let i = 0; i < this.sentinels.length; i++) { + if (isSentinelEql(sentinel, this.sentinels[i])) { + return false + } + } + + this.sentinels.push(sentinel) + return true + } + + toString (): string { + return `${JSON.stringify(this.sentinels)} @${this.cursor}` + } +} \ No newline at end of file diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts new file mode 100644 index 00000000..15391f42 --- /dev/null +++ b/lib/connectors/SentinelConnector/index.ts @@ -0,0 +1,275 @@ +import {createConnection, Socket} from 'net' +import {sample} from '../../utils/lodash' +import {CONNECTION_CLOSED_ERROR_MSG, packObject} from '../../utils/index' +import {TLSSocket} from 'tls' +import {ITcpConnectionOptions, isIIpcConnectionOptions} from '../StandaloneConnector' +import SentinelIterator from './SentinelIterator' +import {ISentinelAddress} from './types'; +import AbstractConnector, { ErrorEmitter } from '../AbstractConnector' +const debug = require('../../utils/debug')('ioredis:SentinelConnector') + +let Redis + +interface IAddressFromResponse { + port: string, + ip: string, + flags?: string +} + +type NodeCallback = (err: Error | null, result?: T) => void +type PreferredSlaves = + ((slaves: Array) => IAddressFromResponse) | + Array<{port: string, ip: string, prio?: number}> | + {port: string, ip: string, prio?: number} + +interface ISentinelConnectionOptions extends ITcpConnectionOptions { + role: 'master' | 'slave' + name: 'string' + sentinels: Array + sentinelRetryStrategy?: (retryAttempts: number) => number + preferredSlaves?: PreferredSlaves + connectTimeout?: number +} + +export default class SentinelConnector extends AbstractConnector { + private retryAttempts: number + private sentinelIterator: SentinelIterator + + constructor (protected options: ISentinelConnectionOptions) { + super() + + if (this.options.sentinels.length === 0) { + throw new Error('Requires at least one sentinel to connect to.') + } + if (!this.options.name) { + throw new Error('Requires the name of master.') + } + + this.sentinelIterator = new SentinelIterator(this.options.sentinels) + } + + public check (info: {role?: string}): boolean { + const roleMatches: boolean = !info.role || this.options.role === info.role + if (!roleMatches) { + debug('role invalid, expected %s, but got %s', this.options.role, info.role) + // Start from the next item. + // Note that `reset` will move the cursor to the previous element, + // so we advance two steps here. + this.sentinelIterator.next() + this.sentinelIterator.next() + this.sentinelIterator.reset(true) + } + return roleMatches + } + + public connect (callback: NodeCallback, eventEmitter: ErrorEmitter): void { + this.connecting = true + this.retryAttempts = 0 + + let lastError + const _this = this + connectToNext() + + function connectToNext() { + if (!_this.sentinelIterator.hasNext()) { + _this.sentinelIterator.reset(false) + const retryDelay = typeof _this.options.sentinelRetryStrategy === 'function' + ? _this.options.sentinelRetryStrategy(++_this.retryAttempts) + : null + + let errorMsg = typeof retryDelay !== 'number' + ? 'All sentinels are unreachable and retry is disabled.' + : `All sentinels are unreachable. Retrying from scratch after ${retryDelay}ms.` + + if (lastError) { + errorMsg += ` Last error: ${lastError.message}` + } + + debug(errorMsg) + + const error = new Error(errorMsg) + if (typeof retryDelay === 'number') { + setTimeout(connectToNext, retryDelay) + eventEmitter('error', error) + } else { + callback(error) + } + return + } + + const endpoint = _this.sentinelIterator.next() + _this.resolve(endpoint, function (err, resolved) { + if (!_this.connecting) { + callback(new Error(CONNECTION_CLOSED_ERROR_MSG)) + return + } + if (resolved) { + debug('resolved: %s:%s', resolved.host, resolved.port) + _this.stream = createConnection(resolved) + _this.sentinelIterator.reset(true) + callback(null, _this.stream) + } else { + const endpointAddress = endpoint.host + ':' + endpoint.port + const errorMsg = err + ? 'failed to connect to sentinel ' + endpointAddress + ' because ' + err.message + : 'connected to sentinel ' + endpointAddress + ' successfully, but got an invalid reply: ' + resolved + + debug(errorMsg) + + eventEmitter('sentinelError', new Error(errorMsg)) + + if (err) { + lastError = err + } + connectToNext() + } + }) + } + } + + private updateSentinels (client, callback: NodeCallback): void { + client.sentinel('sentinels', this.options.name, (err, result) => { + if (err) { + client.disconnect() + return callback(err) + } + if (!Array.isArray(result)) { + return callback(null) + } + + result.map(packObject).forEach(sentinel => { + const flags = sentinel.flags ? sentinel.flags.split(',') : [] + if (flags.indexOf('disconnected') === -1 && sentinel.ip && sentinel.port) { + const endpoint = addressResponseToAddress(sentinel) + if (this.sentinelIterator.add(endpoint)) { + debug('adding sentinel %s:%s', endpoint.host, endpoint.port) + } + } + }) + debug('Updated internal sentinels: %s', this.sentinelIterator) + callback(null) + }) + } + + private resolveMaster (client, callback: NodeCallback): void { + client.sentinel('get-master-addr-by-name', this.options.name, (err, result) => { + if (err) { + client.disconnect() + return callback(err) + } + this.updateSentinels(client, (err) => { + client.disconnect() + if (err) { + return callback(err) + } + callback(null, Array.isArray(result) ? { host: result[0], port: Number(result[1]) } : null) + }) + }) + } + + private resolveSlave (client, callback: NodeCallback): void { + client.sentinel('slaves', this.options.name, (err, result) => { + client.disconnect() + if (err) { + return callback(err) + } + + if (!Array.isArray(result)) { + return callback(null, null) + } + + const availableSlaves = result.map(packObject).filter(slave => ( + slave.flags && !slave.flags.match(/(disconnected|s_down|o_down)/) + )) + + callback(null, selectPreferredSentinel(availableSlaves, this.options.preferredSlaves)) + }) + } + + private resolve (endpoint, callback: NodeCallback): void { + if (typeof Redis === 'undefined') { + Redis = require('../../redis') + } + var client = new Redis({ + port: endpoint.port || 26379, + host: endpoint.host, + family: endpoint.family || (isIIpcConnectionOptions(this.options) ? undefined : this.options.family), + retryStrategy: null, + enableReadyCheck: false, + connectTimeout: this.options.connectTimeout, + dropBufferSupport: true + }) + + // ignore the errors since resolve* methods will handle them + client.on('error', noop) + + if (this.options.role === 'slave') { + this.resolveSlave(client, callback) + } else { + this.resolveMaster(client, callback) + } + } +} + +function selectPreferredSentinel (availableSlaves: IAddressFromResponse[], preferredSlaves?: PreferredSlaves): ISentinelAddress | null { + if (availableSlaves.length === 0) { + return null + } + + let selectedSlave: IAddressFromResponse + if (typeof preferredSlaves === 'function') { + selectedSlave = preferredSlaves(availableSlaves) + } else if (preferredSlaves !== null && typeof preferredSlaves === 'object') { + const preferredSlavesArray = Array.isArray(preferredSlaves) + ? preferredSlaves + : [preferredSlaves] + + // sort by priority + preferredSlavesArray.sort((a, b) => { + // default the priority to 1 + if (!a.prio) { + a.prio = 1 + } + if (!b.prio) { + b.prio = 1 + } + + // lowest priority first + if (a.prio < b.prio) { + return -1 + } + if (a.prio > b.prio) { + return 1 + } + return 0 + }) + + // loop over preferred slaves and return the first match + for (let p = 0; p < preferredSlavesArray.length; p++) { + for (let a = 0; a < availableSlaves.length; a++) { + const slave = availableSlaves[a] + if (slave.ip === preferredSlavesArray[p].ip) { + if (slave.port === preferredSlavesArray[p].port) { + selectedSlave = slave + break + } + } + } + if (selectedSlave) { + break + } + } + } + + // if none of the preferred slaves are available, a random available slave is returned + if (!selectedSlave) { + selectedSlave = sample(availableSlaves) + } + return addressResponseToAddress(selectedSlave) +} + +function addressResponseToAddress (input: IAddressFromResponse): ISentinelAddress { + return {host: input.ip, port: Number(input.port)} +} + +function noop (): void {} diff --git a/lib/connectors/SentinelConnector/types.ts b/lib/connectors/SentinelConnector/types.ts new file mode 100644 index 00000000..2b3ec617 --- /dev/null +++ b/lib/connectors/SentinelConnector/types.ts @@ -0,0 +1,4 @@ +export interface ISentinelAddress { + port: number + host: string +} diff --git a/test/functional/sentinel.js b/test/functional/sentinel.js index 05111e4b..c7882861 100644 --- a/test/functional/sentinel.js +++ b/test/functional/sentinel.js @@ -199,23 +199,19 @@ describe('sentinel', function () { }); it('should connect to the next sentinel if the role is wrong', function (done) { - var sentinel = new MockServer(27379, function (argv) { + new MockServer(27379, function (argv) { if (argv[0] === 'sentinel' && argv[1] === 'get-master-addr-by-name' && argv[2] === 'master') { return ['127.0.0.1', '17380']; } }); - var sentinel2 = new MockServer(27380); - sentinel2.on('connect', function () { + var sentinel = new MockServer(27380); + sentinel.on('connect', function () { redis.disconnect(); - sentinel.disconnect(function () { - master.disconnect(function () { - sentinel2.disconnect(done); - }); - }); + done(); }); - var master = new MockServer(17380, function (argv) { + new MockServer(17380, function (argv) { if (argv[0] === 'info') { return 'role:slave'; } @@ -292,7 +288,7 @@ describe('sentinel', function () { }); it('should connect to the slave successfully based on preferred slave filter function', function (done) { - var sentinel = new MockServer(27379, function (argv) { + new MockServer(27379, function (argv) { if (argv[0] === 'sentinel' && argv[1] === 'slaves' && argv[2] === 'master') { return [['ip', '127.0.0.1', 'port', '17381', 'flags', 'slave']]; } @@ -301,9 +297,7 @@ describe('sentinel', function () { var slave = new MockServer(17381); slave.on('connect', function () { redis.disconnect(); - sentinel.disconnect(function () { - slave.disconnect(done); - }); + done(); }); var redis = new Redis({ @@ -312,7 +306,7 @@ describe('sentinel', function () { ], name: 'master', role: 'slave', - preferredSlaves: function(slaves){ + preferredSlaves (slaves) { for (var i = 0; i < slaves.length; i++){ var slave = slaves[i]; if (slave.ip == '127.0.0.1' && slave.port =='17381'){