Skip to content

Commit

Permalink
feat(pegasus): rejectStuckTransfers, getRemoteDenomSubscription
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Feb 4, 2022
1 parent a5413f7 commit 54bf0bc
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
65 changes: 62 additions & 3 deletions packages/pegasus/src/pegasus.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// @ts-check

import { assert, details as X, q } from '@agoric/assert';
import { makeLegacyWeakMap, makeStore } from '@agoric/store';
import { makeLegacyWeakMap, makeStore, makeLegacyMap } from '@agoric/store';
import { E } from '@agoric/eventual-send';
import { assertProposalShape } from '@agoric/zoe/src/contractSupport/index.js';
import { Far } from '@endo/marshal';
import { makeSubscriptionKit } from '@agoric/notifier';

import '@agoric/vats/exported.js';
import '@agoric/swingset-vat/src/vats/network/types.js';
Expand Down Expand Up @@ -190,6 +191,7 @@ const makePegasus = (zcf, board, namesByAddress) => {
/**
* @typedef {Object} LocalDenomState
* @property {Store<Denom, PromiseRecord<Courier>>} remoteDenomToCourierPK
* @property {Subscription<Denom>} remoteDenomSubscription
* @property {number} lastDenomNonce
* @property {ERef<TransferProtocol>} transferProtocol
*/
Expand Down Expand Up @@ -257,17 +259,27 @@ const makePegasus = (zcf, board, namesByAddress) => {
/**
* @type {Store<Denom, PromiseRecord<Courier>>}
*/
const remoteDenomToCourierPK = makeStore('Denomination');
const remoteDenomToCourierPK = makeLegacyMap('Denomination');

/**
* @type {SubscriptionRecord<Denom>}
*/
const {
subscription: remoteDenomSubscription,
publication: remoteDenomPublication,
} = makeSubscriptionKit();

return Far('pegConnectionHandler', {
async onOpen(c) {
// Register C with the table of Peg receivers.
connectionToLocalDenomState.init(c, {
remoteDenomToCourierPK,
lastDenomNonce: 0,
transferProtocol,
remoteDenomSubscription,
});
},
async onReceive(c, packetBytes) {
async onReceive(_c, packetBytes) {
// Dispatch the packet to the appropriate Peg for this connection.
const parts = await E(transferProtocol).parseTransferPacket(
packetBytes,
Expand All @@ -276,6 +288,11 @@ const makePegasus = (zcf, board, namesByAddress) => {
const { remoteDenom } = parts;
assert.typeof(remoteDenom, 'string');

if (!remoteDenomToCourierPK.has(remoteDenom)) {
// This is the first time we've heard of this denomination.
remoteDenomPublication.updateState(remoteDenom);
}

// Wait for the courier to be instantiated.
const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK);
const { receive } = await courierPK.promise;
Expand All @@ -284,9 +301,51 @@ const makePegasus = (zcf, board, namesByAddress) => {
async onClose(c) {
// Unregister C. Pending transfers will be rejected by the Network API.
connectionToLocalDenomState.delete(c);
remoteDenomPublication.fail(
assert.error(X`pegConnectionHandler closed`),
);
},
});
},

/**
* Get a subscription for remote denoms added on a connection.
*
* @param {ERef<Connection>} connectionP
*/
async getRemoteDenomSubscription(connectionP) {
const connection = await connectionP;
const { remoteDenomSubscription } = connectionToLocalDenomState.get(
connection,
);
return remoteDenomSubscription;
},

/**
* Abort any in-progress remoteDenom transfers if there has not yet been a
* pegRemote or pegLocal for it.
*
* This races against any attempts to obtain metadata and establish a given
* peg.
*
* It's alright to expose to the holder of the connection.
*
* @param {ERef<Connection>} connectionP
* @param {string} remoteDenom
*/
async rejectStuckTransfers(connectionP, remoteDenom) {
const connection = await connectionP;
const { remoteDenomToCourierPK } = connectionToLocalDenomState.get(
connection,
);

const { reject } = remoteDenomToCourierPK.get(remoteDenom);
reject(assert.error(X`${remoteDenom} is temporarily unavailable`));

// Allow new transfers to be initiated.
remoteDenomToCourierPK.delete(remoteDenom);
},

/**
* Peg a remote asset over a network connection.
*
Expand Down
3 changes: 1 addition & 2 deletions packages/pegasus/test/test-peg.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,5 +183,4 @@ async function testRemotePeg(t) {
t.assert(!stillIsLive, 'payment is consumed');
}

test('remote peg', t =>
testRemotePeg(t).catch(err => t.not(err, err, 'unexpected exception')));
test('remote peg', t => testRemotePeg(t));

0 comments on commit 54bf0bc

Please sign in to comment.