Skip to content
This repository has been archived by the owner on Apr 22, 2023. It is now read-only.

Commit

Permalink
cluster: centralize removal from workers list.
Browse files Browse the repository at this point in the history
Currently, cluster workers can be removed from the workers list in three
different places:
- In the exit event handler for the worker process.
- In the disconnect event handler of the worker process.
- In the disconnect event handler of the cluster master.

However, handles for a given worker are cleaned up only in one of these
places: in the cluster master's disconnect event handler.

Because these events happen asynchronously, it is possible that the
workers list is empty before we even clean up one handle. This causes
the assertion that checks that no handle is left when the workers
list is empty to fail.

This commit removes the worker from the cluster.workers list only when
the worker is dead _and_ disconnected, at which point we're sure that
its associated handles are cleaned up.

Fixes #8191 and #8192.

Reviewed-By: Fedor Indutny <fedor@indutny.com>
  • Loading branch information
Julien Gilli authored and indutny committed Sep 2, 2014
1 parent fcfe820 commit 90d1147
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 19 deletions.
17 changes: 15 additions & 2 deletions doc/api/cluster.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -344,8 +344,10 @@ A hash that stores the active worker objects, keyed by `id` field. Makes it
easy to loop through all the workers. It is only available in the master
process.

A worker is removed from cluster.workers just before the `'disconnect'` or
`'exit'` event is emitted.
A worker is removed from cluster.workers after the worker has disconnected _and_
exited. The order between these two events cannot be determined in advance.
However, it is guaranteed that the removal from the cluster.workers list happens
before the last `'disconnect'` or `'exit'` event is emitted.

// Go through all workers
function eachWorker(callback) {
Expand Down Expand Up @@ -511,6 +513,17 @@ the `disconnect` event has not been emitted after some time.
});
}

### worker.isDead()

This function returns `true` if the worker's process has terminated (either
because of exiting or being signaled). Otherwise, it returns `false`.

### worker.isConnected()

This function returns `true` if the worker is connected to its master via its IPC
channel, `false` otherwise. A worker is connected to its master after it's been
created. It is disconnected after the `disconnect` event is emitted.

### Event: 'message'

* `message` {Object}
Expand Down
70 changes: 54 additions & 16 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ Worker.prototype.send = function() {
this.process.send.apply(this.process, arguments);
};

// Reports whether the worker's process has terminated: true once the process
// carries either an exit code or a signal code, false while both are still
// null/undefined.
Worker.prototype.isDead = function isDead() {
  var proc = this.process;
  if (proc.exitCode != null) return true;
  return proc.signalCode != null;
};

// Returns the `connected` flag of the worker's underlying process object,
// unmodified.
Worker.prototype.isConnected = function isConnected() {
  var proc = this.process;
  return proc.connected;
};

// Master/worker specific methods are defined in the *Init() functions.

function SharedHandle(key, address, port, addressType, backlog, fd) {
Expand Down Expand Up @@ -310,20 +318,62 @@ function masterInit() {
id: id,
process: workerProcess
});

// Drops `worker` from cluster.workers. When that leaves the list empty,
// asserts that no entry remains in `handles` and then emits 'disconnect'
// on `intercom`.
function removeWorker(worker) {
  assert(worker);

  delete cluster.workers[worker.id];

  var remainingIds = Object.keys(cluster.workers);
  if (remainingIds.length > 0) return;

  // Last worker is gone: every handle must already have been released.
  var leftoverHandles = Object.keys(handles);
  assert(leftoverHandles.length === 0, 'Resource leak detected.');
  intercom.emit('disconnect');
}

// Calls remove(worker) on every entry of `handles` and deletes each entry
// whose remove() call returns a truthy value.
function removeHandlesForWorker(worker) {
  assert(worker);

  for (var handleKey in handles) {
    var shouldDrop = handles[handleKey].remove(worker);
    if (shouldDrop) {
      delete handles[handleKey];
    }
  }
}

// One-shot handler for the worker process's 'exit' event: optionally removes
// the worker from the workers list, marks it dead, then emits 'exit' on both
// the worker and the cluster.
worker.process.once('exit', function(exitCode, signalCode) {
/*
* Remove the worker from the workers list only
* if it has disconnected, otherwise we might
* still want to access it.
*/
if (!worker.isConnected()) removeWorker(worker);

// Coerce suicide to a strict boolean before listeners can observe it.
worker.suicide = !!worker.suicide;
worker.state = 'dead';
worker.emit('exit', exitCode, signalCode);
cluster.emit('exit', worker, exitCode, signalCode);
// NOTE(review): this unconditional delete bypasses removeWorker() and its
// 'Resource leak detected.' assert. It looks like the pre-change line of a
// diff rendered without +/- markers; confirm against the raw diff.
delete cluster.workers[worker.id];
});

// One-shot handler for the worker process's 'disconnect' event: releases the
// worker's handles, optionally removes the worker from the workers list,
// marks it disconnected, then emits 'disconnect' on both the worker and the
// cluster.
worker.process.once('disconnect', function() {
/*
* Now is a good time to remove the handles
* associated with this worker because it is
* not connected to the master anymore.
*/
removeHandlesForWorker(worker);

/*
* Remove the worker from the workers list only
* if its process has exited. Otherwise, we might
* still want to access it.
*/
if (worker.isDead()) removeWorker(worker);

// Coerce suicide to a strict boolean before listeners can observe it.
worker.suicide = !!worker.suicide;
worker.state = 'disconnected';
worker.emit('disconnect');
cluster.emit('disconnect', worker);
// NOTE(review): this unconditional delete bypasses removeWorker() and its
// 'Resource leak detected.' assert. It looks like the pre-change line of a
// diff rendered without +/- markers; confirm against the raw diff.
delete cluster.workers[worker.id];
});

worker.process.on('internalMessage', internal(worker, onmessage));
process.nextTick(function() {
cluster.emit('fork', worker);
Expand All @@ -345,18 +395,6 @@ function masterInit() {
if (cb) intercom.once('disconnect', cb);
};

// Cluster-level 'disconnect' listener: forgets the worker, deletes every
// entry of `handles` whose remove(worker) returns a truthy value, and, once
// no workers remain, asserts that `handles` is empty before emitting
// 'disconnect' on `intercom`.
cluster.on('disconnect', function(worker) {
  delete cluster.workers[worker.id];

  for (var handleKey in handles) {
    var released = handles[handleKey].remove(worker);
    if (released) delete handles[handleKey];
  }

  var remainingIds = Object.keys(cluster.workers);
  if (remainingIds.length > 0) return;

  assert(Object.keys(handles).length === 0, 'Resource leak detected.');
  intercom.emit('disconnect');
});

Worker.prototype.disconnect = function() {
this.suicide = true;
send(this, { act: 'disconnect' });
Expand All @@ -365,7 +403,7 @@ function masterInit() {
Worker.prototype.destroy = function(signo) {
signo = signo || 'SIGTERM';
var proc = this.process;
if (proc.connected) {
if (this.isConnected()) {
this.once('disconnect', proc.kill.bind(proc, signo));
this.disconnect();
return;
Expand Down Expand Up @@ -595,7 +633,7 @@ function workerInit() {

Worker.prototype.destroy = function() {
this.suicide = true;
if (!process.connected) process.exit(0);
if (!this.isConnected()) process.exit(0);
var exit = process.exit.bind(null, 0);
send({ act: 'suicide' }, exit);
process.once('disconnect', exit);
Expand Down
2 changes: 1 addition & 1 deletion src/node.js
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@
};

startup.processKillAndExit = function() {
process.exitCode = 0;

process.exit = function(code) {
if (code || code === 0)
process.exitCode = code;
Expand Down
37 changes: 37 additions & 0 deletions test/simple/test-cluster-worker-isconnected.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
var cluster = require('cluster');
var assert = require('assert');
var util = require('util');

// Checks Worker#isConnected() from both sides of the IPC channel: it must be
// true right after fork() and false once a 'disconnect' event has fired.
if (cluster.isMaster) {
  var worker = cluster.fork();

  assert.ok(worker.isConnected(),
            "isConnected() should return true as soon as the worker has " +
            "been created.");

  worker.on('disconnect', function() {
    assert.ok(!worker.isConnected(),
              "After a disconnect event has been emitted, " +
              "isConnected should return false");
  });

  // Only disconnect once the worker has run its own initial assertion.
  worker.on('message', function(msg) {
    if (msg === 'readyToDisconnect') {
      worker.disconnect();
    }
  });

} else {
  assert.ok(cluster.worker.isConnected(),
            "isConnected() should return true from within a worker at all " +
            "times.");

  cluster.worker.process.on('disconnect', function() {
    assert.ok(!cluster.worker.isConnected(),
              "isConnected() should return false from within a worker " +
              "after its underlying process has been disconnected from " +
              "the master");
  });

  // Tell the master it may now trigger the disconnect.
  process.send('readyToDisconnect');
}
27 changes: 27 additions & 0 deletions test/simple/test-cluster-worker-isdead.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
var cluster = require('cluster');
var assert = require('assert');
var net = require('net');

// Checks Worker#isDead(): false right after fork() and from within the
// worker, true once the worker's 'exit' event has been emitted.
if (cluster.isMaster) {
  var worker = cluster.fork();
  assert.ok(!worker.isDead(),
            "isDead() should return false right after the worker has been " +
            "created.");

  worker.on('exit', function() {
    // Assert on isDead() itself. The previous check used isConnected(),
    // which never exercised isDead() and depends on the unspecified order
    // of the 'exit' and 'disconnect' events.
    assert.ok(worker.isDead(),
              "After an exit event has been emitted, " +
              "isDead should return true");
  });

  // Only kill the worker once it has run its own assertion.
  worker.on('message', function(msg) {
    if (msg === 'readyToDie') {
      worker.kill();
    }
  });

} else if (cluster.isWorker) {
  assert.ok(!cluster.worker.isDead(),
            "isDead() should return false when called from within a worker");
  process.send('readyToDie');
}

0 comments on commit 90d1147

Please sign in to comment.