Skip to content

Commit

Permalink
fix: simplify scheduling in distributeFees
Browse files Browse the repository at this point in the history
closes #3044
  • Loading branch information
Chris-Hibbert committed May 6, 2021
1 parent 56c5943 commit fec73fd
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 133 deletions.
3 changes: 0 additions & 3 deletions packages/vats/src/bootstrap.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,6 @@ export function buildRootObject(vatPowers, vatParameters) {
// Start the reward distributor.
const epochTimerService = chainTimerService;
const distributorParams = {
depositsPerUpdate: 51,
updateInterval: 1n, // 1 second
epochInterval: 60n * 60n, // 1 hour
runIssuer: centralIssuer,
runBrand: centralBrand,
Expand All @@ -175,7 +173,6 @@ export function buildRootObject(vatPowers, vatParameters) {
E(vats.distributeFees).makeTreasuryFeeCollector(zoe, treasuryCreator),
E(bankManager).getDepositFacet(),
epochTimerService,
chainTimerService,
harden(distributorParams),
)
.catch(e => console.error('Error distributing fees', e));
Expand Down
50 changes: 19 additions & 31 deletions packages/vats/src/distributeFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ export function makeTreasuryFeeCollector(zoe, treasuryCreatorFacet) {
// from accounts to purses. Each time the epochTimer signals the end of an
// Epoch, it will ask the bank's notifier for a fresh list of accounts and ask
// the treasury for the fees that have been collected to date. It will then
// divide the funds evenly amount the accounts, and send payments in batches of
// depositsPerUpdate every updateInterval. When the payment doesn't divide
// evenly among the accounts, it holds the remainder till the next epoch.
// divide the funds evenly amount the accounts, and send payments.
// When the payment doesn't divide evenly among the accounts, it holds the
// remainder till the next epoch.

/** @type {BuildFeeDistributor} */
export function buildDistributor(treasury, bank, epochTimer, timer, params) {
export function buildDistributor(treasury, bank, epochTimer, params) {
const {
depositsPerUpdate,
updateInterval,
// By default, we assume the epochTimer fires once per epoch.
epochInterval = 1n,
runIssuer,
Expand All @@ -42,32 +40,21 @@ export function buildDistributor(treasury, bank, epochTimer, timer, params) {
const accountsNotifier = E(bank).getAccountsNotifier();
let leftOverPayment;
let leftOverValue = 0n;
const queuedAccounts = [];
const queuedPayments = [];
let lastWallTimeUpdate;
const timerNotifier = E(timer).makeNotifier(0n, updateInterval);

/**
* @param accounts
* @param payments
* @param {(pmt: Payment[]) => void} disposeRejectedPayments
*/
async function scheduleDeposits(disposeRejectedPayments) {
if (!queuedPayments.length) {
async function scheduleDeposits(accounts, payments, disposeRejectedPayments) {
if (!payments.length) {
return;
}

({ updateCount: lastWallTimeUpdate } = await E(
timerNotifier,
).getUpdateSince(lastWallTimeUpdate));

// queuedPayments may have changed since the `await`.
if (!queuedPayments.length) {
return;
}

const accounts = queuedAccounts.splice(0, depositsPerUpdate);
const payments = queuedPayments.splice(0, depositsPerUpdate);
E(bank)
.depositMultiple(runBrand, accounts, payments)
// The scheduler will decide how fast to process the deposits.
Promise.all(
accounts.map((acct, i) => E(bank).deposit(runBrand, acct, payments[i])),
)
.then(settledResults => {
const rejectedPayments = payments.filter(
(_pmt, i) =>
Expand All @@ -78,7 +65,6 @@ export function buildDistributor(treasury, bank, epochTimer, timer, params) {
disposeRejectedPayments(rejectedPayments);
})
.catch(e => console.error(`distributeFees cannot depositMultiple`, e));
scheduleDeposits(disposeRejectedPayments);
}

async function schedulePayments() {
Expand All @@ -105,12 +91,14 @@ export function buildDistributor(treasury, bank, epochTimer, timer, params) {
const manyPayments = await E(runIssuer).splitMany(payment, amounts);
// manyPayments is hardened, so we can't use pop()
leftOverPayment = manyPayments[manyPayments.length - 1];
queuedPayments.push(...manyPayments.slice(0, manyPayments.length - 1));
queuedAccounts.push(...accounts);

scheduleDeposits(_pmts => {
// TODO: Somehow reclaim the rejected payments.
});
scheduleDeposits(
accounts,
manyPayments.slice(0, manyPayments.length - 1),
_pmts => {
// TODO: Somehow reclaim the rejected payments.
},
);
}

const timeObserver = {
Expand Down
15 changes: 4 additions & 11 deletions packages/vats/src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,13 @@
/**
* @typedef {Object} BankDepositFacet
*
* @property {(brand: Brand, accounts: string[], payments: Payment[]) => Promise<PromiseSettledResult<Amount>[]>} depositMultiple
* @property {(brand: Brand, account: string, payment: Payment) => Promise<PromiseSettledResult<Amount>>} deposit
* @property {() => Notifier<string[]>} getAccountsNotifier
*/

/**
* @typedef {Object} DistributorParams
*
* @property {number} depositsPerUpdate - (number) how many payments should be
* sent to the Bank interface per updateInterval
* @property {bigint} updateInterval - (bigint) parameter to the timer
* controlling the interval at which deposits are sent to the Bank API
* @property {bigint} [epochInterval=1n] - parameter to the epochTimer
* controlling the interval at which rewards should be sent to the bank.
* @property {Issuer} runIssuer
Expand All @@ -77,12 +73,9 @@
* collectFees() method, which will return a payment. can be populated with
* makeTreasuryFeeCollector(zoe, treasuryCreatorFacet)
* @param {ERef<BankDepositFacet>} bank - object with getAccountsNotifier() and
* depositMultiple() @param {ERef<TimerService>} epochTimer - timer that
* notifies at the end of each Epoch. The epochInterval parameter controls the
* interval.
* @param {ERef<TimerService>} timer - timer controlling frequency at which
* batches of payments are sent to the bank for processing. The parameter
* updateInterval specifies the interval at which updates are sent.
* deposit()
* @param {ERef<TimerService>} epochTimer - timer that notifies at the end of
* each Epoch. The epochInterval parameter controls the interval.
* @param {DistributorParams} params
* @returns {Promise<void>}
*/
41 changes: 5 additions & 36 deletions packages/vats/src/vat-bank.js
Original file line number Diff line number Diff line change
Expand Up @@ -265,42 +265,11 @@ export function buildRootObject(_vatPowers) {
getAccountsNotifier() {
return accountsNotifier;
},
/**
* Send many independent deposits, all of the same brand. If any of them
* fail, then you should reclaim the corresponding payments since they
* didn't get deposited.
*
* @param {Brand} brand
* @param {Array<string>} accounts
* @param {Array<Payment>} payments
* @returns {Promise<PromiseSettledResult<Amount>[]>}
*/
depositMultiple(brand, accounts, payments) {
/**
* @param {string} account
* @param {Payment} payment
*/
const doDeposit = async (account, payment) => {
// The purse we send it to will do the proper verification as part
// of deposit.
const bank = getBankForAddress(account);
const purse = bank.getPurse(brand);
return E(purse).deposit(payment);
};

// We want just a regular iterable that yields deposit promises.
function* generateDepositPromises() {
const max = Math.max(accounts.length, payments.length);
for (let i = 0; i < max; i += 1) {
// Create a deposit promise.
yield doDeposit(accounts[i], payments[i]);
}
}

// We wait for all deposits to settle so that the whole batch
// completes, even if there are failures with individual accounts,
// payments, or deposits.
return Promise.allSettled(generateDepositPromises());
deposit(brand, account, payment) {
// The purse will do the proper verification as part of deposit.
const bank = getBankForAddress(account);
const purse = bank.getPurse(brand);
return E(purse).deposit(payment);
},
});

Expand Down
71 changes: 19 additions & 52 deletions packages/vats/test/test-distributeFees.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ function makeFakeBank() {

return {
getAccountsNotifier: () => notifier,
depositMultiple: (brand, a, p) => {
deposit: async (brand, a, p) => {
depositAccounts.push(a);
depositPayments.push(p);
return p.map(_pmt => ({ status: 'fulfilled' }));
return { status: 'fulfilled' };
},

// tools for the fake:
Expand Down Expand Up @@ -64,15 +64,12 @@ test('fee distribution', async t => {
const bankUpdater = bank.getUpdater();
const treasury = makeFakeTreasury();
const epochTimer = buildManualTimer(console.log);
const wallTimer = buildManualTimer(console.log);
const distributorParams = {
depositsPerUpdate: 2,
updateInterval: 1n,
epochInterval: 1n,
runIssuer: issuer,
runBrand: brand,
};
buildDistributor(treasury, bank, epochTimer, wallTimer, distributorParams);
buildDistributor(treasury, bank, epochTimer, distributorParams);

treasury.pushFees(runMint.mintPayment(amountMath.make(brand, 500n)));
bankUpdater.updateState(['a37', 'a2389', 'a274', 'a16', 'a1772']);
Expand All @@ -83,36 +80,8 @@ test('fee distribution', async t => {
await epochTimer.tick();
await waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [['a37', 'a2389']]);
assertPaymentArray(t, bank.getPayments()[0], 2, 100, issuer, brand);

await wallTimer.tick();
waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [
['a37', 'a2389'],
['a274', 'a16'],
]);
assertPaymentArray(t, bank.getPayments()[1], 2, 100, issuer, brand);

await wallTimer.tick();
waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [
['a37', 'a2389'],
['a274', 'a16'],
['a1772'],
]);
assertPaymentArray(t, bank.getPayments()[2], 1, 100, issuer, brand);

await wallTimer.tick();
waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [
['a37', 'a2389'],
['a274', 'a16'],
['a1772'],
]);
t.deepEqual(bank.getAccounts(), ['a37', 'a2389', 'a274', 'a16', 'a1772']);
assertPaymentArray(t, bank.getPayments(), 5, 100, issuer, brand);
});

test('fee distribution, leftovers', async t => {
Expand All @@ -122,15 +91,12 @@ test('fee distribution, leftovers', async t => {
const bankUpdater = bank.getUpdater();
const treasury = makeFakeTreasury();
const epochTimer = buildManualTimer(console.log);
const wallTimer = buildManualTimer(console.log);
const distributorParams = {
depositsPerUpdate: 7,
updateInterval: 1n,
epochInterval: 1n,
runIssuer: issuer,
runBrand: brand,
};
buildDistributor(treasury, bank, epochTimer, wallTimer, distributorParams);
buildDistributor(treasury, bank, epochTimer, distributorParams);

treasury.pushFees(runMint.mintPayment(amountMath.make(brand, 12n)));
bankUpdater.updateState(['a37', 'a2389', 'a274', 'a16', 'a1772']);
Expand All @@ -141,25 +107,26 @@ test('fee distribution, leftovers', async t => {
await epochTimer.tick();
await waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [['a37', 'a2389', 'a274', 'a16', 'a1772']]);
assertPaymentArray(t, bank.getPayments()[0], 5, 2, issuer, brand);

await wallTimer.tick();
waitForPromisesToSettle();
t.deepEqual(bank.getAccounts(), ['a37', 'a2389', 'a274', 'a16', 'a1772']);
assertPaymentArray(t, bank.getPayments(), 5, 2, issuer, brand);

// Pay them again
treasury.pushFees(runMint.mintPayment(amountMath.make(brand, 13n)));
await wallTimer.tick();

await epochTimer.tick();
await waitForPromisesToSettle();

await wallTimer.tick();
waitForPromisesToSettle();

t.deepEqual(bank.getAccounts(), [
['a37', 'a2389', 'a274', 'a16', 'a1772'],
['a37', 'a2389', 'a274', 'a16', 'a1772'],
'a37',
'a2389',
'a274',
'a16',
'a1772',
'a37',
'a2389',
'a274',
'a16',
'a1772',
]);
assertPaymentArray(t, bank.getPayments()[1], 5, 3, issuer, brand);
assertPaymentArray(t, bank.getPayments().slice(5, 10), 5, 3, issuer, brand);
});

0 comments on commit fec73fd

Please sign in to comment.