Skip to content

Commit

Permalink
feat: demand-paged vats are reloaded from heap snapshots (#2848)
Browse files Browse the repository at this point in the history
This enhances SwingSet to have a "Vat Warehouse" which limits the number of
"paged-in" vats to some maximum (currently 50). The idea is to conserve
system RAM by allowing idle vats to remain "paged-out", which consumes only
space on disk, until someone sends a message to them. The vat is then paged
in, by creating a new xsnap process and reloading the necessary vat state.

This reload process is greatly accelerated by loading a heap snapshot, if one
is available. We only need to replay the suffix of the transcript that was
recorded after the snapshot was taken, rather than the full (huge)
transcript. Heap snapshots are stored in a new swingstore component named the
"stream store".

For each vat, the warehouse saves a heap snapshot after a configurable number
of deliveries (default 200). In addition, it saves an initial snapshot after
just a few deliveries (default 2), because all contracts vats start out with
a large delivery that provides the contract bundle to evaluate. By taking a
snapshot quickly, we can avoid the time needed to re-evaluate that large
bundle on almost all process restarts. This algorithm is a best guess: we'll
refine it as we gather more data about the tradeoff between work now (the
time it takes to create and write a snapshot), the storage space consumed by
those snapshots, and work later (replaying more transcript).

We're estimating that a typical contract snapshot consumes about 300kB
(compressed).

closes #2273
closes #2277
refs #2422
refs #2138 (might close it)


* refactor(replay): hoist handle declaration

* chore(xsnap): clarify names of snapStore temp files for debugging

* feat(swingset): initializeSwingset snapshots XS supervisor

 - solo: add xsnap, tmp dependencies
 - cosmic-swingset: declare dependencies on xsnap, tmp
 - snapshotSupervisor()
 - vk.saveSnapshot(), vk.getLastSnapshot()
   - test: mock vatKeeper needs getLastSnapshot()
 - test(snapstore): update snapshot hash
 - makeSnapstore in solo, cosmic-swingset
   - chore(solo): create xs-snapshots directory
 - more getVatKeeper -> provideVatKeeper
 - startPos arg for replayTransript()
 - typecheck shows vatAdminRootKref could be missing
 - test pre-SES snapshot size
   - hoist snapSize to test title
   - clarify SES vs. pre-SES XS workers
   - factor bootWorker out of bootSESWorker
   - hoist Kb, relativeSize for sharing between tests

misc:
 - WIP: restore from snapshot
 - hard-code remote style

fix(swingset): don't leak xs-worker in initializeSwingset

When taking a snapshot of the supervisor in initializeSwingset, we
neglected to `.close()` it.

Lack of a name hindered diagnosis, so let's fix that while we're at
it.

* feat(swingset): save snapshot periodically after deliveries

 - vk.saveSnapShot() handles snapshotInterval
   - annotate type of kvStore in makeVatKeeper
   - move getLastSnapshot up for earlier use
   - refactor: rename snapshotDetail to lastSnapshot
   - factor out getTranscriptEnd
 - vatWarehouse.maybeSaveSnapshot()
 - saveSnapshot:
   - don't require snapStore
   - fix startPos type
 - provide snapstore to vatKeeper via kernelKeeper
 - buildKernel: get snapstore out of hostStorage
 - chore: don't try to snapshot a terminated vat

* feat(swingset): load vats from snapshots

 - don't `setBundle` when loading from snapshot
 - provide startPos to replayTranscript()
 - test reloading a vat

* refactor(vatWarehouse): factor out, test LRU logic

* fix(vat-warehouse): remove vatID from LRU when evicting

* chore(vatKeeper): prune debug logging in saveSnapshot (FIXUP)

* feat(swingset): log bringing vats online (esp from snapshot)

 - manager.replayTranscript returns number of entries replayed

* chore: resove "skip crank buffering?" issue

after discussion with CM:
maybeSaveSnapshot() happens before commitCrank()
so nothing special needed here

* chore: prune makeSnapshot arg from evict()

Not only is this option not implemented now, but CM's analysis shows
that adding it would likely be harmful.

* test(swingset): teardown snap-store

* chore(swingset): initial sketch of snapshot reload test

* refactor: let itemCount be not-optional in StreamPosition

* feat: snapshot early then infrequently

  - refactor: move snapshot decision up from vk.saveSnapshot() up
    to vw.maybeSaveSnapshot

* test: provide getLastSnapshot to mock vatKeeper

* chore: vattp: turn off managerType local work-around

* chore: vat-warehouse: initial snapshot after 2 deliveries

integration testing shows this is closer to ideal

* chore: prune deterministic snapshot assertion

oops. rebase problem.

* chore: fix test-snapstore ld.asset

rebase / merge problem?!

* chore: never mind supervisorHash optimization

With snapshotInitial at 2, there is little reason to snapshot after
loading the supervisor bundles. The code doesn't carry its own weight.

Plus, it seems to introduce a strange bug with marshal or something...

```
  test/test-home.js:37

   36:   const { board } = E.get(home);
   37:   await t.throwsAsync(
   38:     () => E(board).getValue('148'),

  getting a value for a fake id throws

  Returned promise rejected with unexpected exception:

  Error {
    message: 'Remotable (a string) is already frozen',
  }
```

* docs(swingset): document lastSnapshot kernel DB key

* refactor: capitalize makeSnapStore consistently

* refactor: replayTranscript caller is responsible to getLastSnapshot

* test(swingset): consistent vat-warehouse test naming

* refactor(swingset): compute transcriptSnapshotStats in vatKeeper

In an attempt to avoid reading the lastSnapshot DB key if the
t.endPosition key was enough information to decide to take a snapshot,
the vatWarehouse was peeking into the vatKeeper's business.  Let's go
with code clarity over (un-measured) performance.

* chore: use harden, not freeze; clarify lru

* chore: use distinct fixture directories to avoid collision

The "temporary" snapstore directories used by two different tests began to
overlap when the tests were moved into the same parent dir, and one test was
deleting the directory while the other was still using it (as well as
mingling files at runtime), causing an xsnap process to die with an IO error
if the test were run in parallel.

This changes the the two tests to use distinct directories. In the long run,
we should either have them use `mktmp` to build a randomly-named known-unique
directory, or establish a convention where tempdir names match the name of
the test file and case using them, to avoid collisions as we add more tests.

Co-authored-by: Brian Warner <warner@lothar.com>
  • Loading branch information
dckc and warner authored Jun 26, 2021
1 parent f3e4f87 commit cb239cb
Show file tree
Hide file tree
Showing 27 changed files with 636 additions and 217 deletions.
54 changes: 19 additions & 35 deletions packages/SwingSet/src/controller.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
/* global require */
// @ts-check
import fs from 'fs';
import path from 'path';
import process from 'process';
import re2 from 're2';
import { performance } from 'perf_hooks';
import { spawn as ambientSpawn } from 'child_process';
import { type as osType } from 'os';
import { Worker } from 'worker_threads';
import anylogger from 'anylogger';
import { tmpName } from 'tmp';

import { assert, details as X } from '@agoric/assert';
import { isTamed, tameMetering } from '@agoric/tame-metering';
import { importBundle } from '@agoric/import-bundle';
import { makeMeteringTransformer } from '@agoric/transform-metering';
import { xsnap, makeSnapstore, recordXSnap } from '@agoric/xsnap';
import { xsnap, recordXSnap } from '@agoric/xsnap';

import engineGC from './engine-gc.js';
import { WeakRef, FinalizationRegistry } from './weakref.js';
Expand Down Expand Up @@ -49,12 +47,12 @@ function unhandledRejectionHandler(e) {
/**
* @param {{ moduleFormat: string, source: string }[]} bundles
* @param {{
* snapstorePath?: string,
* snapStore?: SnapStore,
* spawn: typeof import('child_process').spawn
* env: Record<string, string | undefined>,
* }} opts
*/
export function makeStartXSnap(bundles, { snapstorePath, env, spawn }) {
export function makeStartXSnap(bundles, { snapStore, env, spawn }) {
/** @type { import('@agoric/xsnap/src/xsnap').XSnapOptions } */
const xsnapOpts = {
os: osType(),
Expand All @@ -79,37 +77,27 @@ export function makeStartXSnap(bundles, { snapstorePath, env, spawn }) {
};
}

/** @type { ReturnType<typeof makeSnapstore> } */
let snapStore;

if (snapstorePath) {
fs.mkdirSync(snapstorePath, { recursive: true });

snapStore = makeSnapstore(snapstorePath, {
tmpName,
existsSync: fs.existsSync,
createReadStream: fs.createReadStream,
createWriteStream: fs.createWriteStream,
rename: fs.promises.rename,
unlink: fs.promises.unlink,
resolve: path.resolve,
});
}

let supervisorHash = '';
/**
* @param {string} name
* @param {(request: Uint8Array) => Promise<Uint8Array>} handleCommand
* @param { boolean } [metered]
* @param { string } [snapshotHash]
*/
async function startXSnap(name, handleCommand, metered) {
if (supervisorHash) {
return snapStore.load(supervisorHash, async snapshot => {
async function startXSnap(
name,
handleCommand,
metered,
snapshotHash = undefined,
) {
if (snapStore && snapshotHash) {
// console.log('startXSnap from', { snapshotHash });
return snapStore.load(snapshotHash, async snapshot => {
const xs = doXSnap({ snapshot, name, handleCommand, ...xsnapOpts });
await xs.evaluate('null'); // ensure that spawn is done
return xs;
});
}
// console.log('fresh xsnap', { snapStore: snapStore });
const meterOpts = metered ? {} : { meteringLimit: 0 };
const worker = doXSnap({ handleCommand, name, ...meterOpts, ...xsnapOpts });

Expand All @@ -121,9 +109,6 @@ export function makeStartXSnap(bundles, { snapstorePath, env, spawn }) {
// eslint-disable-next-line no-await-in-loop
await worker.evaluate(`(${bundle.source}\n)()`.trim());
}
if (snapStore) {
supervisorHash = await snapStore.save(async fn => worker.snapshot(fn));
}
return worker;
}
return startXSnap;
Expand All @@ -140,7 +125,6 @@ export function makeStartXSnap(bundles, { snapstorePath, env, spawn }) {
* slogFile?: string,
* testTrackDecref?: unknown,
* warehousePolicy?: { maxVatsOnline?: number },
* snapstorePath?: string,
* spawn?: typeof import('child_process').spawn,
* env?: Record<string, string | undefined>
* }} runtimeOptions
Expand All @@ -162,7 +146,6 @@ export async function makeSwingsetController(
debugPrefix = '',
slogCallbacks,
slogFile,
snapstorePath,
spawn = ambientSpawn,
warehousePolicy = {},
} = runtimeOptions;
Expand Down Expand Up @@ -300,7 +283,11 @@ export async function makeSwingsetController(
// @ts-ignore assume supervisorBundle is set
JSON.parse(kvStore.get('supervisorBundle')),
];
const startXSnap = makeStartXSnap(bundles, { snapstorePath, env, spawn });
const startXSnap = makeStartXSnap(bundles, {
snapStore: hostStorage.snapStore,
env,
spawn,
});

const kernelEndowments = {
waitUntilQuiescent,
Expand Down Expand Up @@ -430,7 +417,6 @@ export async function makeSwingsetController(
* debugPrefix?: string,
* slogCallbacks?: unknown,
* testTrackDecref?: unknown,
* snapstorePath?: string,
* warehousePolicy?: { maxVatsOnline?: number },
* slogFile?: string,
* }} runtimeOptions
Expand All @@ -447,15 +433,13 @@ export async function buildVatController(
kernelBundles,
debugPrefix,
slogCallbacks,
snapstorePath,
warehousePolicy,
slogFile,
} = runtimeOptions;
const actualRuntimeOptions = {
verbose,
debugPrefix,
slogCallbacks,
snapstorePath,
warehousePolicy,
slogFile,
};
Expand Down
5 changes: 0 additions & 5 deletions packages/SwingSet/src/initializeSwingset.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,6 @@ export async function initializeSwingset(
// it to comms
config.vats.vattp = {
bundle: kernelBundles.vattp,
creationOptions: {
// we saw evidence of vattp dropping messages, and out of caution,
// we're keeping it on an in-kernel worker for now. See #3039.
managerType: 'local',
},
};

// timer wrapper vat is added automatically, but TODO: bootstraps must
Expand Down
9 changes: 8 additions & 1 deletion packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@ export default function buildKernel(
} = kernelOptions;
const logStartup = verbose ? console.debug : () => 0;

const { kvStore, streamStore } = /** @type { HostStore } */ (hostStorage);
const {
kvStore,
streamStore,
snapStore,
} = /** @type { HostStore } */ (hostStorage);
insistStorageAPI(kvStore);
const { enhancedCrankBuffer, abortCrank, commitCrank } = wrapStorage(kvStore);
const vatAdminRootKref = kvStore.get('vatAdminRootKref');
Expand All @@ -138,6 +142,7 @@ export default function buildKernel(
enhancedCrankBuffer,
streamStore,
kernelSlog,
snapStore,
);

const meterManager = makeMeterManager(replaceGlobalMeter);
Expand Down Expand Up @@ -673,6 +678,8 @@ export default function buildKernel(
if (!didAbort) {
kernelKeeper.processRefcounts();
kernelKeeper.saveStats();
// eslint-disable-next-line no-use-before-define
await vatWarehouse.maybeSaveSnapshot();
}
commitCrank();
kernelKeeper.incrementCrankNumber();
Expand Down
10 changes: 9 additions & 1 deletion packages/SwingSet/src/kernel/state/kernelKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const enableKernelGC = true;
// v$NN.nextDeliveryNum = $NN
// v$NN.t.endPosition = $NN
// v$NN.vs.$key = string
// v$NN.lastSnapshot = JSON({ snapshotID, startPos })

// d$NN.o.nextID = $NN
// d$NN.c.$kernelSlot = $deviceSlot = o-$NN/d+$NN/d-$NN
Expand Down Expand Up @@ -109,8 +110,14 @@ const FIRST_CRANK_NUMBER = 0n;
* @param {KVStorePlus} kvStore
* @param {StreamStore} streamStore
* @param {KernelSlog} kernelSlog
* @param {SnapStore=} snapStore
*/
export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) {
export default function makeKernelKeeper(
kvStore,
streamStore,
kernelSlog,
snapStore = undefined,
) {
insistEnhancedStorageAPI(kvStore);

/**
Expand Down Expand Up @@ -939,6 +946,7 @@ export default function makeKernelKeeper(kvStore, streamStore, kernelSlog) {
incStat,
decStat,
getCrankNumber,
snapStore,
);
ephemeral.vatKeepers.set(vatID, vk);
return vk;
Expand Down
58 changes: 57 additions & 1 deletion packages/SwingSet/src/kernel/state/vatKeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export function initializeVatState(kvStore, streamStore, vatID) {
/**
* Produce a vat keeper for a vat.
*
* @param {*} kvStore The keyValue store in which the persistent state will be kept
* @param {KVStorePlus} kvStore The keyValue store in which the persistent state will be kept
* @param {StreamStore} streamStore Accompanying stream store, for the transcripts
* @param {*} kernelSlog
* @param {string} vatID The vat ID string of the vat in question
Expand All @@ -60,6 +60,7 @@ export function initializeVatState(kvStore, streamStore, vatID) {
* @param {*} incStat
* @param {*} decStat
* @param {*} getCrankNumber
* @param { SnapStore= } snapStore
* returns an object to hold and access the kernel's state for the given vat
*/
export function makeVatKeeper(
Expand All @@ -79,6 +80,7 @@ export function makeVatKeeper(
incStat,
decStat,
getCrankNumber,
snapStore = undefined,
) {
insistVatID(vatID);
const transcriptStream = `transcript-${vatID}`;
Expand Down Expand Up @@ -417,6 +419,57 @@ export function makeVatKeeper(
kvStore.set(`${vatID}.t.endPosition`, `${JSON.stringify(newPos)}`);
}

/** @returns { StreamPosition } */
function getTranscriptEndPosition() {
return JSON.parse(
kvStore.get(`${vatID}.t.endPosition`) ||
assert.fail('missing endPosition'),
);
}

/**
* @returns {{ snapshotID: string, startPos: StreamPosition } | undefined}
*/
function getLastSnapshot() {
const notation = kvStore.get(`${vatID}.lastSnapshot`);
if (!notation) {
return undefined;
}
const { snapshotID, startPos } = JSON.parse(notation);
assert.typeof(snapshotID, 'string');
assert(startPos);
return { snapshotID, startPos };
}

function transcriptSnapshotStats() {
const totalEntries = getTranscriptEndPosition().itemCount;
const lastSnapshot = getLastSnapshot();
const snapshottedEntries = lastSnapshot
? lastSnapshot.startPos.itemCount
: 0;
return { totalEntries, snapshottedEntries };
}

/**
* Store a snapshot, if given a snapStore.
*
* @param { VatManager } manager
* @returns { Promise<boolean> }
*/
async function saveSnapshot(manager) {
if (!snapStore || !manager.makeSnapshot) {
return false;
}

const snapshotID = await manager.makeSnapshot(snapStore);
const endPosition = getTranscriptEndPosition();
kvStore.set(
`${vatID}.lastSnapshot`,
JSON.stringify({ snapshotID, startPos: endPosition }),
);
return true;
}

function vatStats() {
function getCount(key, first) {
const id = Nat(BigInt(kvStore.get(key)));
Expand Down Expand Up @@ -477,8 +530,11 @@ export function makeVatKeeper(
deleteCListEntry,
deleteCListEntriesForKernelSlots,
getTranscript,
transcriptSnapshotStats,
addToTranscript,
vatStats,
dumpState,
saveSnapshot,
getLastSnapshot,
});
}
27 changes: 22 additions & 5 deletions packages/SwingSet/src/kernel/vatManager/manager-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ import { makeTranscriptManager } from './transcript.js';

/**
*
* @typedef { { getManager: (shutdown: () => Promise<void>) => VatManager,
* @typedef { { getManager: (shutdown: () => Promise<void>,
* makeSnapshot?: (ss: SnapStore) => Promise<string>) => VatManager,
* syscallFromWorker: (vso: VatSyscallObject) => VatSyscallResult,
* setDeliverToWorker: (dtw: unknown) => void,
* } } ManagerKit
Expand Down Expand Up @@ -178,12 +179,18 @@ function makeManagerKit(
kernelSlog.write({ type: 'finish-replay-delivery', vatID, deliveryNum });
}

async function replayTranscript() {
/**
* @param {StreamPosition | undefined} startPos
* @returns { Promise<number?> } number of deliveries, or null if !useTranscript
*/
async function replayTranscript(startPos) {
// console.log('replay from', { vatID, startPos });

if (transcriptManager) {
const total = vatKeeper.vatStats().transcriptCount;
kernelSlog.write({ type: 'start-replay', vatID, deliveries: total });
let deliveryNum = 0;
for (const t of vatKeeper.getTranscript()) {
for (const t of vatKeeper.getTranscript(startPos)) {
// if (deliveryNum % 100 === 0) {
// console.debug(`replay vatID:${vatID} deliveryNum:${deliveryNum} / ${total}`);
// }
Expand All @@ -194,7 +201,10 @@ function makeManagerKit(
}
transcriptManager.checkReplayError();
kernelSlog.write({ type: 'finish-replay', vatID });
return deliveryNum;
}

return null;
}

/**
Expand Down Expand Up @@ -235,10 +245,17 @@ function makeManagerKit(
/**
*
* @param { () => Promise<void>} shutdown
* @param { (ss: SnapStore) => Promise<string> } makeSnapshot
* @returns { VatManager }
*/
function getManager(shutdown) {
return harden({ replayTranscript, replayOneDelivery, deliver, shutdown });
function getManager(shutdown, makeSnapshot) {
return harden({
replayTranscript,
replayOneDelivery,
deliver,
shutdown,
makeSnapshot,
});
}

return harden({ getManager, syscallFromWorker, setDeliverToWorker });
Expand Down
Loading

0 comments on commit cb239cb

Please sign in to comment.