Skip to content

Commit

Permalink
fix: rejectStuckTransfers sends a TransferProtocol-level error
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelfig committed Feb 4, 2022
1 parent 54bf0bc commit 8374f94
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 65 deletions.
111 changes: 57 additions & 54 deletions packages/pegasus/src/pegasus.js
Original file line number Diff line number Diff line change
Expand Up @@ -138,43 +138,39 @@ const makeCourier = ({

/** @type {Receiver} */
const receive = async ({ value, depositAddress }) => {
const tryToDeposit = async () => {
const localAmount = AmountMath.make(localBrand, value);

// Look up the deposit facet for this board address, if there is one.
/** @type {DepositFacet} */
const depositFacet = await E(board)
.getValue(depositAddress)
.catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet'));

const { userSeat, zcfSeat } = zcf.makeEmptySeatKit();

// Redeem the backing payment.
try {
redeem(zcfSeat, { Transfer: localAmount });
zcfSeat.exit();
} catch (e) {
zcfSeat.fail(e);
throw e;
}

// Once we've gotten to this point, their payment is committed and
// won't be refunded on a failed receive.
const payout = await E(userSeat).getPayout('Transfer');

// Send the payout promise to the deposit facet.
E(depositFacet)
.receive(payout)
.catch(_ => {});

// We don't want to wait for the depositFacet to return, so that
// it can't hang up (i.e. DoS) an ordered channel, which relies on
// us returning promptly.
};

return tryToDeposit()
.then(_ => E(transferProtocol).makeTransferPacketAck(true))
.catch(error => E(transferProtocol).makeTransferPacketAck(false, error));
const localAmount = AmountMath.make(localBrand, value);

// Look up the deposit facet for this board address, if there is one.
/** @type {DepositFacet} */
const depositFacet = await E(board)
.getValue(depositAddress)
.catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet'));

const { userSeat, zcfSeat } = zcf.makeEmptySeatKit();

// Redeem the backing payment.
try {
redeem(zcfSeat, { Transfer: localAmount });
zcfSeat.exit();
} catch (e) {
zcfSeat.fail(e);
throw e;
}

// Once we've gotten to this point, their payment is committed and
// won't be refunded on a failed receive.
const payout = await E(userSeat).getPayout('Transfer');

// Send the payout promise to the deposit facet.
//
// We don't want to wait for the depositFacet to return, so that
// it can't hang up (i.e. DoS) an ordered channel, which relies on
// us returning promptly.
E(depositFacet)
.receive(payout)
.catch(_ => {});

return E(transferProtocol).makeTransferPacketAck(true);
};

return Far('courier', { send, receive });
Expand Down Expand Up @@ -280,23 +276,29 @@ const makePegasus = (zcf, board, namesByAddress) => {
});
},
async onReceive(_c, packetBytes) {
// Dispatch the packet to the appropriate Peg for this connection.
const parts = await E(transferProtocol).parseTransferPacket(
packetBytes,
const doReceive = async () => {
// Dispatch the packet to the appropriate Peg for this connection.
const parts = await E(transferProtocol).parseTransferPacket(
packetBytes,
);

const { remoteDenom } = parts;
assert.typeof(remoteDenom, 'string');

if (!remoteDenomToCourierPK.has(remoteDenom)) {
// This is the first time we've heard of this denomination.
remoteDenomPublication.updateState(remoteDenom);
}

// Wait for the courier to be instantiated.
const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK);
const { receive } = await courierPK.promise;
return receive(parts);
};

return doReceive().catch(error =>
E(transferProtocol).makeTransferPacketAck(false, error),
);

const { remoteDenom } = parts;
assert.typeof(remoteDenom, 'string');

if (!remoteDenomToCourierPK.has(remoteDenom)) {
// This is the first time we've heard of this denomination.
remoteDenomPublication.updateState(remoteDenom);
}

// Wait for the courier to be instantiated.
const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK);
const { receive } = await courierPK.promise;
return receive(parts);
},
async onClose(c) {
// Unregister C. Pending transfers will be rejected by the Network API.
Expand Down Expand Up @@ -339,7 +341,8 @@ const makePegasus = (zcf, board, namesByAddress) => {
connection,
);

const { reject } = remoteDenomToCourierPK.get(remoteDenom);
const { reject, promise } = remoteDenomToCourierPK.get(remoteDenom);
promise.catch(() => {});
reject(assert.error(X`${remoteDenom} is temporarily unavailable`));

// Allow new transfers to be initiated.
Expand Down
60 changes: 49 additions & 11 deletions packages/pegasus/test/test-peg.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import { makeZoeKit } from '@agoric/zoe';

import fakeVatAdmin from '@agoric/zoe/tools/fakeVatAdmin.js';
import { Far } from '@endo/marshal';
import { makeSubscription } from '@agoric/notifier';

import '@agoric/ertp/exported.js';
import { makePromiseKit } from '@agoric/promise-kit';

const filename = new URL(import.meta.url).pathname;
const dirname = path.dirname(filename);
Expand All @@ -23,12 +27,15 @@ const contractPath = `${dirname}/../src/pegasus.js`;
* @param {import('tape-promise/tape').Test} t
*/
async function testRemotePeg(t) {
t.plan(13);
t.plan(16);

/**
* @type {import('@agoric/ertp').DepositFacet?}
* @type {PromiseRecord<import('@agoric/ertp').DepositFacet>}
*/
let localDepositFacet;
const {
promise: localDepositFacet,
resolve: resolveLocalDepositFacet,
} = makePromiseKit();
const fakeBoard = Far('fakeBoard', {
getValue(id) {
if (id === '0x1234') {
Expand Down Expand Up @@ -103,22 +110,33 @@ async function testRemotePeg(t) {
const chandler = E(pegasus).makePegConnectionHandler();
const connP = E(portP).connect(portName, chandler);

const pegP = await E(pegasus).pegRemote('Gaia', connP, 'uatom');
const localBrand = await E(pegP).getLocalBrand();
const localIssuer = await E(pegasus).getLocalIssuer(localBrand);

const localPurseP = E(localIssuer).makeEmptyPurse();
localDepositFacet = await E(localPurseP).getDepositFacet();

// Get some local Atoms.
const sendPacket = {
amount: '100000000000000000001',
denom: 'uatom',
receiver: '0x1234',
sender: 'FIXME:sender',
};
await connP;
const sendAckDataP = E(gaiaConnection).send(JSON.stringify(sendPacket));

// Note that we can create the peg after the fact.
const remoteDenomSub = makeSubscription(
E(
E(pegasus).getRemoteDenomSubscription(connP),
).getSharableSubscriptionInternals(),
);
const remoteDenomAit = remoteDenomSub[Symbol.asyncIterator]();
t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'uatom' });

const sendAckData = await E(gaiaConnection).send(JSON.stringify(sendPacket));
const pegP = await E(pegasus).pegRemote('Gaia', connP, 'uatom');
const localBrand = await E(pegP).getLocalBrand();
const localIssuer = await E(pegasus).getLocalIssuer(localBrand);

const localPurseP = E(localIssuer).makeEmptyPurse();
resolveLocalDepositFacet(E(localPurseP).getDepositFacet());

const sendAckData = await sendAckDataP;
const sendAck = JSON.parse(sendAckData);
t.deepEqual(sendAck, { success: true }, 'Gaia sent the atoms');
if (!sendAck.success) {
Expand Down Expand Up @@ -155,6 +173,26 @@ async function testRemotePeg(t) {
'we received more shadow atoms',
);

const sendPacket3 = {
amount: '13',
denom: 'umuon',
receiver: 'agoric1234567',
sender: 'FIXME:sender4',
};
const sendAckData3P = E(gaiaConnection).send(JSON.stringify(sendPacket3));

// Wait for the packet to go through.
t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'umuon' });
E(pegasus).rejectStuckTransfers(connP, 'umuon');

const sendAckData3 = await sendAckData3P;
const sendAck3 = JSON.parse(sendAckData3);
t.deepEqual(
sendAck3,
{ success: false, error: 'Error: "umuon" is temporarily unavailable' },
'rejecting transfers works',
);

const localAtoms = await E(localPurseP).withdraw(localAtomsAmount);

const allegedName = await E(pegP).getAllegedName();
Expand Down

0 comments on commit 8374f94

Please sign in to comment.