Skip to content

Commit

Permalink
fix: handle syscallResult and deliveryResult consistently among workers
Browse files Browse the repository at this point in the history
fixes #1775
  • Loading branch information
warner committed Sep 21, 2020
1 parent afeced8 commit 9e6e31a
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 33 deletions.
9 changes: 8 additions & 1 deletion packages/SwingSet/src/kernel/vatManager/nodeWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,16 @@ export function makeNodeWorkerVatManagerFactory(tools) {
transcriptManager,
);
function handleSyscall(vatSyscallObject) {
// we are invoked by an async postMessage from the worker thread, whose
// vat code has moved on (it really wants a synchronous/immediate
// syscall)
const type = vatSyscallObject[0];
if (type === 'callNow') {
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
}
// This might throw an Error if the syscall was faulty, in which case
// the vat will be terminated soon. It returns a vatSyscallResults,
// which we discard because there is nobody to send it to.
doSyscall(vatSyscallObject);
}

Expand Down Expand Up @@ -96,7 +102,8 @@ export function makeNodeWorkerVatManagerFactory(tools) {
if (waiting) {
const resolve = waiting;
waiting = null;
resolve();
const deliveryResult = args;
resolve(deliveryResult);
}
} else {
parentLog(`unrecognized uplink message ${type}`);
Expand Down
22 changes: 12 additions & 10 deletions packages/SwingSet/src/kernel/vatManager/nodeWorkerSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ async function doProcess(dispatchRecord, errmsg) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(vpid, vp) {
Expand All @@ -65,7 +75,6 @@ function doNotify(vpid, vp) {
}
}

let syscallLog;
parentPort.on('message', ([type, ...margs]) => {
workerLog(`received`, type);
if (type === 'start') {
Expand Down Expand Up @@ -121,16 +130,9 @@ parentPort.on('message', ([type, ...margs]) => {
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
doMessage(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
doNotify(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
Expand Down
22 changes: 12 additions & 10 deletions packages/SwingSet/src/kernel/vatManager/subprocessSupervisor.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ async function doProcess(dispatchRecord, errmsg) {
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(vpid, vp) {
Expand Down Expand Up @@ -79,7 +89,6 @@ function sendUplink(msg) {
// toParent.write('child ack');
// });

let syscallLog;
fromParent.on('data', data => {
const [type, ...margs] = JSON.parse(data);
workerLog(`received`, type);
Expand Down Expand Up @@ -136,16 +145,9 @@ fromParent.on('data', data => {
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
doMessage(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
doNotify(...dargs).then(res => sendUplink(['deliverDone', ...res]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,18 @@ export function makeNodeSubprocessFactory(tools) {
transcriptManager,
);
function handleSyscall(vatSyscallObject) {
// We are currently invoked by an async piped from the worker thread,
// whose vat code has moved on (it really wants a synchronous/immediate
// syscall). TODO: unlike threads, subprocesses could be made to wait
// by doing a blocking read from the pipe, so we could fix this, and
// re-enable syscall.callNow
const type = vatSyscallObject[0];
if (type === 'callNow') {
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
}
// This might throw an Error if the syscall was faulty, in which case
// the vat will be terminated soon. It returns a vatSyscallResults,
// which we discard because there is currently nobody to send it to.
doSyscall(vatSyscallObject);
}

Expand Down Expand Up @@ -96,7 +104,8 @@ export function makeNodeSubprocessFactory(tools) {
if (waiting) {
const resolve = waiting;
waiting = null;
resolve();
const deliveryResult = args;
resolve(deliveryResult);
}
} else {
parentLog(`unrecognized uplink message ${type}`);
Expand Down
26 changes: 15 additions & 11 deletions packages/xs-vat-worker/src/vatWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ function makeWorker(io, setImmediate) {
setImmediate,
);
workerLog(`doProcess done`);
const vatDeliveryResults = harden(['ok']);
return vatDeliveryResults;
}

function doMessage(targetSlot, msg) {
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
return doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
);
}

function doNotify(vpid, vp) {
Expand Down Expand Up @@ -82,7 +92,6 @@ function makeWorker(io, setImmediate) {
// toParent.write('child ack');
// });

let syscallLog;
const handle = harden(async ([type, ...margs]) => {
workerLog(`received`, type);
if (type === 'start') {
Expand Down Expand Up @@ -144,17 +153,12 @@ function makeWorker(io, setImmediate) {
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
await doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
await doMessage(...dargs).then(res =>
sendUplink(['deliverDone', ...res]),
);
} else if (dtype === 'notify') {
await doNotify(...dargs).then(() =>
sendUplink(['deliverDone', syscallLog]),
await doNotify(...dargs).then(res =>
sendUplink(['deliverDone', ...res]),
);
} else {
throw Error(`bad delivery type ${dtype}`);
Expand Down

0 comments on commit 9e6e31a

Please sign in to comment.