Skip to content

Commit

Permalink
Merge pull request #1630 from Agoric/mfig/1605-mailbox-state
Browse files Browse the repository at this point in the history
More general mailbox state persistence
  • Loading branch information
michaelfig authored Aug 29, 2020
2 parents 474f2aa + 0bc03b3 commit b9a320f
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 145 deletions.
54 changes: 40 additions & 14 deletions packages/SwingSet/src/devices/mailbox.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,36 @@ import Nat from '@agoric/nat';
// replace it with one that tracks which parts of the state have been
// modified, to build more efficient Merkle proofs.

export function buildMailboxStateMap() {
const state = harden(new Map());
export function importMailbox(data, inout = {}) {
const outbox = new Map();
data.outbox.forEach(m => {
outbox.set(Nat(m[0]), m[1]);
});
inout.ack = data.ack;
inout.outbox = outbox;
return inout;
}

export function exportMailbox(inout) {
const messages = [];
inout.outbox.forEach((body, msgnum) => {
messages.push([msgnum, body]);
});
messages.sort((a, b) => a[0] - b[0]);
return {
ack: inout.ack,
outbox: messages,
};
}

export function buildMailboxStateMap(state = harden(new Map())) {
function getOrCreatePeer(peer) {
if (!state.has(peer)) {
state.set(peer, { outbox: harden(new Map()), inboundAck: 0 });
const inout = {
outbox: harden(new Map()),
ack: 0,
};
state.set(peer, inout);
}
return state.get(peer);
}
Expand All @@ -92,18 +116,17 @@ export function buildMailboxStateMap() {
}

function setAcknum(peer, msgnum) {
getOrCreatePeer(`${peer}`).inboundAck = Nat(msgnum);
getOrCreatePeer(`${peer}`).ack = Nat(msgnum);
}

function exportToData() {
const data = {};
state.forEach((inout, peer) => {
const messages = [];
inout.outbox.forEach((body, msgnum) => {
messages.push([msgnum, body]);
});
messages.sort((a, b) => a[0] - b[0]);
data[peer] = { outbox: messages, inboundAck: inout.inboundAck };
const exported = exportMailbox(inout);
data[peer] = {
inboundAck: inout.ack,
outbox: exported.outbox,
};
});
return harden(data);
}
Expand All @@ -115,10 +138,13 @@ export function buildMailboxStateMap() {
for (const peer of Object.getOwnPropertyNames(data)) {
const inout = getOrCreatePeer(peer);
const d = data[peer];
d.outbox.forEach(m => {
inout.outbox.set(Nat(m[0]), m[1]);
});
inout.inboundAck = d.inboundAck;
importMailbox(
{
ack: d.inboundAck,
outbox: d.outbox,
},
inout,
);
}
}

Expand Down
22 changes: 20 additions & 2 deletions packages/agoric-cli/lib/deploy.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ export default async function deployMain(progname, rawArgs, powers, opts) {
const provide = opts.provide
.split(',')
.map(dep => dep.trim())
.filter(dep => dep);
.filter(dep => dep)
.sort();

const need = opts.need
.split(',')
.map(dep => dep.trim())
.filter(dep => dep && !provide.includes(dep));
.filter(dep => dep && !provide.includes(dep))
.sort();

if (args.length === 0 && !provide.length) {
console.error('you must specify at least one deploy.js (or --provide=XXX)');
Expand Down Expand Up @@ -92,6 +94,22 @@ export default async function deployMain(progname, rawArgs, powers, opts) {
lastUpdateCount,
);
lastUpdateCount = update.updateCount;

// Skip the deploy if our provides are not needed.
let needsProvide = !provide.length;
const notNeeded = [];
for (const dep of provide) {
if (update.value.includes(dep)) {
needsProvide = true;
} else {
notNeeded.push(dep);
}
}
if (!needsProvide) {
console.info(`Don't need our provides: ${notNeeded.join(', ')}`);
return;
}

const nextLoading = [];
for (const dep of stillLoading) {
if (update.value.includes(dep)) {
Expand Down
2 changes: 1 addition & 1 deletion packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ export async function connectToChain(
log(`helper said: ${stdout}`);
try {
// Try to parse the stdout.
return JSON.parse(JSON.parse(JSON.parse(stdout).value));
return JSON.parse(JSON.parse(stdout).value);
} catch (e) {
log(`failed to parse output:`, e);
}
Expand Down
71 changes: 43 additions & 28 deletions packages/cosmic-swingset/lib/ag-solo/fake-chain.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
import path from 'path';
import fs from 'fs';
import stringify from '@agoric/swingset-vat/src/kernel/json-stable-stringify';
import {
importMailbox,
exportMailbox,
} from '@agoric/swingset-vat/src/devices/mailbox';
import anylogger from 'anylogger';

import { launch } from '../launch-chain';
Expand All @@ -13,31 +17,32 @@ const log = anylogger('fake-chain');
const PRETEND_BLOCK_DELAY = 5;
const scaleBlockTime = ms => Math.floor(ms / 1000);

async function readMap(file) {
async function makeMapStorage(file) {
let content;
const map = new Map();
map.commit = async () => {
const obj = {};
[...map.entries()].forEach(([k, v]) => (obj[k] = exportMailbox(v)));
const json = stringify(obj);
await fs.promises.writeFile(file, json);
};

try {
content = await fs.promises.readFile(file);
} catch (e) {
return map;
}
const obj = JSON.parse(content);
Object.entries(obj).forEach(([k, v]) => map.set(k, v));
return map;
}
Object.entries(obj).forEach(([k, v]) => map.set(k, importMailbox(v)));

async function writeMap(file, map) {
const obj = {};
[...map.entries()].forEach(([k, v]) => (obj[k] = v));
const json = stringify(obj);
await fs.promises.writeFile(file, json);
return map;
}

export async function connectToFakeChain(basedir, GCI, delay, inbound) {
const mailboxFile = path.join(basedir, `fake-chain-${GCI}-mailbox.json`);
const bootAddress = `${GCI}-client`;

const mailboxStorage = await readMap(mailboxFile);
const mailboxStorage = await makeMapStorage(mailboxFile);

const vatsdir = path.join(basedir, 'vats');
const argv = [
Expand All @@ -59,14 +64,13 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {
stateDBdir,
mailboxStorage,
doOutboundBridge,
flushChainSends,
vatsdir,
argv,
GCI, // debugName
);

const blockManager = makeBlockManager(s);
const { savedHeight, savedActions } = s;
const { savedHeight, savedActions, savedChainSends } = s;
const blockManager = makeBlockManager({ ...s, flushChainSends });

let blockHeight = savedHeight;
let blockTime =
Expand All @@ -90,35 +94,46 @@ export async function connectToFakeChain(basedir, GCI, delay, inbound) {
blockTime += PRETEND_BLOCK_DELAY;
blockHeight += 1;

await blockManager({ type: 'BEGIN_BLOCK', blockHeight, blockTime });
await blockManager(
{ type: 'BEGIN_BLOCK', blockHeight, blockTime },
savedChainSends,
);
for (let i = 0; i < thisBlock.length; i += 1) {
const [newMessages, acknum] = thisBlock[i];
await blockManager({
type: 'DELIVER_INBOUND',
peer: bootAddress,
messages: newMessages,
ack: acknum,
blockHeight,
blockTime,
});
await blockManager(
{
type: 'DELIVER_INBOUND',
peer: bootAddress,
messages: newMessages,
ack: acknum,
blockHeight,
blockTime,
},
savedChainSends,
);
}
await blockManager({ type: 'END_BLOCK', blockHeight, blockTime });
await blockManager(
{ type: 'END_BLOCK', blockHeight, blockTime },
savedChainSends,
);

// Done processing, "commit the block".
await blockManager({ type: 'COMMIT_BLOCK', blockHeight, blockTime });
await writeMap(mailboxFile, mailboxStorage);
await blockManager(
{ type: 'COMMIT_BLOCK', blockHeight, blockTime },
savedChainSends,
);
thisBlock = [];
blockTime += scaleBlockTime(Date.now() - actualStart);
} catch (e) {
log.error(`error fake processing`, e);
}

clearTimeout(nextBlockTimeout);
nextBlockTimeout = setTimeout(simulateBlock, maximumDelay);

// TODO: maybe add latency to the inbound messages.
const mailboxJSON = mailboxStorage.get(`mailbox.${bootAddress}`);
const mailbox = mailboxJSON && JSON.parse(mailboxJSON);
const { outbox = [], ack = 0 } = mailbox || {};
const mailbox = mailboxStorage.get(`${bootAddress}`);
const { outbox = [], ack = 0 } = mailbox ? exportMailbox(mailbox) : {};
inbound(GCI, outbox, ack);
});

Expand Down
39 changes: 20 additions & 19 deletions packages/cosmic-swingset/lib/block-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export default function makeBlockManager({
let currentActions = [];
let decohered;

async function blockManager(action) {
async function blockManager(action, savedChainSends) {
if (decohered) {
throw decohered;
}
Expand All @@ -84,6 +84,7 @@ export default function makeBlockManager({
`Committed height ${action.blockHeight} does not match computed height ${computedHeight}`,
);
}
flushChainSends(false);
break;
}

Expand Down Expand Up @@ -147,30 +148,30 @@ export default function makeBlockManager({
// eslint-disable-next-line no-await-in-loop
await kernelPerformAction(a);
}
}

// Always commit all the keeper state live.
const start = Date.now();
const { mailboxSize } = saveChainState();
const mbTime = Date.now() - start;
// We write out our on-chain state as a number of chainSends.
const start = Date.now();
await saveChainState();
const chainTime = Date.now() - start;

// Advance our saved state variables.
savedActions = currentActions;
computedHeight = action.blockHeight;
// Advance our saved state variables.
savedActions = currentActions;
computedHeight = action.blockHeight;

// Save the kernel's computed state so that we can recover if we ever
// reset before Cosmos SDK commit.
const start2 = Date.now();
saveOutsideState(computedHeight, savedActions);
savedHeight = computedHeight;
// Save the kernel's computed state so that we can recover if we ever
// reset before Cosmos SDK commit.
const start2 = Date.now();
await saveOutsideState(computedHeight, savedActions, savedChainSends);
savedHeight = computedHeight;

const saveTime = Date.now() - start2;
const saveTime = Date.now() - start2;

log.debug(
`wrote SwingSet checkpoint (mailbox=${mailboxSize}), [run=${runTime}ms, mb=${mbTime}ms, save=${saveTime}ms]`,
);
currentActions = [];
log.debug(
`wrote SwingSet checkpoint [run=${runTime}ms, chainSave=${chainTime}ms, kernelSave=${saveTime}ms]`,
);
}

currentActions = [];
break;
}

Expand Down
Loading

0 comments on commit b9a320f

Please sign in to comment.