diff --git a/packages/pegasus/src/pegasus.js b/packages/pegasus/src/pegasus.js index aed2417d510..12eb50d94cd 100644 --- a/packages/pegasus/src/pegasus.js +++ b/packages/pegasus/src/pegasus.js @@ -1,10 +1,11 @@ // @ts-check import { assert, details as X, q } from '@agoric/assert'; -import { makeLegacyWeakMap, makeStore } from '@agoric/store'; +import { makeLegacyWeakMap, makeStore, makeLegacyMap } from '@agoric/store'; import { E } from '@agoric/eventual-send'; import { assertProposalShape } from '@agoric/zoe/src/contractSupport/index.js'; import { Far } from '@endo/marshal'; +import { makeSubscriptionKit } from '@agoric/notifier'; import '@agoric/vats/exported.js'; import '@agoric/swingset-vat/src/vats/network/types.js'; @@ -190,6 +191,7 @@ const makePegasus = (zcf, board, namesByAddress) => { /** * @typedef {Object} LocalDenomState * @property {Store>} remoteDenomToCourierPK + * @property {Subscription} remoteDenomSubscription * @property {number} lastDenomNonce * @property {ERef} transferProtocol */ @@ -257,7 +259,16 @@ const makePegasus = (zcf, board, namesByAddress) => { /** * @type {Store>} */ - const remoteDenomToCourierPK = makeStore('Denomination'); + const remoteDenomToCourierPK = makeLegacyMap('Denomination'); + + /** + * @type {SubscriptionRecord} + */ + const { + subscription: remoteDenomSubscription, + publication: remoteDenomPublication, + } = makeSubscriptionKit(); + return Far('pegConnectionHandler', { async onOpen(c) { // Register C with the table of Peg receivers. @@ -265,9 +276,10 @@ const makePegasus = (zcf, board, namesByAddress) => { remoteDenomToCourierPK, lastDenomNonce: 0, transferProtocol, + remoteDenomSubscription, }); }, - async onReceive(c, packetBytes) { + async onReceive(_c, packetBytes) { // Dispatch the packet to the appropriate Peg for this connection. const parts = await E(transferProtocol).parseTransferPacket( packetBytes, @@ -276,6 +288,11 @@ const makePegasus = (zcf, board, namesByAddress) => { const { remoteDenom } = parts; assert.typeof(remoteDenom, 'string'); + if (!remoteDenomToCourierPK.has(remoteDenom)) { + // This is the first time we've heard of this denomination. + remoteDenomPublication.updateState(remoteDenom); + } + // Wait for the courier to be instantiated. const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); const { receive } = await courierPK.promise; @@ -284,9 +301,51 @@ const makePegasus = (zcf, board, namesByAddress) => { async onClose(c) { // Unregister C. Pending transfers will be rejected by the Network API. connectionToLocalDenomState.delete(c); + remoteDenomPublication.fail( + assert.error(X`pegConnectionHandler closed`), + ); }, }); }, + + /** + * Get a subscription for remote denoms added on a connection. + * + * @param {ERef} connectionP + */ + async getRemoteDenomSubscription(connectionP) { + const connection = await connectionP; + const { remoteDenomSubscription } = connectionToLocalDenomState.get( + connection, + ); + return remoteDenomSubscription; + }, + + /** + * Abort any in-progress remoteDenom transfers if there has not yet been a + * pegRemote or pegLocal for it. + * + * This races against any attempts to obtain metadata and establish a given + * peg. + * + * It's alright to expose to the holder of the connection. + * + * @param {ERef} connectionP + * @param {string} remoteDenom + */ + async rejectStuckTransfers(connectionP, remoteDenom) { + const connection = await connectionP; + const { remoteDenomToCourierPK } = connectionToLocalDenomState.get( + connection, + ); + + const { reject } = remoteDenomToCourierPK.get(remoteDenom); + reject(assert.error(X`${remoteDenom} is temporarily unavailable`)); + + // Allow new transfers to be initiated. + remoteDenomToCourierPK.delete(remoteDenom); + }, + /** * Peg a remote asset over a network connection. * diff --git a/packages/pegasus/test/test-peg.js b/packages/pegasus/test/test-peg.js index 43c49e9e41b..0d0a91a5bf9 100644 --- a/packages/pegasus/test/test-peg.js +++ b/packages/pegasus/test/test-peg.js @@ -183,5 +183,4 @@ async function testRemotePeg(t) { t.assert(!stillIsLive, 'payment is consumed'); } -test('remote peg', t => - testRemotePeg(t).catch(err => t.not(err, err, 'unexpected exception'))); +test('remote peg', t => testRemotePeg(t));