Add daily scheduled rewards (#131)
* add read scheduled rewards

* fix lint

* fix lint

* fix migration number

* wrap observation functions

* wip

* simplify, dry run works

* fix lint

* split up loops

* get participants from db

* sentry

* read from last 3 days of participants

* big refactor

* add tests

* tests pass

* fix lint

* add missing env

* fix missing ci db

* fix missing migrate call

* fix migration bin

* clean up

* fix lint

* fix migrate bin

* clean up

* Revert "clean up"

This reverts commit 6f574ac.

* refactor `migrate()`

* remove implicit migrate

* create `db` package

* add missing dep

* add missing dependency

* try run `npm ci` first

* try run `npm install` first

* clean up

* try update node

* fix Dockerfile

* downgrade again to keep diff small

* add back implicit migrate

* Update db/index.js

Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>

* rename pg pools

* Update db/typings.d.ts

Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>

* harden tests using hooks

* improve test assertions

* refactor loop

* move `migrations` into `db`

* inline `observer()` in `dry-run.js`

* use helper from spark-evaluate

* fix test

* ci: add dry-run

* add glif token to dry-run

* fix dry-run import

* refine migration

* fix import

* docs

* fix version

* revert

* log

* refactor

* observer: add missing Sentry init

* Update README.md

Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>

* fix export

* fix test

* fix

---------

Co-authored-by: Miroslav Bajtoš <oss@bajtos.net>
Authored by juliangruber and bajtos on Jun 12, 2024
1 parent d3d12fe · commit e03bcf0
Showing 31 changed files with 449 additions and 280 deletions.
35 changes: 35 additions & 0 deletions .github/workflows/ci.yml
@@ -54,13 +54,16 @@ jobs:
--health-retries 5
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
- run: npm ci
- run: npm run migrate
- run: npm test

lint-all:
@@ -72,6 +75,38 @@ jobs:
node-version: 20
- run: npm ci
- run: npm run lint

dry-run:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:latest
env:
POSTGRES_DB: spark_stats
POSTGRES_PASSWORD: postgres
POSTGRES_USER: postgres
ports:
- 5432:5432
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
- run: npm ci
- run: npm run migrate
- run: npm run dry-run
env:
GLIF_TOKEN: ${{ secrets.GLIF_TOKEN }}

docker-build:
runs-on: ubuntu-latest
2 changes: 1 addition & 1 deletion Dockerfile
@@ -29,7 +29,7 @@ COPY --link package-lock.json package.json ./

# We cannot use a wildcard until `COPY --parents` is stabilised
# See https://docs.docker.com/reference/dockerfile/#copy---parents
COPY --link migrations/package.json ./migrations/
COPY --link db/package.json ./db/
COPY --link stats/package.json ./stats/
COPY --link observer/package.json ./observer/

4 changes: 4 additions & 0 deletions README.md
@@ -32,6 +32,10 @@ Base URL: http://stats.filspark.com/

http://stats.filspark.com/participants/change-rates

- `GET /participants/scheduled-rewards?address=<address>&from=<day>&to=<day>`

http://stats.filspark.com/participants/scheduled-rewards

- `GET /miners/retrieval-success-rate/summary?from=<day>&to=<day>`

http://stats.filspark.com/miners/retrieval-success-rate/summary
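A minimal sketch of calling the new endpoint from Node.js; the address and dates are placeholders, and the JSON response shape (an array of { day, scheduled_rewards } rows) is assumed from the daily_scheduled_rewards table added below, not confirmed by this diff.

// Hypothetical request against the new endpoint; address and dates are placeholders.
const address = '0x0000000000000000000000000000000000000000'
const res = await fetch(
  `http://stats.filspark.com/participants/scheduled-rewards?address=${address}&from=2024-06-01&to=2024-06-07`
)
// Assumed response shape: an array of { day, scheduled_rewards } rows.
const rewards = await res.json()
console.log(rewards)
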
93 changes: 93 additions & 0 deletions db/index.js
@@ -0,0 +1,93 @@
import { migrateWithPgClient as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js'
import pg from 'pg'
import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
import Postgrator from 'postgrator'

export { migrateEvaluateDB }

const {
// DATABASE_URL points to `spark_stats` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_stats',

// EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo.
// Eventually, we should move the code updating stats from spark-evaluate to this repo
// and then we won't need two connection strings.
EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate'
} = process.env

const migrationsDirectory = join(
dirname(fileURLToPath(import.meta.url)),
'migrations'
)

const poolConfig = {
// allow the pool to close all connections and become empty
min: 0,
// this value should correlate with the service concurrency hard_limit configured in fly.toml
// and must take into account the connection limit of our PG server, see
// https://fly.io/docs/postgres/managing/configuration-tuning/
max: 100,
// close connections that haven't been used for one second
idleTimeoutMillis: 1000,
// automatically close connections older than 60 seconds
maxLifetimeSeconds: 60
}

const onError = err => {
// Prevent crashing the process on idle client errors, the pool will recover
// itself. If all connections are lost, the process will still crash.
// https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
console.error('An idle client has experienced an error', err.stack)
}

export const getStatsPgPool = async () => {
const stats = new pg.Pool({
...poolConfig,
connectionString: DATABASE_URL
})
stats.on('error', onError)
await migrateStatsDB(stats)
return stats
}

export const getEvaluatePgPool = async () => {
const evaluate = new pg.Pool({
...poolConfig,
connectionString: EVALUATE_DB_URL
})
evaluate.on('error', onError)
await evaluate.query('SELECT 1')
return evaluate
}

/**
* @returns {Promise<import('./typings').pgPools>}
*/
export const getPgPools = async () => {
const stats = await getStatsPgPool()
const evaluate = await getEvaluatePgPool()
const end = () => Promise.all([stats.end(), evaluate.end()])

return { stats, evaluate, end }
}

/**
* @param {pg.Client} client
*/
export const migrateStatsDB = async (client) => {
const postgrator = new Postgrator({
migrationPattern: join(migrationsDirectory, '*'),
driver: 'pg',
execQuery: (query) => client.query(query)
})
console.log(
'Migrating `spark-stats` DB schema from version %s to version %s',
await postgrator.getDatabaseVersion(),
await postgrator.getMaxVersion()
)

await postgrator.migrate()

console.log('Migrated `spark-stats` DB schema to version', await postgrator.getDatabaseVersion())
}
File renamed without changes.
6 changes: 6 additions & 0 deletions db/migrations/003.do.scheduled-rewards.sql
@@ -0,0 +1,6 @@
CREATE TABLE daily_scheduled_rewards (
day DATE NOT NULL,
participant_address TEXT NOT NULL,
scheduled_rewards NUMERIC NOT NULL,
PRIMARY KEY (day, participant_address)
);
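
A hedged sketch of the read path that could back `GET /participants/scheduled-rewards`; the actual query in the stats service is not part of the diffs loaded on this page.

// Illustrative only; column usage follows the table definition above.
const { rows } = await pgPools.stats.query(`
  SELECT day::TEXT, scheduled_rewards
  FROM daily_scheduled_rewards
  WHERE participant_address = $1
    AND day >= $2
    AND day <= $3
  ORDER BY day
`, [address, from, to])
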
8 changes: 4 additions & 4 deletions migrations/package.json → db/package.json
@@ -1,15 +1,15 @@
{
"name": "@filecoin-station/spark-stats-db-migrations",
"name": "@filecoin-station/spark-stats-db",
"version": "1.0.0",
"type": "module",
"main": "index.js",
"private": true,
"scripts": {
"lint": "standard",
"test": "mocha"
"lint": "standard"
},
"devDependencies": {
"standard": "^17.1.0"
"standard": "^17.1.0",
"spark-evaluate": "filecoin-station/spark-evaluate#main"
},
"dependencies": {
"pg": "^8.12.0",
6 changes: 6 additions & 0 deletions db/typings.d.ts
@@ -0,0 +1,6 @@
import type { Pool } from 'pg'

export interface pgPools {
stats: Pool;
evaluate: Pool;
}
43 changes: 0 additions & 43 deletions migrations/index.js

This file was deleted.

16 changes: 9 additions & 7 deletions observer/bin/dry-run.js
@@ -2,20 +2,22 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { observeTransferEvents } from '../lib/observer.js'
import { getPgPool } from '../lib/db.js'
import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'

/** @type {pg.Pool} */
const pgPool = await getPgPool()
const pgPools = await getPgPools()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

await pgPool.query('DELETE FROM daily_reward_transfers')
await pgPools.stats.query('DELETE FROM daily_reward_transfers')

await observeTransferEvents(pgPool, ieContract, provider)
await Promise.all([
observeTransferEvents(pgPools.stats, ieContract, provider),
observeScheduledRewards(pgPools, ieContract)
])

await pgPool.end()
await pgPools.end()
8 changes: 4 additions & 4 deletions observer/bin/migrate.js
@@ -1,5 +1,5 @@
import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgConfig as migrateStatsDB } from '@filecoin-station/spark-stats-db-migrations'
import { migrateStatsDB, migrateEvaluateDB, getPgPools } from '@filecoin-station/spark-stats-db'

console.log('Migrating spark_stats database')
await migrateStatsDB({ connectionString: DATABASE_URL })
const pgPools = await getPgPools()
await migrateStatsDB(pgPools.stats)
await migrateEvaluateDB(pgPools.evaluate)
54 changes: 42 additions & 12 deletions observer/bin/spark-observer.js
@@ -1,27 +1,57 @@
import '../lib/instrument.js'
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'

import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js'
import { getPgPool } from '../lib/db.js'
import { observeTransferEvents } from '../lib/observer.js'
import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import {
observeTransferEvents,
observeScheduledRewards
} from '../lib/observer.js'

const pgPool = await getPgPool()
const pgPools = await getPgPools()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

// Listen for Transfer events from the IE contract
while (true) {
try {
await observeTransferEvents(pgPool, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
const ONE_HOUR = 60 * 60 * 1000

const loopObserveTransferEvents = async () => {
while (true) {
const start = new Date()
try {
await observeTransferEvents(pgPools, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = new Date() - start
console.log(`Observing Transfer events took ${dt}ms`)
await timers.setTimeout(ONE_HOUR - dt)
}
await timers.setTimeout(OBSERVATION_INTERVAL_MS)
}

const loopObserveScheduledRewards = async () => {
while (true) {
const start = new Date()
try {
await observeScheduledRewards(pgPools, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
const dt = new Date() - start
console.log(`Observing scheduled rewards took ${dt}ms`)
await timers.setTimeout((24 * ONE_HOUR) - dt)
}
}

await Promise.all([
loopObserveTransferEvents(),
loopObserveScheduledRewards()
])
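
observer/lib/observer.js is not among the diffs loaded on this page. Going by the commit messages ("get participants from db", "read from last 3 days of participants") and the daily_scheduled_rewards table, a rough sketch of observeScheduledRewards might look like the following; the participants query against spark_evaluate and the rewardsScheduledFor contract call are assumptions, not confirmed by this excerpt.

export const observeScheduledRewards = async (pgPools, ieContract) => {
  // Assumption: spark_evaluate records which addresses participated on which day.
  const { rows } = await pgPools.evaluate.query(`
    SELECT DISTINCT participant_address
    FROM recent_participants -- table name assumed
    WHERE day >= now()::DATE - 3
  `)
  for (const { participant_address: address } of rows) {
    // Assumption: the IE contract exposes a view returning scheduled rewards per address.
    const scheduledRewards = await ieContract.rewardsScheduledFor(address)
    // ethers returns a BigInt; store it as text for the NUMERIC column.
    await pgPools.stats.query(`
      INSERT INTO daily_scheduled_rewards
        (day, participant_address, scheduled_rewards)
      VALUES (now()::DATE, $1, $2)
      ON CONFLICT (day, participant_address) DO UPDATE
        SET scheduled_rewards = EXCLUDED.scheduled_rewards
    `, [address, scheduledRewards.toString()])
  }
}
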
10 changes: 2 additions & 8 deletions observer/lib/config.js
@@ -3,11 +3,7 @@ const {
// supports rpc failover
// RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
RPC_URLS = 'https://api.node.glif.io/rpc/v0',
GLIF_TOKEN,
// DATABASE_URL points to `spark_stats` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_stats',
// Sleep one hour between observations
OBSERVATION_INTERVAL_MS = 1000 * 60 * 60
GLIF_TOKEN
} = process.env

const rpcUrls = RPC_URLS.split(',')
@@ -21,7 +17,5 @@ if (RPC_URL.includes('glif')) {

export {
RPC_URL,
DATABASE_URL,
rpcHeaders,
OBSERVATION_INTERVAL_MS
rpcHeaders
}