Skip to content

Commit

Permalink
child_process: add callback parameter to .send()
Browse files Browse the repository at this point in the history
Add an optional callback parameter to `ChildProcess.prototype.send()`
that is invoked when the message has been sent.

Juggle the control channel's reference count so that in-flight messages
keep the event loop (and therefore the process) alive until they have
been sent.

`ChildProcess.prototype.send()` and `process.send()` used to operate
synchronously but became asynchronous in commit libuv/libuv@393c1c5
("unix: set non-block mode in uv_{pipe,tcp,udp}_open"), which landed
in io.js in commit 07bd05b ("deps: update libuv to 1.2.1").

Fixes: #760
PR-URL: #2620
Reviewed-By: trevnorris - Trevor Norris <trev.norris@gmail.com>
Reviewed-By: jasnell - James M Snell <jasnell@gmail.com>
  • Loading branch information
bnoordhuis authored and rvagg committed Sep 6, 2015
1 parent 599d4f5 commit 607aa3a
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 36 deletions.
23 changes: 14 additions & 9 deletions doc/api/child_process.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -214,13 +214,15 @@ to a process.

See `kill(2)`

### child.send(message[, sendHandle])
### child.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
* Return: Boolean

When using `child_process.fork()` you can write to the child using
`child.send(message, [sendHandle])` and messages are received by
`child.send(message[, sendHandle][, callback])` and messages are received by
a `'message'` event on the child.

For example:
Expand All @@ -246,11 +248,6 @@ And then the child script, `'sub.js'` might look like this:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.

Please note that the `send()` method on both the parent and child are
synchronous - sending large chunks of data is not advised (pipes can be used
instead, see
[`child_process.spawn`](#child_process_child_process_spawn_command_args_options)).

There is a special case when sending a `{cmd: 'NODE_foo'}` message. All messages
containing a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since they are internal messages used by Node.js core.
Expand All @@ -261,8 +258,16 @@ The `sendHandle` option to `child.send()` is for sending a TCP server or
socket object to another process. The child will receive the object as its
second argument to the `message` event.

Emits an `'error'` event if the message cannot be sent, for example because
the child process has already exited.
The `callback` option is a function that is invoked after the message is
sent but before the target may have received it. It is called with a single
argument: `null` on success, or an `Error` object on failure.

`child.send()` emits an `'error'` event if no callback was given and the message
cannot be sent, for example because the child process has already exited.

Returns `true` under normal circumstances or `false` when the backlog of
unsent messages exceeds a threshold that makes it unwise to send more.
Use the callback mechanism to implement flow control.

#### Example: sending server object

Expand Down
4 changes: 3 additions & 1 deletion doc/api/cluster.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,12 @@ exit, the master may choose not to respawn a worker based on this value.
// kill worker
worker.kill();

### worker.send(message[, sendHandle])
### worker.send(message[, sendHandle][, callback])

* `message` {Object}
* `sendHandle` {Handle object}
* `callback` {Function}
* Return: Boolean

Send a message to a worker or master, optionally with a handle.

Expand Down
10 changes: 3 additions & 7 deletions lib/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,12 @@ exports._forkChild = function(fd) {
var p = new Pipe(true);
p.open(fd);
p.unref();
setupChannel(process, p);

var refs = 0;
const control = setupChannel(process, p);
process.on('newListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (++refs === 1) p.ref();
if (name === 'message' || name === 'disconnect') control.ref();
});
process.on('removeListener', function(name) {
if (name !== 'message' && name !== 'disconnect') return;
if (--refs === 0) p.unref();
if (name === 'message' || name === 'disconnect') control.unref();
});
};

Expand Down
91 changes: 72 additions & 19 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,25 @@ function setupChannel(target, channel) {
target._channel = channel;
target._handleQueue = null;

const control = new class extends EventEmitter {
constructor() {
super();
this.channel = channel;
this.refs = 0;
}
ref() {
if (++this.refs === 1) {
this.channel.ref();
}
}
unref() {
if (--this.refs === 0) {
this.channel.unref();
this.emit('unref');
}
}
};

var decoder = new StringDecoder('utf8');
var jsonBuffer = '';
channel.buffering = false;
Expand Down Expand Up @@ -446,7 +465,7 @@ function setupChannel(target, channel) {
target._handleQueue = null;

queue.forEach(function(args) {
target._send(args.message, args.handle, false);
target._send(args.message, args.handle, false, args.callback);
});

// Process a pending disconnect (if any).
Expand Down Expand Up @@ -478,14 +497,24 @@ function setupChannel(target, channel) {
});
});

target.send = function(message, handle) {
if (!this.connected)
this.emit('error', new Error('channel closed'));
else
this._send(message, handle, false);
target.send = function(message, handle, callback) {
if (typeof handle === 'function') {
callback = handle;
handle = undefined;
}
if (this.connected) {
this._send(message, handle, false, callback);
return;
}
const ex = new Error('channel closed');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
};

target._send = function(message, handle, swallowErrors) {
target._send = function(message, handle, swallowErrors, callback) {
assert(this.connected || this._channel);

if (message === undefined)
Expand Down Expand Up @@ -516,7 +545,11 @@ function setupChannel(target, channel) {

// Queue-up message and handle if we haven't received ACK yet.
if (this._handleQueue) {
this._handleQueue.push({ message: message.msg, handle: handle });
this._handleQueue.push({
callback: callback,
handle: handle,
message: message.msg,
});
return;
}

Expand All @@ -538,24 +571,43 @@ function setupChannel(target, channel) {
} else if (this._handleQueue &&
!(message && message.cmd === 'NODE_HANDLE_ACK')) {
// Queue request anyway to avoid out-of-order messages.
this._handleQueue.push({ message: message, handle: null });
this._handleQueue.push({
callback: callback,
handle: null,
message: message,
});
return;
}

var req = new WriteWrap();
req.oncomplete = nop;
req.async = false;

var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);

if (err) {
if (!swallowErrors)
this.emit('error', errnoException(err, 'write'));
} else if (handle && !this._handleQueue) {
this._handleQueue = [];
}

if (obj && obj.postSend) {
req.oncomplete = obj.postSend.bind(null, handle);
if (err === 0) {
if (handle && !this._handleQueue)
this._handleQueue = [];
req.oncomplete = function() {
if (this.async === true)
control.unref();
if (obj && obj.postSend)
obj.postSend(handle);
if (typeof callback === 'function')
callback(null);
};
if (req.async === true) {
control.ref();
} else {
process.nextTick(function() { req.oncomplete(); });
}
} else if (!swallowErrors) {
const ex = errnoException(err, 'write');
if (typeof callback === 'function') {
process.nextTick(callback, ex);
} else {
this.emit('error', ex); // FIXME(bnoordhuis) Defer to next tick.
}
}

/* If the master is > 2 read() calls behind, please stop sending. */
Expand Down Expand Up @@ -616,6 +668,7 @@ function setupChannel(target, channel) {
};

channel.readStart();
return control;
}


Expand Down
19 changes: 19 additions & 0 deletions test/parallel/test-child-process-send-cb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const fork = require('child_process').fork;

if (process.argv[2] === 'child') {
process.send('ok', common.mustCall(function(err) {
assert.strictEqual(err, null);
}));
} else {
const child = fork(process.argv[1], ['child']);
child.on('message', common.mustCall(function(message) {
assert.strictEqual(message, 'ok');
}));
child.on('exit', common.mustCall(function(exitCode, signalCode) {
assert.strictEqual(exitCode, 0);
assert.strictEqual(signalCode, null);
}));
}

0 comments on commit 607aa3a

Please sign in to comment.