diff --git a/packages/SwingSet/package.json b/packages/SwingSet/package.json index 5640f0cbb63..aef9293b1f0 100644 --- a/packages/SwingSet/package.json +++ b/packages/SwingSet/package.json @@ -39,6 +39,7 @@ "@agoric/tame-metering": "^1.2.3", "@agoric/transform-eventual-send": "^1.3.1", "@agoric/transform-metering": "^1.3.0", + "@agoric/xs-vat-worker": "^0.1.0", "@babel/core": "^7.5.0", "@babel/generator": "^7.6.4", "anylogger": "^0.21.0", diff --git a/packages/SwingSet/src/controller.js b/packages/SwingSet/src/controller.js index 0e3c955c115..1691abcea2c 100644 --- a/packages/SwingSet/src/controller.js +++ b/packages/SwingSet/src/controller.js @@ -16,6 +16,7 @@ import { importBundle } from '@agoric/import-bundle'; import { initSwingStore } from '@agoric/swing-store-simple'; import { makeMeteringTransformer } from '@agoric/transform-metering'; import { makeTransform } from '@agoric/transform-eventual-send'; +import { xsWorkerBin } from '@agoric/xs-vat-worker'; import { startSubprocessWorker } from './spawnSubprocessWorker'; import { assertKnownOptions } from './assertOptions'; @@ -321,6 +322,10 @@ export async function buildVatController( // console.log(`--slog ${JSON.stringify(obj)}`); } + const startXsWorker = xsWorkerBin + ? () => startSubprocessWorker(xsWorkerBin, []) + : undefined; + const kernelEndowments = { waitUntilQuiescent, hostStorage, @@ -333,6 +338,7 @@ export async function buildVatController( makeNodeWorker, startSubprocessWorker, writeSlogObject, + startXsWorker, }; const kernelOptions = { verbose }; diff --git a/packages/SwingSet/src/kernel/kernel.js b/packages/SwingSet/src/kernel/kernel.js index 025f7c38e30..aefb2426879 100644 --- a/packages/SwingSet/src/kernel/kernel.js +++ b/packages/SwingSet/src/kernel/kernel.js @@ -52,6 +52,7 @@ export default function buildKernel(kernelEndowments, kernelOptions = {}) { makeNodeWorker, startSubprocessWorker, writeSlogObject, + startXsWorker, } = kernelEndowments; const { verbose } = kernelOptions; const logStartup = verbose ? console.debug : () => 0; @@ -613,6 +614,7 @@ export default function buildKernel(kernelEndowments, kernelOptions = {}) { waitUntilQuiescent, makeNodeWorker, startSubprocessWorker, + startXsWorker, }); function buildVatSyscallHandler(vatID, translators) { diff --git a/packages/SwingSet/src/kernel/vatManager/factory.js b/packages/SwingSet/src/kernel/vatManager/factory.js index f3f6706ed0d..6b85e83126a 100644 --- a/packages/SwingSet/src/kernel/vatManager/factory.js +++ b/packages/SwingSet/src/kernel/vatManager/factory.js @@ -14,6 +14,7 @@ export function makeVatManagerFactory({ waitUntilQuiescent, makeNodeWorker, startSubprocessWorker, + startXsWorker, }) { const localFactory = makeLocalVatManagerFactory({ allVatPowers, @@ -34,6 +35,11 @@ export function makeVatManagerFactory({ kernelKeeper, }); + const xsWorkerFactory = makeNodeSubprocessFactory({ + startSubprocessWorker: startXsWorker, + kernelKeeper, + }); + function validateManagerOptions(managerOptions) { assertKnownOptions(managerOptions, [ 'enablePipelining', @@ -93,6 +99,13 @@ export function makeVatManagerFactory({ ); } + if (managerType === 'xs-worker') { + if (!xsWorkerFactory) { + throw new Error('manager type xs-worker not available'); + } + return xsWorkerFactory.createFromBundle(vatID, bundle, managerOptions); + } + throw Error( `unknown type ${managerType}, not local/nodeWorker/node-subprocess`, ); diff --git a/packages/SwingSet/src/spawnSubprocessWorker.js b/packages/SwingSet/src/spawnSubprocessWorker.js index 057612825e8..ec7dfe0534b 100644 --- a/packages/SwingSet/src/spawnSubprocessWorker.js +++ b/packages/SwingSet/src/spawnSubprocessWorker.js @@ -20,8 +20,10 @@ const supercode = require.resolve( // always be Node. const stdio = harden(['inherit', 'inherit', 'inherit', 'pipe', 'pipe']); -export function startSubprocessWorker() { - const proc = spawn(process.execPath, ['-r', 'esm', supercode], { stdio }); +export function startSubprocessWorker(execPath, args) { + execPath = execPath || process.execPath; + args = args || ['-r', 'esm', supercode]; + const proc = spawn(execPath, args, { stdio }); const toChild = Netstring.writeStream(); toChild.pipe(proc.stdio[3]); diff --git a/packages/SwingSet/test/workers/test-worker.js b/packages/SwingSet/test/workers/test-worker.js index c4cd0d889bc..4d877490b72 100644 --- a/packages/SwingSet/test/workers/test-worker.js +++ b/packages/SwingSet/test/workers/test-worker.js @@ -1,7 +1,25 @@ import '@agoric/install-ses'; import test from 'ava'; +import { xsWorkerBin } from '@agoric/xs-vat-worker/src/locate'; import { loadBasedir, buildVatController } from '../../src/index'; +test('xs vat manager', async t => { + if (!xsWorkerBin) { + console.warn('XS vat worker not built; skipping'); + t.falsy.skip(false); + return; + } + + const config = await loadBasedir(__dirname); + config.vats.target.creationOptions = { managerType: 'xs-worker' }; + const c = await buildVatController(config, []); + + await c.run(); + t.is(c.bootstrapResult.status(), 'fulfilled'); + + await c.shutdown(); +}); + test('nodeWorker vat manager', async t => { const config = await loadBasedir(__dirname); config.vats.target.creationOptions = { managerType: 'nodeWorker' }; diff --git a/packages/xs-vat-worker/package.json b/packages/xs-vat-worker/package.json index 45ce24dea4f..736d505a961 100644 --- a/packages/xs-vat-worker/package.json +++ b/packages/xs-vat-worker/package.json @@ -34,6 +34,7 @@ "@agoric/eventual-send": "^0.9.3", "@agoric/import-bundle": "^0.0.8", "@agoric/install-ses": "^0.2.0", + "@agoric/marshal": "^0.2.3", "@agoric/promise-kit": "^0.1.3", "@agoric/swingset-vat": "^0.6.0", "anylogger": "^1.0.4", diff --git a/packages/xs-vat-worker/src/vatWorker.js b/packages/xs-vat-worker/src/vatWorker.js index a7e741379eb..2a30509bc08 100644 --- a/packages/xs-vat-worker/src/vatWorker.js +++ b/packages/xs-vat-worker/src/vatWorker.js @@ -1,11 +1,19 @@ import { importBundle } from '@agoric/import-bundle'; import { HandledPromise } from '@agoric/eventual-send'; +import { Remotable, getInterfaceOf } from '@agoric/marshal'; // TODO? import anylogger from 'anylogger'; import { makeLiveSlots } from '@agoric/swingset-vat/src/kernel/liveSlots'; -const EOF = new Error('EOF'); +function workerLog(first, ...args) { + console.log(`---worker: ${first}`, ...args); +} + +function assert(ok, whynot) { + if (!ok) { + throw new Error(whynot); + } +} -// from SwingSet/src/controller.js function makeConsole(_tag) { const log = console; // TODO? anylogger(tag); const cons = {}; @@ -15,19 +23,9 @@ function makeConsole(_tag) { return harden(cons); } -function makeVatEndowments(consoleTag) { - return harden({ - console: makeConsole(`SwingSet:${consoleTag}`), - HandledPromise, - // TODO: re2 is a RegExp work-a-like that disables backtracking expressions for - // safer memory consumption - RegExp, - }); -} - // see also: detecting an empty vat promise queue (end of "crank") // https://github.com/Agoric/agoric-sdk/issues/45 -function endOfCrank(setImmediate) { +function waitUntilQuiescent(setImmediate) { return new Promise((resolve, _reject) => { setImmediate(() => { // console.log('hello from setImmediate callback. The promise queue is presumably empty.'); @@ -36,106 +34,145 @@ function endOfCrank(setImmediate) { }); } +function runAndWait(f, errmsg, setImmediate) { + Promise.resolve() + .then(f) + .then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err)); + return waitUntilQuiescent(setImmediate); +} + function makeWorker(io, setImmediate) { - let vatNS = null; let dispatch; - let state; - - const format = msg => JSON.stringify(msg); - const sync = (method, args) => { - io.writeMessage(format({ msgtype: 'syscall', method, args })); - return JSON.parse(io.readMessage()); - }; - const syscall = harden({ - subscribe(...args) { - return sync('subscribe', args); - }, - send(...args) { - return sync('send', args); - }, - fulfillToData(...args) { - return sync('fulfillToData', args); - }, - fulfillToPresence(...args) { - return sync('fulfillToPresence', args); - }, - reject(...args) { - return sync('reject', args); - }, - }); - - async function loadBundle(name, bundle) { - if (vatNS !== null) { - throw new Error('bundle already loaded'); - } - vatNS = await importBundle(bundle, { - filePrefix: name, - endowments: makeVatEndowments(name), - }); - // TODO: be sure console.log isn't mixed with protocol stream - // console.log('loaded bundle with methods', Object.keys(vatNS)); + async function doProcess(dispatchRecord, errmsg) { + const dispatchOp = dispatchRecord[0]; + const dispatchArgs = dispatchRecord.slice(1); + workerLog(`runAndWait`); + await runAndWait( + () => dispatch[dispatchOp](...dispatchArgs), + errmsg, + setImmediate, + ); + workerLog(`doProcess done`); + } - state = {}; // ?? - dispatch = makeLiveSlots(syscall, state, vatNS.buildRootObject); + function doNotify(vpid, vp) { + const errmsg = `vat.promise[${vpid}] ${vp.state} failed`; + switch (vp.state) { + case 'fulfilledToPresence': + return doProcess(['notifyFulfillToPresence', vpid, vp.slot], errmsg); + case 'redirected': + throw new Error('not implemented yet'); + case 'fulfilledToData': + return doProcess(['notifyFulfillToData', vpid, vp.data], errmsg); + case 'rejected': + return doProcess(['notifyReject', vpid, vp.data], errmsg); + default: + throw Error(`unknown promise state '${vp.state}'`); + } } - function turnCrank(dispatchType, args) { - return new Promise((resolve, reject) => { - try { - dispatch[dispatchType](...args); - } catch (error) { - // console.log({ dispatchError: error }); - reject(error); - return; - } - endOfCrank(setImmediate).then(resolve); - }); + function sendUplink(msg) { + assert(msg instanceof Array, `msg must be an Array`); + io.writeMessage(JSON.stringify(msg)); } - const name = 'WORKER'; // TODO? - async function handle(message) { - switch (message.msgtype) { - case 'load-bundle': - try { - await loadBundle(name, message.bundle); - } catch (error) { - // console.log('load-bundle failed:', error); - io.writeMessage( - format({ msgtype: 'load-bundle-nak', error: error.message }), - ); - break; - } - io.writeMessage(format({ msgtype: 'load-bundle-ack' })); - break; - case 'dispatch': - try { - await turnCrank(message.type, message.args); - } catch (error) { - io.writeMessage( - format({ - msgtype: 'dispatch-nak', - error: error instanceof Error ? error.message : error, - }), - ); - break; + // fromParent.on('data', data => { + // workerLog('data from parent', data); + // toParent.write('child ack'); + // }); + + let syscallLog; + const handle = harden(async ([type, ...margs]) => { + workerLog(`received`, type); + if (type === 'start') { + // TODO: parent should send ['start', vatID] + workerLog(`got start`); + sendUplink(['gotStart']); + } else if (type === 'setBundle') { + const [bundle, vatParameters] = margs; + const endowments = { + console: makeConsole(`SwingSet:vatWorker`), + HandledPromise, + }; + // ISSUE: this draft code is contorted because it started + // as code that didn't return anything but now it + // has to return a promise to be resolved before + // reading the next input. + return importBundle(bundle, { endowments }).then(vatNS => { + workerLog(`got vatNS:`, Object.keys(vatNS).join(',')); + sendUplink(['gotBundle']); + + function doSyscall(vatSyscallObject) { + sendUplink(['syscall', ...vatSyscallObject]); } - io.writeMessage(format({ msgtype: 'dispatch-ack' })); - break; - case 'finish': - io.writeMessage(format({ msgtype: 'finish-ack' })); - break; - default: - console.warn('unexpected msgtype', message.msgtype); + const syscall = harden({ + send: (...args) => doSyscall(['send', ...args]), + callNow: (..._args) => { + throw Error(`nodeWorker cannot syscall.callNow`); + }, + subscribe: (...args) => doSyscall(['subscribe', ...args]), + fulfillToData: (...args) => doSyscall(['fulfillToData', ...args]), + fulfillToPresence: (...args) => + doSyscall(['fulfillToPresence', ...args]), + reject: (...args) => doSyscall(['reject', ...args]), + }); + + const state = null; + const vatID = 'demo-vatID'; + // todo: maybe add transformTildot, makeGetMeter/transformMetering to + // vatPowers, but only if options tell us they're wanted. Maybe + // transformTildot should be async and outsourced to the kernel + // process/thread. + const vatPowers = { Remotable, getInterfaceOf }; + dispatch = makeLiveSlots( + syscall, + state, + vatNS.buildRootObject, + vatID, + vatPowers, + vatParameters, + ); + workerLog(`got dispatch:`, Object.keys(dispatch).join(',')); + sendUplink(['dispatchReady']); + return type; + }); + } else if (type === 'deliver') { + if (!dispatch) { + workerLog(`error: deliver before dispatchReady`); + return undefined; + } + const [dtype, ...dargs] = margs; + if (dtype === 'message') { + const [targetSlot, msg] = dargs; + const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`; + await doProcess( + ['deliver', targetSlot, msg.method, msg.args, msg.result], + errmsg, + ).then(() => { + sendUplink(['deliverDone']); + }); + } else if (dtype === 'notify') { + await doNotify(...dargs).then(() => + sendUplink(['deliverDone', syscallLog]), + ); + } else { + throw Error(`bad delivery type ${dtype}`); + } + } else { + workerLog(`unrecognized downlink message ${type}`); } - return message.msgtype; - } + return type; + }); return harden({ handle }); } export async function main({ readMessage, writeMessage, setImmediate }) { + workerLog(`supervisor started`); + const worker = makeWorker({ readMessage, writeMessage }, setImmediate); + const EOF = new Error('EOF'); for (;;) { let message;