diff --git a/doc/api/cluster.markdown b/doc/api/cluster.markdown index c87217f8cf6..b7cfee4ce8c 100644 --- a/doc/api/cluster.markdown +++ b/doc/api/cluster.markdown @@ -118,6 +118,21 @@ where the 'listening' event is emitted. console.log("We are now connected"); }); +## Event: 'disconnect' + +* `worker` {Worker object} + +When a workers IPC channel has disconnected this event is emitted. This will happen +when the worker die, usually after calling `.destroy()`. + +But also when calling `.disconnect()`, in this case it is possible there is delay +between the `disconnect` and `death` and the event can be used to detect if the +process is stuck in a cleanup or if there are long living connection. + + cluster.on('disconnect', function(worker) { + console.log('The worker #' + worker.uniqueID + ' has disconnected'); + }); + ## Event: 'death' * `worker` {Worker object} @@ -179,6 +194,16 @@ Spawn a new worker process. This can only be called from the master process. All settings set by the `.setupMaster` is stored in this settings object. This object is not supposed to be change or set manually. +## cluster.disconnect([callback]) + +* `callback` {Function} called when all workers are disconnected and handlers are closed + +When calling this method all workers will commit a graceful suicide. When they are +disconnected all internal handlers will be closed, allowing the master process to +die graceful if no other event is waiting. + +The method takes an optional callback argument there will be called when finished. + ## cluster.workers * {Object} @@ -232,9 +257,8 @@ See: [Child Process module](child_process.html) * {Boolean} -This property is a boolean. It is set when a worker dies, until then it is -`undefined`. It is true if the worker was killed using the `.destroy()` -method, and false otherwise. +This property is a boolean. It is set when a worker dies after calling `.destroy()` +or immediately after calling the `.disconnect()` method. Until then it is `undefined`. ### worker.send(message, [sendHandle]) @@ -273,6 +297,55 @@ a suicide boolean is set to true. // destroy worker worker.destroy(); + +## Worker.disconnect() + +When calling this function the worker will no longer accept new connections, but +they will be handled by any other listening worker. Existing connection will be +allowed to exit as usual. When no more connections exist, the IPC channel to the worker +will close allowing it to die graceful. When the IPC channel is closed the `disconnect` +event will emit, this is then followed by the `death` event, there is emitted when +the worker finally die. + +Because there might be long living connections, it is useful to implement a timeout. +This example ask the worker to disconnect and after 2 seconds it will destroy the +server. An alternative wound be to execute `worker.destroy()` after 2 seconds, but +that would normally not allow the worker to do any cleanup if needed. + + if (cluster.isMaster) { + var worker = cluser.fork(); + var timeout; + + worker.on('listening', function () { + worker.disconnect(); + timeout = setTimeout(function () { + worker.send('force kill'); + }, 2000); + }); + + worker.on('disconnect', function () { + clearTimeout(timeout); + }); + + } else if (cluster.isWorker) { + var net = require('net'); + var server = net.createServer(function (socket) { + // connection never end + }); + + server.listen(8000); + + server.on('close', function () { + // cleanup + }); + + process.on('message', function (msg) { + if (msg === 'force kill') { + server.destroy(); + } + }); + } + ### Event: 'message' * `message` {Object} @@ -342,6 +415,17 @@ on the specified worker. // Worker is listening }; +## Event: 'disconnect' + +* `worker` {Worker object} + +Same as the `cluster.on('disconnect')` event, but emits only when the state change +on the specified worker. + + cluster.fork().on('disconnect', function (worker) { + // Worker has disconnected + }; + ## Event: 'death' * `worker` {Worker object} diff --git a/lib/cluster.js b/lib/cluster.js index 556ab67fd86..835f987d149 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -77,6 +77,19 @@ function eachWorker(cb) { } } +// Extremely simple progress tracker +function ProgressTracker(missing, callback) { + this.missing = missing; + this.callback = callback; +} +ProgressTracker.prototype.done = function() { + this.missing -= 1; + this.check(); +}; +ProgressTracker.prototype.check = function() { + if (this.missing === 0) this.callback(); +}; + cluster.setupMaster = function(options) { // This can only be called from the master. assert(cluster.isMaster); @@ -238,7 +251,10 @@ if (cluster.isMaster) { // Messages to a worker will be handled using this methods else if (cluster.isWorker) { - // TODO: the disconnect step will use this + // Handle worker.disconnect from master + messageHandingObject.disconnect = function(message, worker) { + worker.disconnect(); + }; } function toDecInt(value) { @@ -291,9 +307,11 @@ function Worker(customEnv) { }); } - // handle internalMessage and exit event + // handle internalMessage, exit and disconnect event this.process.on('internalMessage', handleMessage.bind(null, this)); this.process.on('exit', prepareDeath.bind(null, this, 'dead', 'death')); + this.process.on('disconnect', + prepareDeath.bind(null, this, 'disconnected', 'disconnect')); // relay message and error this.process.on('message', this.emit.bind(this, 'message')); @@ -354,14 +372,6 @@ Worker.prototype.send = function() { this.process.send.apply(this.process, arguments); }; - -function closeWorkerChannel(worker, callback) { - //Apparently the .close method is async, but do not have a callback - worker.process._channel.close(); - worker.process._channel = null; - process.nextTick(callback); -} - // Kill the worker without restarting Worker.prototype.destroy = function() { var self = this; @@ -371,9 +381,14 @@ Worker.prototype.destroy = function() { if (cluster.isMaster) { // Disconnect IPC channel // this way the worker won't need to propagate suicide state to master - closeWorkerChannel(this, function() { + if (self.process.connected) { + self.process.once('disconnect', function() { + self.process.kill(); + }); + self.process.disconnect(); + } else { self.process.kill(); - }); + } } else { // Channel is open @@ -401,6 +416,59 @@ Worker.prototype.destroy = function() { } }; +// The .disconnect function will close all server and then disconnect +// the IPC channel. +if (cluster.isMaster) { + // Used in master + Worker.prototype.disconnect = function() { + this.suicide = true; + + sendInternalMessage(this, {cmd: 'disconnect'}); + }; + +} else { + // Used in workers + Worker.prototype.disconnect = function() { + var self = this; + + this.suicide = true; + + // keep track of open servers + var servers = Object.keys(serverLisenters).length; + var progress = new ProgressTracker(servers, function() { + // there are no more servers open so we will close the IPC channel. + // Closeing the IPC channel will emit emit a disconnect event + // in both master and worker on the process object. + // This event will be handled by prepearDeath. + self.process.disconnect(); + }); + + // depending on where this function was called from (master or worker) + // the suicide state has allready been set. + // But it dosn't really matter if we set it again. + sendInternalMessage(this, {cmd: 'suicide'}, function() { + // in case there are no servers + progress.check(); + + // closeing all servers graceful + var server; + for (var key in serverLisenters) { + server = serverLisenters[key]; + + // in case the server is closed we wont close it again + if (server._handle === null) { + progress.done(); + continue; + } + + server.on('close', progress.done.bind(progress)); + server.close(); + } + }); + + }; +} + // Fork a new worker cluster.fork = function(env) { // This can only be called from the master. @@ -412,6 +480,33 @@ cluster.fork = function(env) { return (new cluster.Worker(env)); }; +// execute .disconnect on all workers and close handlers when done +cluster.disconnect = function(callback) { + // This can only be called from the master. + assert(cluster.isMaster); + + // Close all TCP handlers when all workers are disconnected + var workers = Object.keys(cluster.workers).length; + var progress = new ProgressTracker(workers, function() { + for (var key in serverHandlers) { + serverHandlers[key].close(); + delete serverHandlers[key]; + } + + // call callback when done + if (callback) callback(); + }); + + // begin disconnecting all workers + eachWorker(function(worker) { + worker.once('disconnect', progress.done.bind(progress)); + worker.disconnect(); + }); + + // in case there wasn't any workers + progress.check(); +}; + // Sync way to quickly kill all cluster workers // However the workers may not die instantly function quickDestroyCluster() { diff --git a/test/simple/test-cluster-disconnect.js b/test/simple/test-cluster-disconnect.js new file mode 100644 index 00000000000..4ea6afcaa22 --- /dev/null +++ b/test/simple/test-cluster-disconnect.js @@ -0,0 +1,122 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common'); +var assert = require('assert'); +var cluster = require('cluster'); +var net = require('net'); + +if (cluster.isWorker) { + net.createServer(function(socket) { + socket.end('echo'); + }).listen(common.PORT, '127.0.0.1'); + + net.createServer(function(socket) { + socket.end('echo'); + }).listen(common.PORT + 1, '127.0.0.1'); + +} else if (cluster.isMaster) { + + // test a single TCP server + var testConnection = function(port, cb) { + var socket = net.connect(port, '127.0.0.1', function() { + // buffer result + var result = ''; + socket.on('data', function(chunk) { result += chunk; }); + + // check result + socket.on('end', function() { + cb(result === 'echo'); + }); + }); + }; + + // test both servers created in the cluster + var testCluster = function(cb) { + var servers = 2; + var done = 0; + + for (var i = 0, l = servers; i < l; i++) { + testConnection(common.PORT + i, function(sucess) { + assert.ok(sucess); + done += 1; + if (done === servers) { + cb(); + } + }); + } + }; + + // start two workers and execute callback when both is listening + var startCluster = function(cb) { + var workers = 2; + var online = 0; + + for (var i = 0, l = workers; i < l; i++) { + + var worker = cluster.fork(); + worker.on('listening', function() { + online += 1; + if (online === workers) { + cb(); + } + }); + } + }; + + + var results = { + start: 0, + test: 0, + disconnect: 0 + }; + + var test = function(again) { + //1. start cluster + startCluster(function() { + results.start += 1; + + //2. test cluster + testCluster(function() { + results.test += 1; + + //3. disconnect cluster + cluster.disconnect(function() { + results.disconnect += 1; + + // run test again to confirm cleanup + if (again) { + test(); + } + }); + }); + }); + }; + + test(true); + + process.once('exit', function() { + assert.equal(results.start, 2); + assert.equal(results.test, 2); + assert.equal(results.disconnect, 2); + }); +} diff --git a/test/simple/test-cluster-worker-disconnect.js b/test/simple/test-cluster-worker-disconnect.js new file mode 100644 index 00000000000..af2bf7078d2 --- /dev/null +++ b/test/simple/test-cluster-worker-disconnect.js @@ -0,0 +1,110 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + + +var common = require('../common'); +var assert = require('assert'); +var cluster = require('cluster'); + +if (cluster.isWorker) { + var http = require('http'); + http.Server(function() { + + }).listen(common.PORT, '127.0.0.1'); + +} else if (cluster.isMaster) { + + var checks = { + cluster: { + emitDisconnect: false, + emitDeath: false, + callback: false + }, + worker: { + emitDisconnect: false, + emitDeath: false, + state: false, + suicideMode: false, + died: false + } + }; + + // helper function to check if a process is alive + var alive = function(pid) { + try { + process.kill(pid, 0); + return true; + } catch (e) { + return false; + } + }; + + // start worker + var worker = cluster.fork(); + + // Disconnect worker when it is ready + worker.once('listening', function() { + worker.disconnect(); + }); + + // Check cluster events + cluster.once('disconnect', function() { + checks.cluster.emitDisconnect = true; + }); + cluster.once('death', function() { + checks.cluster.emitDeath = true; + }); + + // Check worker eventes and properties + worker.once('disconnect', function() { + checks.worker.emitDisconnect = true; + checks.worker.suicideMode = worker.suicide; + checks.worker.state = worker.state; + }); + + // Check that the worker died + worker.once('death', function() { + checks.worker.emitDeath = true; + checks.worker.died = !alive(worker.process.pid); + process.nextTick(function() { + process.exit(0); + }); + }); + + process.once('exit', function() { + + var w = checks.worker; + var c = checks.cluster; + + // events + assert.ok(w.emitDisconnect, 'Disconnect event did not emit'); + assert.ok(c.emitDisconnect, 'Disconnect event did not emit'); + assert.ok(w.emitDeath, 'Death event did not emit'); + assert.ok(c.emitDeath, 'Death event did not emit'); + + // flags + assert.equal(w.state, 'disconnected', 'The state property was not set'); + assert.equal(w.suicideMode, true, 'Suicide mode was not set'); + + // is process alive + assert.ok(w.died, 'The worker did not die'); + }); +}