Skip to content

Commit

Permalink
child_process: reduce nextTick() usage
Browse files Browse the repository at this point in the history
PR-URL: nodejs#13459
Reviewed-By: Refael Ackermann <refack@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
  • Loading branch information
mscdex committed Jun 9, 2017
1 parent dd83d11 commit 8208fda
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 21 deletions.
70 changes: 70 additions & 0 deletions benchmark/cluster/echo.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
'use strict';

const cluster = require('cluster');
if (cluster.isMaster) {
const common = require('../common.js');
const bench = common.createBenchmark(main, {
workers: [1],
payload: ['string', 'object'],
sendsPerBroadcast: [1, 10],
n: [1e5]
});

function main(conf) {
var n = +conf.n;
var workers = +conf.workers;
var sends = +conf.sendsPerBroadcast;
var expectedPerBroadcast = sends * workers;
var payload;
var readies = 0;
var broadcasts = 0;
var msgCount = 0;

switch (conf.payload) {
case 'string':
payload = 'hello world!';
break;
case 'object':
payload = { action: 'pewpewpew', powerLevel: 9001 };
break;
default:
throw new Error('Unsupported payload type');
}

for (var i = 0; i < workers; ++i)
cluster.fork().on('online', onOnline).on('message', onMessage);

function onOnline(msg) {
if (++readies === workers) {
bench.start();
broadcast();
}
}

function broadcast() {
var id;
if (broadcasts++ === n) {
bench.end(n);
for (id in cluster.workers)
cluster.workers[id].disconnect();
return;
}
for (id in cluster.workers) {
const worker = cluster.workers[id];
for (var i = 0; i < sends; ++i)
worker.send(payload);
}
}

function onMessage(msg) {
if (++msgCount === expectedPerBroadcast) {
msgCount = 0;
broadcast();
}
}
}
} else {
process.on('message', function(msg) {
process.send(msg);
});
}
53 changes: 32 additions & 21 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -456,16 +456,22 @@ function setupChannel(target, channel) {
}
chunks[0] = jsonBuffer + chunks[0];

var nextTick = false;
for (var i = 0; i < numCompleteChunks; i++) {
var message = JSON.parse(chunks[i]);

// There will be at most one NODE_HANDLE message in every chunk we
// read because SCM_RIGHTS messages don't get coalesced. Make sure
// that we deliver the handle with the right message however.
if (message && message.cmd === 'NODE_HANDLE')
handleMessage(target, message, recvHandle);
else
handleMessage(target, message, undefined);
if (isInternal(message)) {
if (message.cmd === 'NODE_HANDLE')
handleMessage(message, recvHandle, true, false);
else
handleMessage(message, undefined, true, false);
} else {
handleMessage(message, undefined, false, nextTick);
nextTick = true;
}
}
jsonBuffer = incompleteChunk;
this.buffering = jsonBuffer.length !== 0;
Expand Down Expand Up @@ -526,7 +532,7 @@ function setupChannel(target, channel) {

// Convert handle object
obj.got.call(this, message, handle, function(handle) {
handleMessage(target, message.msg, handle);
handleMessage(message.msg, handle, isInternal(message.msg), false);
});
});

Expand Down Expand Up @@ -732,27 +738,32 @@ function setupChannel(target, channel) {
process.nextTick(finish);
};

function emit(event, message, handle) {
target.emit(event, message, handle);
}

function handleMessage(message, handle, internal, nextTick) {
if (!target.channel)
return;

var eventName = (internal ? 'internalMessage' : 'message');
if (nextTick)
process.nextTick(emit, eventName, message, handle);
else
target.emit(eventName, message, handle);
}

channel.readStart();
return control;
}


const INTERNAL_PREFIX = 'NODE_';
function handleMessage(target, message, handle) {
if (!target.channel)
return;

var eventName = 'message';
if (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.length > INTERNAL_PREFIX.length &&
message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) {
eventName = 'internalMessage';
}
process.nextTick(() => {
target.emit(eventName, message, handle);
});
function isInternal(message) {
return (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.length > INTERNAL_PREFIX.length &&
message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX);
}

function nop() { }
Expand Down

0 comments on commit 8208fda

Please sign in to comment.