Skip to content

Commit

Permalink
Merge pull request #1797 from Agoric/1775-worker-fixes
Browse files Browse the repository at this point in the history
worker fixes
  • Loading branch information
warner authored Sep 21, 2020
2 parents e3c969b + 49605c6 commit 44ab432
Show file tree
Hide file tree
Showing 10 changed files with 121 additions and 54 deletions.
22 changes: 16 additions & 6 deletions packages/SwingSet/src/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fs from 'fs';
import path from 'path';
import process from 'process';
import re2 from 're2';
import { Worker } from 'worker_threads';
import * as babelCore from '@babel/core';
Expand Down Expand Up @@ -353,6 +354,7 @@ export async function buildVatController(
}`,
);

// this launches a worker in a Node.js thread (aka "Worker")
function makeNodeWorker() {
// TODO: after we move away from `-r esm` and use real ES6 modules, point
// this at nodeWorkerSupervisor.js instead of the CJS intermediate
Expand All @@ -362,17 +364,25 @@ export async function buildVatController(
return new Worker(supercode);
}

// launch a worker in a subprocess (which runs Node.js)
function startSubprocessWorkerNode() {
const supercode = require.resolve(
'./kernel/vatManager/subprocessSupervisor.js',
);
return startSubprocessWorker(process.execPath, ['-r', 'esm', supercode]);
}

let startSubprocessWorkerXS;
const xsWorkerBin = locateWorkerBin({ resolve: path.resolve });
if (fs.existsSync(xsWorkerBin)) {
startSubprocessWorkerXS = () => startSubprocessWorker(xsWorkerBin);
}

function writeSlogObject(_obj) {
// TODO sqlite
// console.log(`--slog ${JSON.stringify(obj)}`);
}

const startSubprocessWorkerNode = () => startSubprocessWorker();
const xsWorkerBin = locateWorkerBin({ resolve: path.resolve });
const startSubprocessWorkerXS = fs.existsSync(xsWorkerBin)
? () => startSubprocessWorker({ execPath: xsWorkerBin, args: [] })
: undefined;

const kernelEndowments = {
waitUntilQuiescent,
hostStorage,
Expand Down
3 changes: 3 additions & 0 deletions packages/SwingSet/src/kernel/vatManager/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ export function makeVatManagerFactory({
const nodeWorkerFactory = makeNodeWorkerVatManagerFactory({
makeNodeWorker,
kernelKeeper,
testLog: allVatPowers.testLog,
});

const nodeSubprocessFactory = makeNodeSubprocessFactory({
startSubprocessWorker: startSubprocessWorkerNode,
kernelKeeper,
testLog: allVatPowers.testLog,
});

const xsWorkerFactory = makeNodeSubprocessFactory({
startSubprocessWorker: startSubprocessWorkerXS,
kernelKeeper,
testLog: allVatPowers.testLog,
});

function validateManagerOptions(managerOptions) {
Expand Down
13 changes: 11 additions & 2 deletions packages/SwingSet/src/kernel/vatManager/nodeWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ function parentLog(first, ...args) {
}

export function makeNodeWorkerVatManagerFactory(tools) {
const { makeNodeWorker, kernelKeeper } = tools;
const { makeNodeWorker, kernelKeeper, testLog } = tools;

function createFromBundle(vatID, bundle, managerOptions) {
const { vatParameters } = managerOptions;
Expand Down Expand Up @@ -55,10 +55,16 @@ export function makeNodeWorkerVatManagerFactory(tools) {
transcriptManager,
);
function handleSyscall(vatSyscallObject) {
// we are invoked by an async postMessage from the worker thread, whose
// vat code has moved on (it really wants a synchronous/immediate
// syscall)
const type = vatSyscallObject[0];
if (type === 'callNow') {
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
}
// This might throw an Error if the syscall was faulty, in which case
// the vat will be terminated soon. It returns a vatSyscallResults,
// which we discard because there is nobody to send it to.
doSyscall(vatSyscallObject);
}

Expand Down Expand Up @@ -91,12 +97,15 @@ export function makeNodeWorkerVatManagerFactory(tools) {
parentLog(`syscall`, args);
const vatSyscallObject = args;
handleSyscall(vatSyscallObject);
} else if (type === 'testLog') {
testLog(...args);
} else if (type === 'deliverDone') {
parentLog(`deliverDone`);
if (waiting) {
const resolve = waiting;
waiting = null;
resolve();
const deliveryResult = args;
resolve(deliveryResult);
}
} else {
parentLog(`unrecognized uplink message ${type}`);
Expand Down
33 changes: 22 additions & 11 deletions packages/SwingSet/src/kernel/vatManager/nodeWorkerSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ async function doProcess(dispatchRecord, errmsg) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(vpid, vp) {
Expand All @@ -65,7 +75,6 @@ function doNotify(vpid, vp) {
}
}

let syscallLog;
parentPort.on('message', ([type, ...margs]) => {
workerLog(`received`, type);
if (type === 'start') {
Expand Down Expand Up @@ -96,13 +105,22 @@ parentPort.on('message', ([type, ...margs]) => {
reject: (...args) => doSyscall(['reject', ...args]),
});

function testLog(...args) {
sendUplink(['testLog', ...args]);
}

const state = null;
const vatID = 'demo-vatID';
// todo: maybe add transformTildot, makeGetMeter/transformMetering to
// vatPowers, but only if options tell us they're wanted. Maybe
// transformTildot should be async and outsourced to the kernel
// process/thread.
const vatPowers = { Remotable, getInterfaceOf, makeMarshal };
const vatPowers = {
Remotable,
getInterfaceOf,
makeMarshal,
testLog,
};
dispatch = makeLiveSlots(
syscall,
state,
Expand All @@ -121,16 +139,9 @@ parentPort.on('message', ([type, ...margs]) => {
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
doMessage(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
doNotify(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
Expand Down
33 changes: 22 additions & 11 deletions packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ async function doProcess(dispatchRecord, errmsg) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(vpid, vp) {
Expand Down Expand Up @@ -79,7 +89,6 @@ function sendUplink(msg) {
// toParent.write('child ack');
// });

let syscallLog;
fromParent.on('data', data => {
const [type, ...margs] = JSON.parse(data);
workerLog(`received`, type);
Expand Down Expand Up @@ -111,13 +120,22 @@ fromParent.on('data', data => {
reject: (...args) => doSyscall(['reject', ...args]),
});

function testLog(...args) {
sendUplink(['testLog', ...args]);
}

const state = null;
const vatID = 'demo-vatID';
// todo: maybe add transformTildot, makeGetMeter/transformMetering to
// vatPowers, but only if options tell us they're wanted. Maybe
// transformTildot should be async and outsourced to the kernel
// process/thread.
const vatPowers = { Remotable, getInterfaceOf, makeMarshal };
const vatPowers = {
Remotable,
getInterfaceOf,
makeMarshal,
testLog,
};
dispatch = makeLiveSlots(
syscall,
state,
Expand All @@ -136,16 +154,9 @@ fromParent.on('data', data => {
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
doMessage(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
doNotify(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
Expand Down
15 changes: 13 additions & 2 deletions packages/SwingSet/src/kernel/vatManager/worker-subprocess-node.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ function parentLog(first, ...args) {
}

export function makeNodeSubprocessFactory(tools) {
const { startSubprocessWorker, kernelKeeper } = tools;
const { startSubprocessWorker, kernelKeeper, testLog } = tools;

function createFromBundle(vatID, bundle, managerOptions) {
const { vatParameters } = managerOptions;
Expand Down Expand Up @@ -56,10 +56,18 @@ export function makeNodeSubprocessFactory(tools) {
transcriptManager,
);
function handleSyscall(vatSyscallObject) {
// We are currently invoked by an async piped from the worker thread,
// whose vat code has moved on (it really wants a synchronous/immediate
// syscall). TODO: unlike threads, subprocesses could be made to wait
// by doing a blocking read from the pipe, so we could fix this, and
// re-enable syscall.callNow
const type = vatSyscallObject[0];
if (type === 'callNow') {
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
}
// This might throw an Error if the syscall was faulty, in which case
// the vat will be terminated soon. It returns a vatSyscallResults,
// which we discard because there is currently nobody to send it to.
doSyscall(vatSyscallObject);
}

Expand Down Expand Up @@ -91,12 +99,15 @@ export function makeNodeSubprocessFactory(tools) {
parentLog(`syscall`, args);
const vatSyscallObject = args;
handleSyscall(vatSyscallObject);
} else if (type === 'testLog') {
testLog(...args);
} else if (type === 'deliverDone') {
parentLog(`deliverDone`);
if (waiting) {
const resolve = waiting;
waiting = null;
resolve();
const deliveryResult = args;
resolve(deliveryResult);
}
} else {
parentLog(`unrecognized uplink message ${type}`);
Expand Down
10 changes: 2 additions & 8 deletions packages/SwingSet/src/spawnSubprocessWorker.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// this file is loaded by the controller, in the start compartment
import process from 'process';
import { spawn } from 'child_process';
import Netstring from 'netstring-stream';

Expand All @@ -10,19 +9,14 @@ function parentLog(first, ...args) {
// console.error(`--parent: ${first}`, ...args);
}

const supercode = require.resolve(
'./kernel/vatManager/subprocessSupervisor.js',
);
// we send on fd3, and receive on fd4. We pass fd1/2 (stdout/err) through, so
// console log/err from the child shows up normally. We don't use Node's
// built-in serialization feature ('ipc') because the child process won't
// always be Node.
const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']);

export function startSubprocessWorker(options) {
const execPath = options.execPath || process.execPath;
const args = options.args || ['-r', 'esm', supercode];
const proc = spawn(execPath, args, { stdio });
export function startSubprocessWorker(execPath, procArgs = []) {
const proc = spawn(execPath, procArgs, { stdio });

const toChild = Netstring.writeStream();
toChild.pipe(proc.stdio[3]);
Expand Down
3 changes: 3 additions & 0 deletions packages/SwingSet/test/workers/test-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ maybeTestXS('xs vat manager', async t => {

await c.run();
t.is(c.bootstrapResult.status(), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);

await c.shutdown();
});
Expand All @@ -28,6 +29,7 @@ test.skip('nodeWorker vat manager', async t => {

await c.run();
t.is(c.bootstrapResult.status(), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);

await c.shutdown();
});
Expand All @@ -40,6 +42,7 @@ test('node-subprocess vat manager', async t => {
await c.run();
t.is(c.bootstrapResult.status(), 'fulfilled');
t.deepEqual(c.dump().log, ['testLog works']);
await c.shutdown();
});
Expand Down
5 changes: 4 additions & 1 deletion packages/SwingSet/test/workers/vat-target.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ function ignore(p) {
// inbound events ('dispatch'), which will provoke a set of outbound events
// ('syscall'), that cover the full range of the dispatch/syscall interface

export function buildRootObject() {
export function buildRootObject(vatPowers) {
console.log(`vat does buildRootObject`); // make sure console works
// note: XS doesn't appear to print console.log unless an exception happens
vatPowers.testLog('testLog works');

const precB = makePromiseKit();
const precC = makePromiseKit();
let callbackObj;
Expand Down
Loading

0 comments on commit 44ab432

Please sign in to comment.