diff --git a/packages/pegasus/src/pegasus.js b/packages/pegasus/src/pegasus.js index 12eb50d94cd..51456f70463 100644 --- a/packages/pegasus/src/pegasus.js +++ b/packages/pegasus/src/pegasus.js @@ -138,43 +138,39 @@ const makeCourier = ({ /** @type {Receiver} */ const receive = async ({ value, depositAddress }) => { - const tryToDeposit = async () => { - const localAmount = AmountMath.make(localBrand, value); - - // Look up the deposit facet for this board address, if there is one. - /** @type {DepositFacet} */ - const depositFacet = await E(board) - .getValue(depositAddress) - .catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet')); - - const { userSeat, zcfSeat } = zcf.makeEmptySeatKit(); - - // Redeem the backing payment. - try { - redeem(zcfSeat, { Transfer: localAmount }); - zcfSeat.exit(); - } catch (e) { - zcfSeat.fail(e); - throw e; - } - - // Once we've gotten to this point, their payment is committed and - // won't be refunded on a failed receive. - const payout = await E(userSeat).getPayout('Transfer'); - - // Send the payout promise to the deposit facet. - E(depositFacet) - .receive(payout) - .catch(_ => {}); - - // We don't want to wait for the depositFacet to return, so that - // it can't hang up (i.e. DoS) an ordered channel, which relies on - // us returning promptly. - }; - - return tryToDeposit() - .then(_ => E(transferProtocol).makeTransferPacketAck(true)) - .catch(error => E(transferProtocol).makeTransferPacketAck(false, error)); + const localAmount = AmountMath.make(localBrand, value); + + // Look up the deposit facet for this board address, if there is one. + /** @type {DepositFacet} */ + const depositFacet = await E(board) + .getValue(depositAddress) + .catch(_ => E(namesByAddress).lookup(depositAddress, 'depositFacet')); + + const { userSeat, zcfSeat } = zcf.makeEmptySeatKit(); + + // Redeem the backing payment. + try { + redeem(zcfSeat, { Transfer: localAmount }); + zcfSeat.exit(); + } catch (e) { + zcfSeat.fail(e); + throw e; + } + + // Once we've gotten to this point, their payment is committed and + // won't be refunded on a failed receive. + const payout = await E(userSeat).getPayout('Transfer'); + + // Send the payout promise to the deposit facet. + // + // We don't want to wait for the depositFacet to return, so that + // it can't hang up (i.e. DoS) an ordered channel, which relies on + // us returning promptly. + E(depositFacet) + .receive(payout) + .catch(_ => {}); + + return E(transferProtocol).makeTransferPacketAck(true); }; return Far('courier', { send, receive }); @@ -280,23 +276,29 @@ const makePegasus = (zcf, board, namesByAddress) => { }); }, async onReceive(_c, packetBytes) { - // Dispatch the packet to the appropriate Peg for this connection. - const parts = await E(transferProtocol).parseTransferPacket( - packetBytes, + const doReceive = async () => { + // Dispatch the packet to the appropriate Peg for this connection. + const parts = await E(transferProtocol).parseTransferPacket( + packetBytes, + ); + + const { remoteDenom } = parts; + assert.typeof(remoteDenom, 'string'); + + if (!remoteDenomToCourierPK.has(remoteDenom)) { + // This is the first time we've heard of this denomination. + remoteDenomPublication.updateState(remoteDenom); + } + + // Wait for the courier to be instantiated. + const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); + const { receive } = await courierPK.promise; + return receive(parts); + }; + + return doReceive().catch(error => + E(transferProtocol).makeTransferPacketAck(false, error), ); - - const { remoteDenom } = parts; - assert.typeof(remoteDenom, 'string'); - - if (!remoteDenomToCourierPK.has(remoteDenom)) { - // This is the first time we've heard of this denomination. - remoteDenomPublication.updateState(remoteDenom); - } - - // Wait for the courier to be instantiated. - const courierPK = getCourierPK(remoteDenom, remoteDenomToCourierPK); - const { receive } = await courierPK.promise; - return receive(parts); }, async onClose(c) { // Unregister C. Pending transfers will be rejected by the Network API. @@ -339,7 +341,8 @@ const makePegasus = (zcf, board, namesByAddress) => { connection, ); - const { reject } = remoteDenomToCourierPK.get(remoteDenom); + const { reject, promise } = remoteDenomToCourierPK.get(remoteDenom); + promise.catch(() => {}); reject(assert.error(X`${remoteDenom} is temporarily unavailable`)); // Allow new transfers to be initiated. diff --git a/packages/pegasus/test/test-peg.js b/packages/pegasus/test/test-peg.js index 0d0a91a5bf9..c08d0d4d72d 100644 --- a/packages/pegasus/test/test-peg.js +++ b/packages/pegasus/test/test-peg.js @@ -13,6 +13,10 @@ import { makeZoeKit } from '@agoric/zoe'; import fakeVatAdmin from '@agoric/zoe/tools/fakeVatAdmin.js'; import { Far } from '@endo/marshal'; +import { makeSubscription } from '@agoric/notifier'; + +import '@agoric/ertp/exported.js'; +import { makePromiseKit } from '@agoric/promise-kit'; const filename = new URL(import.meta.url).pathname; const dirname = path.dirname(filename); @@ -23,12 +27,15 @@ const contractPath = `${dirname}/../src/pegasus.js`; * @param {import('tape-promise/tape').Test} t */ async function testRemotePeg(t) { - t.plan(13); + t.plan(16); /** - * @type {import('@agoric/ertp').DepositFacet?} + * @type {PromiseRecord} */ - let localDepositFacet; + const { + promise: localDepositFacet, + resolve: resolveLocalDepositFacet, + } = makePromiseKit(); const fakeBoard = Far('fakeBoard', { getValue(id) { if (id === '0x1234') { @@ -103,13 +110,6 @@ async function testRemotePeg(t) { const chandler = E(pegasus).makePegConnectionHandler(); const connP = E(portP).connect(portName, chandler); - const pegP = await E(pegasus).pegRemote('Gaia', connP, 'uatom'); - const localBrand = await E(pegP).getLocalBrand(); - const localIssuer = await E(pegasus).getLocalIssuer(localBrand); - - const localPurseP = E(localIssuer).makeEmptyPurse(); - localDepositFacet = await E(localPurseP).getDepositFacet(); - // Get some local Atoms. const sendPacket = { amount: '100000000000000000001', @@ -117,8 +117,26 @@ async function testRemotePeg(t) { receiver: '0x1234', sender: 'FIXME:sender', }; + await connP; + const sendAckDataP = E(gaiaConnection).send(JSON.stringify(sendPacket)); + + // Note that we can create the peg after the fact. + const remoteDenomSub = makeSubscription( + E( + E(pegasus).getRemoteDenomSubscription(connP), + ).getSharableSubscriptionInternals(), + ); + const remoteDenomAit = remoteDenomSub[Symbol.asyncIterator](); + t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'uatom' }); - const sendAckData = await E(gaiaConnection).send(JSON.stringify(sendPacket)); + const pegP = await E(pegasus).pegRemote('Gaia', connP, 'uatom'); + const localBrand = await E(pegP).getLocalBrand(); + const localIssuer = await E(pegasus).getLocalIssuer(localBrand); + + const localPurseP = E(localIssuer).makeEmptyPurse(); + resolveLocalDepositFacet(E(localPurseP).getDepositFacet()); + + const sendAckData = await sendAckDataP; const sendAck = JSON.parse(sendAckData); t.deepEqual(sendAck, { success: true }, 'Gaia sent the atoms'); if (!sendAck.success) { @@ -155,6 +173,26 @@ async function testRemotePeg(t) { 'we received more shadow atoms', ); + const sendPacket3 = { + amount: '13', + denom: 'umuon', + receiver: 'agoric1234567', + sender: 'FIXME:sender4', + }; + const sendAckData3P = E(gaiaConnection).send(JSON.stringify(sendPacket3)); + + // Wait for the packet to go through. + t.deepEqual(await remoteDenomAit.next(), { done: false, value: 'umuon' }); + E(pegasus).rejectStuckTransfers(connP, 'umuon'); + + const sendAckData3 = await sendAckData3P; + const sendAck3 = JSON.parse(sendAckData3); + t.deepEqual( + sendAck3, + { success: false, error: 'Error: "umuon" is temporarily unavailable' }, + 'rejecting transfers works', + ); + const localAtoms = await E(localPurseP).withdraw(localAtomsAmount); const allegedName = await E(pegP).getAllegedName();