Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Match notifier semantics to async iterables #1332

Merged
merged 1 commit into from
Jul 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/cosmic-swingset/lib/ag-solo/vats/lib-wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ export async function makeWallet({
// handle the update, which has already resolved to a record. If the offer is
// 'done', mark the offer 'complete', otherwise resubscribe to the notifier.
function updateOrResubscribe(id, offerHandle, update) {
const { updateHandle, done } = update;
if (done) {
const { updateCount } = update;
if (updateCount === undefined) {
// TODO do we still need these?
idToOfferHandle.delete(id);

Expand All @@ -245,7 +245,7 @@ export async function makeWallet({
idToNotifierP.delete(id);
} else {
E(idToNotifierP.get(id))
.getUpdateSince(updateHandle)
.getUpdateSince(updateCount)
.then(nextUpdate => updateOrResubscribe(id, offerHandle, nextUpdate));
}
}
Expand Down
165 changes: 119 additions & 46 deletions packages/notifier/src/notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,49 @@
import { producePromise } from '@agoric/produce-promise';

/**
* @typedef {Object} UpdateHandle a value used to mark the position in the update stream
* @typedef {number | undefined} UpdateCount a value used to mark the position
* in the update stream. For the last state, the updateCount is undefined.
*/

/**
* @template T the type of the state value
* @typedef {Object} UpdateRecord<T>
* @property {T} value is whatever state the service wants to publish
* @property {UpdateHandle} updateHandle is a value that identifies the update
* @property {boolean} done false until the updater publishes a final state
* @property {UpdateCount} updateCount is a value that identifies the update
*/

/**
* @template T the type of the notifier state
* @callback GetUpdateSince<T> Can be called repeatedly to get a sequence of update records
* @param {UpdateHandle} [updateHandle] return update record as of a handle
* If the handle argument is omitted or differs from the current handle, return the current record.
* Otherwise, after the next state change, the promise will resolve to the then-current value of the record.
* @callback GetUpdateSince<T> Can be called repeatedly to get a sequence of
* update records
* @param {UpdateCount} [updateCount] return update record as of a handle
* If the handle argument is omitted or differs from the current handle,
* return the current record.
* Otherwise, after the next state change, the promise will resolve to the
* then-current value of the record.
* @returns {Promise<UpdateRecord<T>>} resolves to the corresponding update
*/

/**
* @template T the type of the notifier state
* @typedef {Object} Notifier<T> an object that can be used to get the current state or updates
* @property {GetUpdateSince<T>} getUpdateSince return update record as of a handle
* @property {() => UpdateRecord<T>} getCurrentUpdate return the current update record
* @typedef {Object} Notifier<T> an object that can be used to get the current
* state or updates
* @property {GetUpdateSince<T>} getUpdateSince return update record as of a
* handle
*/

/**
* @template T the type of the notifier state
* @typedef {Object} Updater<T> an object that should be closely held, as anyone with access to
* @typedef {Object} Updater<T> an object that should be closely held, as
* anyone with access to
* it can provide updates
* @property {(state: T) => void} updateState sets the new state, and resolves the outstanding promise to send an update
* @property {(finalState: T) => void} resolve sets the final state, sends a final update, and freezes the
* @property {(state: T) => void} updateState sets the new state, and resolves
* the outstanding promise to send an update
* @property {(finalState: T) => void} finish sets the final state, sends a
* final update, and freezes the
* updater
* @property {(reason: T) => void} reject the stream becomes erroneously
* terminated, allegedly for the stated reason.
*/

/**
Expand All @@ -48,6 +57,13 @@ import { producePromise } from '@agoric/produce-promise';
* @property {Updater<T>} updater the (closely-held) notifier producer
*/

/**
* Whether to enable deprecated legacy features to support legacy clients
* during the transition. TODO once all clients are updated to the new API,
* remove this flag and all code enabled by this flag.
*/
const supportLegacy = true;

/**
* Produces a pair of objects, which allow a service to produce a stream of
* update promises.
Expand All @@ -56,55 +72,112 @@ import { producePromise } from '@agoric/produce-promise';
* @param {T} [initialState] the first state to be returned
* @returns {NotifierRecord<T>} the notifier and updater
*/
export const makeNotifierKit = (initialState = undefined) => {
let currentPromiseRec = producePromise();
let currentResponse = harden({
value: initialState,
updateHandle: {},
done: false,
});

function getCurrentUpdate() {
return currentResponse;
// The initial state argument has to be truly optional even though it can
// be any first class value including `undefined`. We need to distinguish the
// presence vs the absence of it, which we cannot do with the optional argument
// syntax. Rather we use the arity of the arguments array.
//
// If no initial state is provided to `makeNotifierKit`, then it starts without
// an initial state. Its initial state will instead be the state of the first
// update.
export const makeNotifierKit = (...args) => {
let nextPromiseKit = producePromise();
let currentUpdateCount = 1; // avoid falsy numbers
let currentResponse;

const hasState = () => currentResponse !== undefined;

const final = () => currentUpdateCount === undefined;

const extraProperties = () =>
supportLegacy
? {
updateHandle: currentUpdateCount,
done: final(),
}
: {};

if (args.length >= 1) {
// start as hasState() && !final()
currentResponse = harden({
value: args[0],
updateCount: currentUpdateCount,
...extraProperties(),
});
}

function getUpdateSince(updateHandle = undefined) {
if (updateHandle === currentResponse.updateHandle) {
return currentPromiseRec.promise;
// else start as !hasState() && !final()

// NaN matches nothing
function getUpdateSince(updateCount = NaN) {
if (
hasState() &&
(final() || currentResponse.updateCount !== updateCount)
) {
// If hasState() and either it is final() or it is
// not the state of updateCount, return the current state.
return Promise.resolve(currentResponse);
}
return Promise.resolve(currentResponse);
// otherwise return a promise for the next state.
return nextPromiseKit.promise;
}

function updateState(state) {
if (!currentResponse.updateHandle) {
throw new Error('Cannot update state after resolve.');
if (final()) {
throw new Error('Cannot update state after termination.');
}

currentResponse = harden({ value: state, updateHandle: {}, done: false });
currentPromiseRec.resolve(currentResponse);
currentPromiseRec = producePromise();
// become hasState() && !final()
currentUpdateCount += 1;
currentResponse = harden({
value: state,
updateCount: currentUpdateCount,
...extraProperties(),
});
nextPromiseKit.resolve(currentResponse);
nextPromiseKit = producePromise();
}

function resolve(finalState) {
if (!currentResponse.updateHandle) {
throw new Error('Cannot resolve again.');
function finish(finalState) {
if (final()) {
throw new Error('Cannot finish after termination.');
}

// become hasState() && final()
currentUpdateCount = undefined;
currentResponse = harden({
value: finalState,
updateHandle: undefined,
done: true,
updateCount: currentUpdateCount,
...extraProperties(),
});
currentPromiseRec.resolve(currentResponse);
nextPromiseKit.resolve(currentResponse);
nextPromiseKit = undefined;
}

function reject(reason) {
if (final()) {
throw new Error('Cannot reject after termination.');
}

// become !hasState() && final()
currentUpdateCount = undefined;
currentResponse = undefined;
nextPromiseKit.reject(reason);
}

// notifier facet is separate so it can be handed out loosely while updater is
// tightly held
const notifier = harden({ getUpdateSince, getCurrentUpdate });
const updater = harden({ updateState, resolve });
// notifier facet is separate so it can be handed out loosely while updater
// is tightly held
const notifier = harden({ getUpdateSince });
const updater = harden({
updateState,
finish,
reject,
...(supportLegacy ? { resolve: finish } : {}),
});
return harden({ notifier, updater });
};

// Deprecated. TODO remove

export { makeNotifierKit as produceNotifier };
// Deprecated. TODO remove once no clients need it.
// Unlike makeNotifierKit, produceIssuerKit always produces
// a notifier with an initial state, which defaults to undefined.
export const produceNotifier = (initialState = undefined) =>
makeNotifierKit(initialState);
34 changes: 17 additions & 17 deletions packages/notifier/test/test-notifier.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ test('notifier - single update', async t => {
const updateDeNovo = await notifier.getUpdateSince();
t.equals(updateDeNovo.value, 1, 'initial state is one');

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 3, 'updated state is eventually three');
});

const update2 = await notifier.getUpdateSince({});
const update2 = await notifier.getUpdateSince();
t.equals(update2.value, 1);
updater.updateState(3);
await all;
Expand All @@ -47,15 +47,15 @@ test('notifier - initial update', async t => {
/** @type {NotifierRecord<number>} */
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateDeNovo = await notifier.getUpdateSince();
t.equals(updateDeNovo.value, 1, 'initial state is one');

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 3, 'updated state is eventually three');
});

const update2 = await notifier.getUpdateSince({});
const update2 = await notifier.getUpdateSince();
t.equals(update2.value, 1);
updater.updateState(3);
await all;
Expand All @@ -65,22 +65,22 @@ test('notifier - update after state change', async t => {
t.plan(5);
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateDeNovo = await notifier.getUpdateSince();

const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
t.equals(updateDeNovo.value, 1, 'first state check (1)');
const all = Promise.all([updateInWaiting]).then(([update1]) => {
t.equals(update1.value, 3, '4th check (delayed) 3');
const thirdStatePromise = notifier.getUpdateSince(update1.updateHandle);
const thirdStatePromise = notifier.getUpdateSince(update1.updateCount);
Promise.all([thirdStatePromise]).then(([update2]) => {
t.equals(update2.value, 5, '5th check (delayed) 5');
});
});

t.equals(notifier.getCurrentUpdate().value, 1, '2nd check (1)');
t.equals((await notifier.getUpdateSince()).value, 1, '2nd check (1)');
updater.updateState(3);

t.equals(notifier.getCurrentUpdate().value, 3, '3rd check (3)');
t.equals((await notifier.getUpdateSince()).value, 3, '3rd check (3)');
updater.updateState(5);
await all;
});
Expand All @@ -90,21 +90,21 @@ test('notifier - final state', async t => {
/** @type {NotifierRecord<number|string>} */
const { notifier, updater } = makeNotifierKit(1);

const updateDeNovo = notifier.getCurrentUpdate();
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateHandle);
const updateDeNovo = await notifier.getUpdateSince();
const updateInWaiting = notifier.getUpdateSince(updateDeNovo.updateCount);
t.equals(updateDeNovo.value, 1, 'initial state is one');
const all = Promise.all([updateInWaiting]).then(([update]) => {
t.equals(update.value, 'final', 'state is "final"');
t.notOk(update.updateHandle, 'no handle after close');
const postFinalUpdate = notifier.getUpdateSince(update.updateHandle);
t.notOk(update.updateCount, 'no handle after close');
const postFinalUpdate = notifier.getUpdateSince(update.updateCount);
Promise.all([postFinalUpdate]).then(([after]) => {
t.equals(after.value, 'final', 'stable');
t.notOk(after.updateHandle, 'no handle after close');
t.notOk(after.updateCount, 'no handle after close');
});
});

const invalidHandle = await notifier.getUpdateSince({});
const invalidHandle = await notifier.getUpdateSince();
t.equals(invalidHandle.value, 1, 'still one');
updater.resolve('final');
updater.finish('final');
await all;
});
6 changes: 3 additions & 3 deletions packages/swingset-runner/demo/zoeTests/vat-alice.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,11 @@ const build = async (zoe, issuers, payments, installations, timer) => {
await showPurseBalance(simoleanPurseP, 'aliceSimoleanPurse', log);
};

function logStateOnChanges(notifier, lastHandle = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastHandle);
function logStateOnChanges(notifier, lastCount = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastCount);
updateRecordP.then(updateRec => {
log(updateRec.value);
logStateOnChanges(notifier, updateRec.updateHandle);
logStateOnChanges(notifier, updateRec.updateCount);
});
}

Expand Down
2 changes: 1 addition & 1 deletion packages/zoe/src/contractFacet.js
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ export function buildRootObject(_vatPowers) {
addOffer: (offerHandle, proposal, allocation) => {
const ignoringUpdater = harden({
updateState: () => {},
resolve: () => {},
finish: () => {},
});
const offerRecord = {
instanceHandle,
Expand Down
2 changes: 1 addition & 1 deletion packages/zoe/src/state.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ const makeOfferTable = () => {
deleteOffers: offerHandles => {
return offerHandles.map(offerHandle => {
const { updater } = table.get(offerHandle);
updater.resolve(undefined);
updater.finish(undefined);
return table.delete(offerHandle);
});
},
Expand Down
6 changes: 3 additions & 3 deletions packages/zoe/src/zoe.js
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,13 @@ function makeZoe(vatAdminSvc) {
// deposited. Keywords in the want clause are mapped to the empty
// amount for that keyword's Issuer.
const recordOffer = amountsArray => {
const notifierRec = makeNotifierKit();
const notifierKit = makeNotifierKit(undefined);
const offerRecord = {
instanceHandle,
proposal: cleanedProposal,
currentAllocation: arrayToObj(amountsArray, userKeywords),
notifier: notifierRec.notifier,
updater: notifierRec.updater,
notifier: notifierKit.notifier,
updater: notifierKit.updater,
};
const { zcfForZoe } = instanceTable.get(instanceHandle);
payoutMap.init(offerHandle, producePromise());
Expand Down
6 changes: 3 additions & 3 deletions packages/zoe/test/swingsetTests/zoe/vat-alice.js
Original file line number Diff line number Diff line change
Expand Up @@ -253,11 +253,11 @@ const build = async (log, zoe, issuers, payments, installations, timer) => {
await showPurseBalance(simoleanPurseP, 'aliceSimoleanPurse', log);
};

function logStateOnChanges(notifier, lastHandle = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastHandle);
function logStateOnChanges(notifier, lastCount = undefined) {
const updateRecordP = E(notifier).getUpdateSince(lastCount);
updateRecordP.then(updateRec => {
log(updateRec.value);
logStateOnChanges(notifier, updateRec.updateHandle);
logStateOnChanges(notifier, updateRec.updateCount);
});
}

Expand Down
Loading