Skip to content

Commit

Permalink
get recent participants from spark-evaluations-recent-participants
Browse files Browse the repository at this point in the history
  • Loading branch information
juliangruber committed Sep 11, 2024
1 parent 07cff62 commit 19e6327
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 26 deletions.
18 changes: 12 additions & 6 deletions observer/bin/dry-run.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import * as SparkEvaluationsRecentParticipants from '@filecoin-station/spark-evaluations-recent-participants'
import { ethers } from 'ethers'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { getStatsPgPool } from '@filecoin-station/spark-stats-db'

const pgPools = await getPgPools()
const pgPoolStats = await getStatsPgPool()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)
const recentParticipantsContract = new ethers.Contract(
SparkEvaluationsRecentParticipants.ADDRESS,
SparkEvaluationsRecentParticipants.ABI,
provider
)

await pgPools.stats.query('DELETE FROM daily_reward_transfers')
await pgPoolStats.query('DELETE FROM daily_reward_transfers')

await Promise.all([
observeTransferEvents(pgPools.stats, ieContract, provider),
observeScheduledRewards(pgPools, ieContract)
observeTransferEvents(pgPoolStats, ieContract, provider),
observeScheduledRewards(pgPoolStats, ieContract, recentParticipantsContract)
])

await pgPools.stats.end()
await pgPoolStats.end()
14 changes: 10 additions & 4 deletions observer/bin/spark-observer.js
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
import '../lib/instrument.js'
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import * as SparkEvaluationsRecentParticipants from '@filecoin-station/spark-evaluations-recent-participants'
import { ethers } from 'ethers'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { getStatsPgPool } from '@filecoin-station/spark-stats-db'
import {
observeTransferEvents,
observeScheduledRewards
} from '../lib/observer.js'

const pgPools = await getPgPools()
const pgPoolStats = await getStatsPgPool()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)
const recentParticipantsContract = new ethers.Contract(
SparkEvaluationsRecentParticipants.ADDRESS,
SparkEvaluationsRecentParticipants.ABI,
provider
)

const ONE_HOUR = 60 * 60 * 1000

const loopObserveTransferEvents = async () => {
while (true) {
const start = Date.now()
try {
await observeTransferEvents(pgPools.stats, ieContract, provider)
await observeTransferEvents(pgPoolStats, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
Expand All @@ -40,7 +46,7 @@ const loopObserveScheduledRewards = async () => {
while (true) {
const start = Date.now()
try {
await observeScheduledRewards(pgPools, ieContract)
await observeScheduledRewards(pgPoolStats, ieContract, recentParticipantsContract)
} catch (e) {
console.error(e)
Sentry.captureException(e)
Expand Down
18 changes: 7 additions & 11 deletions observer/lib/observer.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,14 @@ export const observeTransferEvents = async (pgPoolStats, ieContract, provider) =

/**
* Observe scheduled rewards on the Filecoin blockchain
* @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools
* @param {import('@filecoin-station/spark-stats-db').Queryable} pgPoolStats
* @param {import('ethers').Contract} ieContract
* @param {import('ethers').Contract} recentParticipantsContract
*/
export const observeScheduledRewards = async (pgPools, ieContract) => {
export const observeScheduledRewards = async (pgPoolStats, ieContract, recentParticipantsContract) => {
console.log('Querying scheduled rewards from impact evaluator')
const { rows } = await pgPools.evaluate.query(`
SELECT participant_address
FROM participants p
JOIN daily_participants d ON p.id = d.participant_id
WHERE d.day >= now() - interval '3 days'
`)
for (const { participant_address: address } of rows) {
const participants = await recentParticipantsContract.get()
for (const address of participants) {
let scheduledRewards
try {
scheduledRewards = await ieContract.rewardsScheduledFor(address)
Expand All @@ -59,10 +55,10 @@ export const observeScheduledRewards = async (pgPools, ieContract) => {
address,
{ cause: err }
)
continue
continue
}
console.log('Scheduled rewards for', address, scheduledRewards)
await pgPools.stats.query(`
await pgPoolStats.query(`
INSERT INTO daily_scheduled_rewards
(day, participant_address, scheduled_rewards)
VALUES (now(), $1, $2)
Expand Down
23 changes: 18 additions & 5 deletions observer/test/observer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,8 @@ describe('observer', () => {
beforeEach(async () => {
await pgPools.evaluate.query('DELETE FROM recent_station_details')
await pgPools.evaluate.query('DELETE FROM recent_participant_subnets')
await pgPools.evaluate.query('DELETE FROM daily_participants')
await pgPools.evaluate.query('DELETE FROM participants')
await pgPools.stats.query('DELETE FROM daily_scheduled_rewards')
await givenDailyParticipants(pgPools.evaluate, today(), ['0xCURRENT'])
await givenDailyParticipants(pgPools.evaluate, '2000-01-01', ['0xOLD'])
})

it('observes scheduled rewards', async () => {
Expand All @@ -149,7 +146,15 @@ describe('observer', () => {
}
}
}
await observeScheduledRewards(pgPools, ieContract)
/** @type {any} */
const recentParticipantsContract = {
get: async () => ['0xCURRENT']
}
await observeScheduledRewards(
pgPools.stats,
ieContract,
recentParticipantsContract
)
const { rows } = await pgPools.stats.query(`
SELECT participant_address, scheduled_rewards
FROM daily_scheduled_rewards
Expand All @@ -164,7 +169,15 @@ describe('observer', () => {
const ieContract = {
rewardsScheduledFor: async () => 200n
}
await observeScheduledRewards(pgPools, ieContract)
/** @type {any} */
const recentParticipantsContract = {
get: async () => ['0xCURRENT']
}
await observeScheduledRewards(
pgPools.stats,
ieContract,
recentParticipantsContract
)
const { rows } = await pgPools.stats.query(`
SELECT participant_address, scheduled_rewards
FROM daily_scheduled_rewards
Expand Down
8 changes: 8 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,8 @@
"env": [
"mocha"
]
},
"dependencies": {
"@filecoin-station/spark-evaluations-recent-participants": "^3.0.0"
}
}

0 comments on commit 19e6327

Please sign in to comment.