diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 922c89a..f94eea9 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -54,13 +54,16 @@ jobs:
--health-retries 5
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
+ EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
NPM_CONFIG_WORKSPACE: observer
steps:
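+      # The Postgres service container only creates spark_stats; spark_evaluate must be created by hand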
+ - run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: 20
- run: npm ci
+ - run: npm run migrate
- run: npm test
lint-all:
@@ -72,6 +75,38 @@ jobs:
node-version: 20
- run: npm ci
- run: npm run lint
+
+ dry-run:
+ runs-on: ubuntu-latest
+ services:
+ postgres:
+ image: postgres:latest
+ env:
+ POSTGRES_DB: spark_stats
+ POSTGRES_PASSWORD: postgres
+ POSTGRES_USER: postgres
+ ports:
+ - 5432:5432
+ options: >-
+ --health-cmd pg_isready
+ --health-interval 10s
+ --health-timeout 5s
+ --health-retries 5
+ env:
+ DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
+ EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
+ NPM_CONFIG_WORKSPACE: observer
+ steps:
+ - run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
+ - uses: actions/checkout@v4
+ - uses: actions/setup-node@v4
+ with:
+ node-version: 20
+ - run: npm ci
+ - run: npm run migrate
+ - run: npm run dry-run
+ env:
+ GLIF_TOKEN: ${{ secrets.GLIF_TOKEN }}
docker-build:
runs-on: ubuntu-latest
diff --git a/Dockerfile b/Dockerfile
index 5ccccb1..0cfd66d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -29,7 +29,7 @@ COPY --link package-lock.json package.json ./
# We cannot use a wildcard until `COPY --parents` is stabilised
# See https://docs.docker.com/reference/dockerfile/#copy---parents
-COPY --link migrations/package.json ./migrations/
+COPY --link db/package.json ./db/
COPY --link stats/package.json ./stats/
COPY --link observer/package.json ./observer/
diff --git a/README.md b/README.md
index 9c56722..46658c2 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,10 @@ Base URL: http://stats.filspark.com/
http://stats.filspark.com/participants/change-rates
+- `GET /participants/scheduled-rewards?address=&from=&to=`
+
+ http://stats.filspark.com/participants/scheduled-rewards
+
- `GET /miners/retrieval-success-rate/summary?from=&to=`
http://stats.filspark.com/miners/retrieval-success-rate/summary
diff --git a/db/index.js b/db/index.js
new file mode 100644
index 0000000..2bac676
--- /dev/null
+++ b/db/index.js
@@ -0,0 +1,99 @@
+import { migrateWithPgClient as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js'
+import pg from 'pg'
+import { dirname, join } from 'node:path'
+import { fileURLToPath } from 'node:url'
+import Postgrator from 'postgrator'
+
+export { migrateEvaluateDB }
+
+const {
+ // DATABASE_URL points to `spark_stats` database managed by this monorepo
+ DATABASE_URL = 'postgres://localhost:5432/spark_stats',
+
+ // EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo.
+ // Eventually, we should move the code updating stats from spark-evaluate to this repo
+ // and then we won't need two connection strings.
+ EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate'
+} = process.env
+
+const migrationsDirectory = join(
+ dirname(fileURLToPath(import.meta.url)),
+ 'migrations'
+)
+
+const poolConfig = {
+ // allow the pool to close all connections and become empty
+ min: 0,
+  // these values should correlate with the service concurrency hard_limit configured in fly.toml
+ // and must take into account the connection limit of our PG server, see
+ // https://fly.io/docs/postgres/managing/configuration-tuning/
+ max: 100,
+ // close connections that haven't been used for one second
+ idleTimeoutMillis: 1000,
+ // automatically close connections older than 60 seconds
+ maxLifetimeSeconds: 60
+}
+
+const onError = err => {
+ // Prevent crashing the process on idle client errors, the pool will recover
+ // itself. If all connections are lost, the process will still crash.
+ // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
+ console.error('An idle client has experienced an error', err.stack)
+}
+
+export const getStatsPgPool = async () => {
+ const stats = new pg.Pool({
+ ...poolConfig,
+ connectionString: DATABASE_URL
+ })
+ stats.on('error', onError)
+ await migrateStatsDB(stats)
+ return stats
+}
+
+export const getEvaluatePgPool = async () => {
+ const evaluate = new pg.Pool({
+ ...poolConfig,
+ connectionString: EVALUATE_DB_URL
+ })
+ evaluate.on('error', onError)
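+  // Check that we can talk to the database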
+ await evaluate.query('SELECT 1')
+ return evaluate
+}
+
+/**
+ * @returns {Promise<{stats: import('pg').Pool, evaluate: import('pg').Pool, end: () => Promise<void>}>}
+ */
+export const getPgPools = async () => {
+ const stats = await getStatsPgPool()
+ const evaluate = await getEvaluatePgPool()
+ const end = () => Promise.all([stats.end(), evaluate.end()])
+
+ return { stats, evaluate, end }
+}
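+
+// Usage sketch (mirrors how stats/bin/spark-stats.js and observer/bin/dry-run.js consume the pools):
+//   const pgPools = await getPgPools()
+//   await pgPools.stats.query('SELECT 1')
+//   await pgPools.end()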
+
+/**
+ * @param {pg.Client} client
+ */
+export const migrateStatsDB = async (client) => {
+ const postgrator = new Postgrator({
+ migrationPattern: join(migrationsDirectory, '*'),
+ driver: 'pg',
+ execQuery: (query) => client.query(query)
+ })
+ console.log(
+ 'Migrating `spark-stats` DB schema from version %s to version %s',
+ await postgrator.getDatabaseVersion(),
+ await postgrator.getMaxVersion()
+ )
+
+ await postgrator.migrate()
+
+ console.log('Migrated `spark-stats` DB schema to version', await postgrator.getDatabaseVersion())
+}
diff --git a/migrations/001.do.sql b/db/migrations/001.do.sql
similarity index 100%
rename from migrations/001.do.sql
rename to db/migrations/001.do.sql
diff --git a/migrations/002.do.daily-reward-transfers.sql b/db/migrations/002.do.daily-reward-transfers.sql
similarity index 100%
rename from migrations/002.do.daily-reward-transfers.sql
rename to db/migrations/002.do.daily-reward-transfers.sql
diff --git a/db/migrations/003.do.scheduled-rewards.sql b/db/migrations/003.do.scheduled-rewards.sql
new file mode 100644
index 0000000..b30d488
--- /dev/null
+++ b/db/migrations/003.do.scheduled-rewards.sql
@@ -0,0 +1,7 @@
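+-- One row per participant per day; the observer upserts today's value on each run.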
+CREATE TABLE daily_scheduled_rewards (
+ day DATE NOT NULL,
+ participant_address TEXT NOT NULL,
+ scheduled_rewards NUMERIC NOT NULL,
+ PRIMARY KEY (day, participant_address)
+);
diff --git a/migrations/package.json b/db/package.json
similarity index 62%
rename from migrations/package.json
rename to db/package.json
index eb0149b..a88b67e 100644
--- a/migrations/package.json
+++ b/db/package.json
@@ -1,15 +1,15 @@
{
- "name": "@filecoin-station/spark-stats-db-migrations",
+ "name": "@filecoin-station/spark-stats-db",
"version": "1.0.0",
"type": "module",
"main": "index.js",
"private": true,
"scripts": {
- "lint": "standard",
- "test": "mocha"
+ "lint": "standard"
},
"devDependencies": {
- "standard": "^17.1.0"
+ "standard": "^17.1.0",
+ "spark-evaluate": "filecoin-station/spark-evaluate#main"
},
"dependencies": {
"pg": "^8.12.0",
diff --git a/db/typings.d.ts b/db/typings.d.ts
new file mode 100644
index 0000000..56419bd
--- /dev/null
+++ b/db/typings.d.ts
@@ -0,0 +1,7 @@
+import type { Pool } from 'pg'
+
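+/** Connection pools to the spark_stats and spark_evaluate databases. */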
+export interface pgPools {
+ stats: Pool;
+ evaluate: Pool;
+}
diff --git a/migrations/index.js b/migrations/index.js
deleted file mode 100644
index e6e417b..0000000
--- a/migrations/index.js
+++ /dev/null
@@ -1,43 +0,0 @@
-import { dirname, join } from 'node:path'
-import { fileURLToPath } from 'node:url'
-import pg from 'pg'
-import Postgrator from 'postgrator'
-
-const migrationsDirectory = join(
- dirname(fileURLToPath(import.meta.url)),
- '..',
- 'migrations'
-)
-
-/**
- *@param {pg.ClientConfig} pgConfig
- */
-export const migrateWithPgConfig = async (pgConfig) => {
- const client = new pg.Client(pgConfig)
- await client.connect()
- try {
- await migrateWithPgClient(client)
- } finally {
- await client.end()
- }
-}
-
-/**
- * @param {pg.Client} client
- */
-export const migrateWithPgClient = async (client) => {
- const postgrator = new Postgrator({
- migrationPattern: join(migrationsDirectory, '*'),
- driver: 'pg',
- execQuery: (query) => client.query(query)
- })
- console.log(
- 'Migrating DB schema from version %s to version %s',
- await postgrator.getDatabaseVersion(),
- await postgrator.getMaxVersion()
- )
-
- await postgrator.migrate()
-
- console.log('Migrated DB schema to version', await postgrator.getDatabaseVersion())
-}
diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js
index 2617603..a1a15fc 100644
--- a/observer/bin/dry-run.js
+++ b/observer/bin/dry-run.js
@@ -2,11 +2,10 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'
import { RPC_URL, rpcHeaders } from '../lib/config.js'
-import { observeTransferEvents } from '../lib/observer.js'
-import { getPgPool } from '../lib/db.js'
+import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js'
+import { getPgPools } from '@filecoin-station/spark-stats-db'
-/** @type {pg.Pool} */
-const pgPool = await getPgPool()
+const pgPools = await getPgPools()
const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
@@ -14,8 +13,11 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true
const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)
-await pgPool.query('DELETE FROM daily_reward_transfers')
+await pgPools.stats.query('DELETE FROM daily_reward_transfers')
-await observeTransferEvents(pgPool, ieContract, provider)
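+// Run one observation pass and exit; the hourly/daily loops live in bin/spark-observer.js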
+await Promise.all([
+ observeTransferEvents(pgPools.stats, ieContract, provider),
+ observeScheduledRewards(pgPools, ieContract)
+])
-await pgPool.end()
+await pgPools.end()
diff --git a/observer/bin/migrate.js b/observer/bin/migrate.js
index 13654cb..71a14b1 100644
--- a/observer/bin/migrate.js
+++ b/observer/bin/migrate.js
@@ -1,5 +1,5 @@
-import { DATABASE_URL } from '../lib/config.js'
-import { migrateWithPgConfig as migrateStatsDB } from '@filecoin-station/spark-stats-db-migrations'
+import { migrateStatsDB, migrateEvaluateDB, getPgPools } from '@filecoin-station/spark-stats-db'
-console.log('Migrating spark_stats database')
-await migrateStatsDB({ connectionString: DATABASE_URL })
+const pgPools = await getPgPools()
+await migrateStatsDB(pgPools.stats)
+await migrateEvaluateDB(pgPools.evaluate)
diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js
index 66beec1..e4addb3 100644
--- a/observer/bin/spark-observer.js
+++ b/observer/bin/spark-observer.js
@@ -1,13 +1,17 @@
+import '../lib/instrument.js'
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'
-import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js'
-import { getPgPool } from '../lib/db.js'
-import { observeTransferEvents } from '../lib/observer.js'
+import { RPC_URL, rpcHeaders } from '../lib/config.js'
+import { getPgPools } from '@filecoin-station/spark-stats-db'
+import {
+ observeTransferEvents,
+ observeScheduledRewards
+} from '../lib/observer.js'
-const pgPool = await getPgPool()
+const pgPools = await getPgPools()
const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
@@ -15,13 +19,39 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true
const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)
-// Listen for Transfer events from the IE contract
-while (true) {
- try {
- await observeTransferEvents(pgPool, ieContract, provider)
- } catch (e) {
- console.error(e)
- Sentry.captureException(e)
+const ONE_HOUR = 60 * 60 * 1000
+
+const loopObserveTransferEvents = async () => {
+ while (true) {
+ const start = new Date()
+ try {
+      await observeTransferEvents(pgPools.stats, ieContract, provider)
+ } catch (e) {
+ console.error(e)
+ Sentry.captureException(e)
+ }
+ const dt = new Date() - start
+ console.log(`Observing Transfer events took ${dt}ms`)
+ await timers.setTimeout(ONE_HOUR - dt)
}
- await timers.setTimeout(OBSERVATION_INTERVAL_MS)
}
+
+const loopObserveScheduledRewards = async () => {
+ while (true) {
+ const start = new Date()
+ try {
+      await observeScheduledRewards(pgPools, ieContract)
+ } catch (e) {
+ console.error(e)
+ Sentry.captureException(e)
+ }
+ const dt = new Date() - start
+ console.log(`Observing scheduled rewards took ${dt}ms`)
+ await timers.setTimeout((24 * ONE_HOUR) - dt)
+ }
+}
+
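+// Both loops run forever; an unexpected rejection from either one crashes the process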
+await Promise.all([
+ loopObserveTransferEvents(),
+ loopObserveScheduledRewards()
+])
diff --git a/observer/lib/config.js b/observer/lib/config.js
index 4bfb0f0..a6340b5 100644
--- a/observer/lib/config.js
+++ b/observer/lib/config.js
@@ -3,11 +3,7 @@ const {
// supports rpc failover
// RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
RPC_URLS = 'https://api.node.glif.io/rpc/v0',
- GLIF_TOKEN,
- // DATABASE_URL points to `spark_stats` database managed by this monorepo
- DATABASE_URL = 'postgres://localhost:5432/spark_stats',
- // Sleep one hour between observations
- OBSERVATION_INTERVAL_MS = 1000 * 60 * 60
+ GLIF_TOKEN
} = process.env
const rpcUrls = RPC_URLS.split(',')
@@ -21,7 +17,5 @@ if (RPC_URL.includes('glif')) {
export {
RPC_URL,
- DATABASE_URL,
- rpcHeaders,
- OBSERVATION_INTERVAL_MS
+ rpcHeaders
}
diff --git a/observer/lib/db.js b/observer/lib/db.js
deleted file mode 100644
index 1a74dd1..0000000
--- a/observer/lib/db.js
+++ /dev/null
@@ -1,34 +0,0 @@
-import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
-import pg from 'pg'
-
-import { DATABASE_URL } from '../lib/config.js'
-
-export const getPgPool = async () => {
- const pgPool = new pg.Pool({
- connectionString: DATABASE_URL,
- // allow the pool to close all connections and become empty
- min: 0,
- // this values should correlate with service concurrency hard_limit configured in fly.toml
- // and must take into account the connection limit of our PG server, see
- // https://fly.io/docs/postgres/managing/configuration-tuning/
- max: 100,
- // close connections that haven't been used for one second
- idleTimeoutMillis: 1000,
- // automatically close connections older than 60 seconds
- maxLifetimeSeconds: 60
- })
-
- pgPool.on('error', err => {
- // Prevent crashing the process on idle client errors, the pool will recover
- // itself. If all connections are lost, the process will still crash.
- // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
- console.error('An idle client has experienced an error', err.stack)
- })
-
- await migrateWithPgClient(pgPool)
-
- // Check that we can talk to the database
- await pgPool.query('SELECT 1')
-
- return pgPool
-}
diff --git a/observer/lib/instrument.js b/observer/lib/instrument.js
new file mode 100644
index 0000000..e804d6f
--- /dev/null
+++ b/observer/lib/instrument.js
@@ -0,0 +1,24 @@
+import * as Sentry from '@sentry/node'
+import fs from 'node:fs/promises'
+import { join, dirname } from 'node:path'
+import { fileURLToPath } from 'node:url'
+
+const { SENTRY_ENVIRONMENT = 'development' } = process.env
+
+const pkg = JSON.parse(
+ await fs.readFile(
+ join(
+ dirname(fileURLToPath(import.meta.url)),
+ '..',
+ 'package.json'
+ ),
+ 'utf8'
+ )
+)
+
+Sentry.init({
+ dsn: 'https://47b65848a6171ecd8bf9f5395a782b3f@o1408530.ingest.sentry.io/4506576125427712',
+ release: pkg.version,
+ environment: SENTRY_ENVIRONMENT,
+ tracesSampleRate: 0.1
+})
diff --git a/observer/lib/observer.js b/observer/lib/observer.js
index 7ff9e62..26bbf69 100644
--- a/observer/lib/observer.js
+++ b/observer/lib/observer.js
@@ -1,13 +1,14 @@
import { updateDailyTransferStats } from './platform-stats.js'
+import * as Sentry from '@sentry/node'
/**
* Observe the transfer events on the Filecoin blockchain
- * @param {import('pg').Pool} pgPool
+ * @param {import('pg').Pool} pgPoolStats
* @param {import('ethers').Contract} ieContract
* @param {import('ethers').Provider} provider
*/
-export const observeTransferEvents = async (pgPool, ieContract, provider) => {
- const { rows } = await pgPool.query(
+export const observeTransferEvents = async (pgPoolStats, ieContract, provider) => {
+ const { rows } = await pgPoolStats.query(
'SELECT MAX(last_checked_block) FROM daily_reward_transfers'
)
let queryFromBlock = rows[0].last_checked_block
@@ -28,6 +29,43 @@ export const observeTransferEvents = async (pgPool, ieContract, provider) => {
amount: event.args.amount
}
console.log('Transfer event:', transferEvent)
- await updateDailyTransferStats(pgPool, transferEvent, currentBlockNumber)
+ await updateDailyTransferStats(pgPoolStats, transferEvent, currentBlockNumber)
+ }
+}
+
+/**
+ * Observe scheduled rewards on the Filecoin blockchain
+ * @param {import('@filecoin-station/spark-stats-db').pgPools} pgPools
+ * @param {import('ethers').Contract} ieContract
+ */
+export const observeScheduledRewards = async (pgPools, ieContract) => {
+ console.log('Querying scheduled rewards from impact evaluator')
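+  // Refresh only participants active in the last three days (see the WHERE clause below)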
+ const { rows } = await pgPools.evaluate.query(`
+ SELECT participant_address
+ FROM participants p
+ JOIN daily_participants d ON p.id = d.participant_id
+ WHERE d.day >= now() - interval '3 days'
+ `)
+ for (const { participant_address: address } of rows) {
+ let scheduledRewards
+ try {
+ scheduledRewards = await ieContract.rewardsScheduledFor(address)
+ } catch (err) {
+ Sentry.captureException(err)
+ console.error(
+ 'Error querying scheduled rewards for',
+ address,
+ { cause: err }
+ )
+ continue
+ }
+ console.log('Scheduled rewards for', address, scheduledRewards)
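+    // Upsert keyed on (day, participant_address): the latest observation for today wins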
+ await pgPools.stats.query(`
+ INSERT INTO daily_scheduled_rewards
+ (day, participant_address, scheduled_rewards)
+ VALUES (now(), $1, $2)
+ ON CONFLICT (day, participant_address) DO UPDATE SET
+ scheduled_rewards = EXCLUDED.scheduled_rewards
+ `, [address, scheduledRewards])
}
}
diff --git a/observer/package.json b/observer/package.json
index 0fc4b7d..df9d5d9 100644
--- a/observer/package.json
+++ b/observer/package.json
@@ -3,6 +3,7 @@
"type": "module",
"private": true,
"scripts": {
+ "dry-run": "node bin/dry-run.js",
"migrate": "node bin/migrate.js",
"start": "node bin/spark-observer.js",
"lint": "standard",
@@ -10,11 +11,12 @@
},
"devDependencies": {
"mocha": "^10.4.0",
+ "spark-evaluate": "github:filecoin-station/spark-evaluate#main",
"standard": "^17.1.0"
},
"dependencies": {
"@filecoin-station/spark-impact-evaluator": "^1.1.1",
- "@filecoin-station/spark-stats-db-migrations": "^1.0.0",
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"@sentry/node": "^8.9.1",
"debug": "^4.3.5",
"ethers": "^6.13.0",
diff --git a/observer/test/observer.test.js b/observer/test/observer.test.js
new file mode 100644
index 0000000..80c6c7e
--- /dev/null
+++ b/observer/test/observer.test.js
@@ -0,0 +1,60 @@
+import assert from 'node:assert'
+import { observeScheduledRewards } from '../lib/observer.js'
+import { getPgPools } from '@filecoin-station/spark-stats-db'
+import { givenDailyParticipants } from 'spark-evaluate/test/helpers/queries.js'
+
+const getDayAsISOString = d => d.toISOString().split('T')[0]
+const today = () => getDayAsISOString(new Date())
+
+describe('observer', () => {
+ describe('observeScheduledRewards', () => {
+ let pgPools
+
+ before(async () => {
+ pgPools = await getPgPools()
+ })
+
+ beforeEach(async () => {
+ await pgPools.evaluate.query('DELETE FROM daily_participants')
+ await pgPools.evaluate.query('DELETE FROM participants')
+ await givenDailyParticipants(pgPools.evaluate, today(), ['0xCURRENT'])
+ await givenDailyParticipants(pgPools.evaluate, '2000-01-01', ['0xOLD'])
+ })
+
+ it('observes scheduled rewards', async () => {
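+      // Minimal stand-in for the IE contract: the observer only calls rewardsScheduledFor()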
+ const ieContract = {
+ rewardsScheduledFor: async (address) => {
+ if (address === '0xCURRENT') {
+ return 100n
+ } else {
+ throw new Error('Should never be called')
+ }
+ }
+ }
+ await observeScheduledRewards(pgPools, ieContract)
+ const { rows } = await pgPools.stats.query(`
+ SELECT participant_address, scheduled_rewards
+ FROM daily_scheduled_rewards
+ `)
+ assert.deepStrictEqual(rows, [{
+ participant_address: '0xCURRENT',
+ scheduled_rewards: '100'
+ }])
+ })
+ it('updates scheduled rewards', async () => {
+ const ieContract = {
+ rewardsScheduledFor: async () => 200n
+ }
+ await observeScheduledRewards(pgPools, ieContract)
+ const { rows } = await pgPools.stats.query(`
+ SELECT participant_address, scheduled_rewards
+ FROM daily_scheduled_rewards
+ `)
+ assert.deepStrictEqual(rows, [{
+ participant_address: '0xCURRENT',
+ scheduled_rewards: '200'
+ }])
+ })
+ })
+})
diff --git a/observer/test/platform-stats.test.js b/observer/test/platform-stats.test.js
index 2809be3..074e4ef 100644
--- a/observer/test/platform-stats.test.js
+++ b/observer/test/platform-stats.test.js
@@ -1,24 +1,17 @@
import assert from 'node:assert'
-import pg from 'pg'
import { beforeEach, describe, it } from 'mocha'
-import { DATABASE_URL } from '../lib/config.js'
+import { getStatsPgPool, migrateStatsDB } from '@filecoin-station/spark-stats-db'
import { updateDailyTransferStats } from '../lib/platform-stats.js'
-import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
-
-const createPgClient = async () => {
- const pgClient = new pg.Client({ connectionString: DATABASE_URL })
- await pgClient.connect()
- return pgClient
-}
describe('platform-stats-generator', () => {
-  /** @type {pg.Client} */
+  /** @type {import('pg').PoolClient} */
let pgClient
before(async () => {
- pgClient = await createPgClient()
- await migrateWithPgClient(pgClient)
+ const pgPool = await getStatsPgPool()
+ pgClient = await pgPool.connect()
+ await migrateStatsDB(pgClient)
})
let today
@@ -37,7 +30,7 @@ describe('platform-stats-generator', () => {
})
after(async () => {
- await pgClient.end()
+    pgClient.release()
})
describe('updateDailyTransferStats', () => {
diff --git a/package-lock.json b/package-lock.json
index 12882d8..34fb6d6 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -6,7 +6,7 @@
"": {
"name": "@filecoin-station/spark-stats-monorepo",
"workspaces": [
- "migrations",
+ "db",
"observer",
"stats"
],
@@ -14,9 +14,22 @@
"standard": "^17.1.0"
}
},
+ "db": {
+ "name": "@filecoin-station/spark-stats-db",
+ "version": "1.0.0",
+ "dependencies": {
+ "pg": "^8.12.0",
+ "postgrator": "^7.2.0"
+ },
+ "devDependencies": {
+ "spark-evaluate": "filecoin-station/spark-evaluate#main",
+ "standard": "^17.1.0"
+ }
+ },
"migrations": {
"name": "@filecoin-station/spark-stats-db-migrations",
"version": "1.0.0",
+ "extraneous": true,
"dependencies": {
"pg": "^8.12.0",
"postgrator": "^7.2.0"
@@ -99,8 +112,8 @@
"resolved": "stats",
"link": true
},
- "node_modules/@filecoin-station/spark-stats-db-migrations": {
- "resolved": "migrations",
+ "node_modules/@filecoin-station/spark-stats-db": {
+ "resolved": "db",
"link": true
},
"node_modules/@glif/filecoin-address": {
@@ -196,10 +209,11 @@
"dev": true
},
"node_modules/@ipld/car": {
- "version": "5.3.0",
- "resolved": "https://registry.npmjs.org/@ipld/car/-/car-5.3.0.tgz",
- "integrity": "sha512-OB8LVvJeVAFFGluNIkZeDZ/aGeoekFKsuIvNT9I5sJIb5WekQuW5+lekjQ7Z7mZ7DBKuke/kI4jBT1j0/akU1w==",
+ "version": "5.3.1",
+ "resolved": "https://registry.npmjs.org/@ipld/car/-/car-5.3.1.tgz",
+ "integrity": "sha512-8fNkYAZvL9yX2zesF32k7tYqUDGG41felmmBnwjCZJto06QXCb0NOMPJc/mhNgnVa5gkKqxPO1ZdSoHuaYcVSw==",
"dev": true,
+ "license": "Apache-2.0 OR MIT",
"dependencies": {
"@ipld/dag-cbor": "^9.0.7",
"cborg": "^4.0.5",
@@ -5388,23 +5402,24 @@
}
},
"node_modules/spark-evaluate": {
- "resolved": "git+ssh://git@github.com/filecoin-station/spark-evaluate.git#6d3d547e00ceed6fdc90d59578a3d3fdfa6fdc78",
+ "resolved": "git+ssh://git@github.com/filecoin-station/spark-evaluate.git#534454c9b2323b67d50b159a3504de8a06eedcb2",
+ "integrity": "sha512-KFIA5F8Lw/+I+2MzPKT6qsgtt5LNSqdbuk9/LUTUyduGSUpQj++TCI+PlvSs2mTRQz7dZCQcXhhY5cchj7n1vA==",
"dev": true,
"dependencies": {
- "@filecoin-station/spark-impact-evaluator": "^1.1.0",
+ "@filecoin-station/spark-impact-evaluator": "^1.1.1",
"@glif/filecoin-address": "^3.0.5",
"@influxdata/influxdb-client": "^1.33.2",
- "@ipld/car": "^5.3.0",
- "@sentry/node": "^8.7.0",
+ "@ipld/car": "^5.3.1",
+ "@sentry/node": "^8.9.1",
"@web3-storage/car-block-validator": "^1.2.0",
- "debug": "^4.3.4",
+ "debug": "^4.3.5",
"ethers": "^6.10.0",
"ipfs-car": "^1.2.0",
"ipfs-unixfs-exporter": "^13.5.0",
"just-percentile": "^4.2.0",
"p-map": "^7.0.2",
"p-retry": "^6.2.0",
- "pg": "^8.11.5",
+ "pg": "^8.12.0",
"postgrator": "^7.2.0"
}
},
@@ -6127,7 +6142,7 @@
"name": "@filecoin-station/spark-observer",
"dependencies": {
"@filecoin-station/spark-impact-evaluator": "^1.1.1",
- "@filecoin-station/spark-stats-db-migrations": "^1.0.0",
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"@sentry/node": "^8.9.1",
"debug": "^4.3.5",
"ethers": "^6.13.0",
@@ -6135,12 +6150,14 @@
},
"devDependencies": {
"mocha": "^10.4.0",
+ "spark-evaluate": "github:filecoin-station/spark-evaluate#534454c9b2323b67d50b159a3504de8a06eedcb2",
"standard": "^17.1.0"
}
},
"stats": {
"name": "@filecoin-station/spark-stats",
"dependencies": {
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"@sentry/node": "^8.9.1",
"debug": "^4.3.5",
"http-assert": "^1.5.0",
@@ -6148,7 +6165,7 @@
"pg": "^8.12.0"
},
"devDependencies": {
- "@filecoin-station/spark-stats-db-migrations": "^1.0.0",
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"mocha": "^10.4.0",
"spark-evaluate": "filecoin-station/spark-evaluate#main",
"standard": "^17.1.0"
diff --git a/package.json b/package.json
index a7ac74f..dc19535 100644
--- a/package.json
+++ b/package.json
@@ -3,7 +3,7 @@
"private": true,
"type": "module",
"workspaces": [
- "migrations",
+ "db",
"observer",
"stats"
],
diff --git a/stats/bin/migrate.js b/stats/bin/migrate.js
index 29abdff..71a14b1 100644
--- a/stats/bin/migrate.js
+++ b/stats/bin/migrate.js
@@ -1,12 +1,5 @@
-import {
- DATABASE_URL,
- EVALUATE_DB_URL
-} from '../lib/config.js'
-import { migrateWithPgConfig as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js'
-import { migrateWithPgConfig as migrateStatsDB } from '@filecoin-station/spark-stats-db-migrations'
+import { migrateStatsDB, migrateEvaluateDB, getPgPools } from '@filecoin-station/spark-stats-db'
-console.log('Migrating spark_evaluate database', EVALUATE_DB_URL)
-await migrateEvaluateDB({ connectionString: EVALUATE_DB_URL })
-
-console.log('Migrating spark_stats database', DATABASE_URL)
-await migrateStatsDB({ connectionString: DATABASE_URL })
+const pgPools = await getPgPools()
+await migrateStatsDB(pgPools.stats)
+await migrateEvaluateDB(pgPools.evaluate)
diff --git a/stats/bin/spark-stats.js b/stats/bin/spark-stats.js
index f7f9b7e..1401d07 100644
--- a/stats/bin/spark-stats.js
+++ b/stats/bin/spark-stats.js
@@ -1,9 +1,8 @@
import '../lib/instrument.js'
import http from 'node:http'
import { once } from 'node:events'
-import pg from 'pg'
import { createHandler } from '../lib/handler.js'
-import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js'
+import { getPgPools } from '@filecoin-station/spark-stats-db'
const {
PORT = 8080,
@@ -11,54 +10,14 @@ const {
REQUEST_LOGGING = 'true'
} = process.env
-const pgPoolConfig = {
- // allow the pool to close all connections and become empty
- min: 0,
- // this values should correlate with service concurrency hard_limit configured in fly.toml
- // and must take into account the connection limit of our PG server, see
- // https://fly.io/docs/postgres/managing/configuration-tuning/
- max: 100,
- // close connections that haven't been used for one second
- idleTimeoutMillis: 1000,
- // automatically close connections older than 60 seconds
- maxLifetimeSeconds: 60
-}
-const pgPoolErrFn = err => {
- // Prevent crashing the process on idle client errors, the pool will recover
- // itself. If all connections are lost, the process will still crash.
- // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
- console.error('An idle client has experienced an error', err.stack)
-}
-
-// Connect and set up the Evaluate DB
-const pgPoolEvaluateDb = new pg.Pool({
- connectionString: EVALUATE_DB_URL,
- ...pgPoolConfig
-})
-pgPoolEvaluateDb.on('error', pgPoolErrFn)
-// Check that we can talk to the database
-await pgPoolEvaluateDb.query('SELECT 1')
-
-// Connect and set up the Stats DB
-const pgPoolStatsDb = new pg.Pool({
- connectionString: DATABASE_URL,
- ...pgPoolConfig
-})
-pgPoolStatsDb.on('error', pgPoolErrFn)
-// Check that we can talk to the database
-await pgPoolStatsDb.query('SELECT 1')
-
+const pgPools = await getPgPools()
const logger = {
error: console.error,
info: console.info,
request: ['1', 'true'].includes(REQUEST_LOGGING) ? console.info : () => {}
}
-const handler = createHandler({
- pgPoolEvaluateDb,
- pgPoolStatsDb,
- logger
-})
+const handler = createHandler({ pgPools, logger })
const server = http.createServer(handler)
console.log('Starting the http server on host %j port %s', HOST, PORT)
server.listen(PORT, HOST)
diff --git a/stats/lib/config.js b/stats/lib/config.js
deleted file mode 100644
index ea99e8a..0000000
--- a/stats/lib/config.js
+++ /dev/null
@@ -1,9 +0,0 @@
-export const {
- // DATABASE_URL points to `spark_stats` database managed by this monorepo
- DATABASE_URL = 'postgres://localhost:5432/spark_stats',
-
- // EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo.
- // Eventually, we should move the code updating stats from spark-evaluate to this repo
- // and then we won't need two connection strings.
- EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate'
-} = process.env
diff --git a/stats/lib/handler.js b/stats/lib/handler.js
index d8b96a7..3dacd66 100644
--- a/stats/lib/handler.js
+++ b/stats/lib/handler.js
@@ -7,6 +7,7 @@ import {
fetchMinersRSRSummary,
fetchMonthlyParticipants,
fetchParticipantChangeRates,
+ fetchParticipantScheduledRewards,
fetchRetrievalSuccessRate
} from './stats-fetchers.js'
@@ -14,20 +15,19 @@ import { handlePlatformRoutes } from './platform-routes.js'
/**
* @param {object} args
- * @param {import('pg').Pool} args.pgPoolEvaluateDb
- * @param {import('pg').Pool} args.pgPoolStatsDb
+ * @param {import('@filecoin-station/spark-stats-db').pgPools} args.pgPools
* @param {import('./typings').Logger} args.logger
* @returns
*/
export const createHandler = ({
- pgPoolEvaluateDb,
- pgPoolStatsDb,
+ pgPools,
logger
}) => {
return (req, res) => {
const start = new Date()
logger.request(`${req.method} ${req.url} ...`)
- handler(req, res, pgPoolEvaluateDb, pgPoolStatsDb)
+ handler(req, res, pgPools)
.catch(err => errorHandler(res, err, logger))
.then(() => {
logger.request(`${req.method} ${req.url} ${res.statusCode} (${new Date() - start}ms)`)
@@ -38,10 +38,9 @@ export const createHandler = ({
/**
* @param {import('node:http').IncomingMessage} req
* @param {import('node:http').ServerResponse} res
- * @param {import('pg').Pool} pgPoolEvaluateDb
- * @param {import('pg').Pool} pgPoolStatsDb
+ * @param {import('@filecoin-station/spark-stats-db').pgPools} pgPools
*/
-const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => {
+const handler = async (req, res, pgPools) => {
// Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want!
const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`)
const segs = pathname.split('/').filter(Boolean)
@@ -51,6 +50,7 @@ const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => {
'participants/daily': fetchDailyParticipants,
'participants/monthly': fetchMonthlyParticipants,
'participants/change-rates': fetchParticipantChangeRates,
+ 'participants/scheduled-rewards': fetchParticipantScheduledRewards,
'miners/retrieval-success-rate/summary': fetchMinersRSRSummary
}
@@ -60,10 +60,10 @@ const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => {
pathname,
searchParams,
res,
- pgPoolEvaluateDb,
+ pgPools.evaluate,
fetchStatsFn
)
- } else if (await handlePlatformRoutes(req, res, pgPoolEvaluateDb, pgPoolStatsDb)) {
+ } else if (await handlePlatformRoutes(req, res, pgPools)) {
// no-op, request was handled by handlePlatformRoute
} else {
notFound(res)
diff --git a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js
index e2fcf43..db7ad2e 100644
--- a/stats/lib/platform-routes.js
+++ b/stats/lib/platform-routes.js
@@ -6,7 +6,7 @@ import {
fetchDailyStationAcceptedMeasurementCount
} from './platform-stats-fetchers.js'
-export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => {
+export const handlePlatformRoutes = async (req, res, pgPools) => {
// Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want!
const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`)
const segs = pathname.split('/').filter(Boolean)
@@ -14,19 +14,19 @@ export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolSta
const routeHandlerInfoMap = {
'stations/daily': {
fetchFunction: fetchDailyStationCount,
- pgPool: pgPoolEvaluateDb
+ pgPool: pgPools.evaluate
},
'stations/monthly': {
fetchFunction: fetchMonthlyStationCount,
- pgPool: pgPoolEvaluateDb
+ pgPool: pgPools.evaluate
},
'measurements/daily': {
fetchFunction: fetchDailyStationAcceptedMeasurementCount,
- pgPool: pgPoolEvaluateDb
+ pgPool: pgPools.evaluate
},
'transfers/daily': {
fetchFunction: fetchDailyRewardTransfers,
- pgPool: pgPoolStatsDb
+ pgPool: pgPools.stats
}
}
diff --git a/stats/lib/stats-fetchers.js b/stats/lib/stats-fetchers.js
index 22eb86b..ee448b5 100644
--- a/stats/lib/stats-fetchers.js
+++ b/stats/lib/stats-fetchers.js
@@ -118,6 +118,23 @@ export const fetchParticipantChangeRates = async (pgPool, filter) => {
* @param {import('pg').Pool} pgPool
* @param {import('./typings').DateRangeFilter} filter
*/
+export const fetchParticipantScheduledRewards = async (pgPool, filter) => {
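+  // filter.address comes from the ?address= query parameter documented in the README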
+ const { rows } = await pgPool.query(`
+ SELECT scheduled_rewards
+ FROM daily_scheduled_rewards
+    WHERE participant_address = $1 AND day >= $2 AND day <= $3
+ `, [
+ filter.address,
+ filter.from,
+ filter.to
+ ])
+ return rows
+}
+
+/**
+ * @param {import('pg').Pool} pgPool
+ * @param {import('./typings').DateRangeFilter} filter
+ */
export const fetchMinersRSRSummary = async (pgPool, filter) => {
const { rows } = await pgPool.query(`
SELECT miner_id, SUM(total) as total, SUM(successful) as successful
diff --git a/stats/package.json b/stats/package.json
index 85d9340..90f3b30 100644
--- a/stats/package.json
+++ b/stats/package.json
@@ -9,12 +9,13 @@
"test": "mocha"
},
"devDependencies": {
- "@filecoin-station/spark-stats-db-migrations": "^1.0.0",
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"mocha": "^10.4.0",
"spark-evaluate": "filecoin-station/spark-evaluate#main",
"standard": "^17.1.0"
},
"dependencies": {
+ "@filecoin-station/spark-stats-db": "^1.0.0",
"@sentry/node": "^8.9.1",
"debug": "^4.3.5",
"http-assert": "^1.5.0",
diff --git a/stats/test/handler.test.js b/stats/test/handler.test.js
index d84a717..5cf363e 100644
--- a/stats/test/handler.test.js
+++ b/stats/test/handler.test.js
@@ -1,19 +1,18 @@
import http from 'node:http'
import { once } from 'node:events'
import assert from 'node:assert'
-import pg from 'pg'
import createDebug from 'debug'
-import { mapParticipantsToIds } from 'spark-evaluate/lib/platform-stats.js'
+import { givenDailyParticipants } from 'spark-evaluate/test/helpers/queries.js'
+import { getEvaluatePgPool } from '@filecoin-station/spark-stats-db'
import { assertResponseStatus } from './test-helpers.js'
import { createHandler } from '../lib/handler.js'
import { today } from '../lib/request-helpers.js'
-import { EVALUATE_DB_URL } from '../lib/config.js'
const debug = createDebug('test')
describe('HTTP request handler', () => {
- /** @type {pg.Pool} */
+ /** @type {import('pg').Pool} */
let pgPool
/** @type {http.Server} */
let server
@@ -22,11 +21,13 @@ describe('HTTP request handler', () => {
before(async () => {
// handler doesn't use Stats DB
- pgPool = new pg.Pool({ connectionString: EVALUATE_DB_URL })
+ pgPool = await getEvaluatePgPool()
const handler = createHandler({
- pgPoolEvaluateDb: pgPool,
- pgPoolStatsDb: undefined,
+ pgPools: {
+ stats: null,
+ evaluate: pgPool
+ },
logger: {
info: debug,
error: console.error,
@@ -356,15 +357,3 @@ const givenRetrievalStats = async (pgPool, { day, minerId, total, successful })
[day, minerId ?? 'f1test', total, successful]
)
}
-
-const givenDailyParticipants = async (pgPool, day, participantAddresses) => {
- const ids = await mapParticipantsToIds(pgPool, new Set(participantAddresses))
- await pgPool.query(`
- INSERT INTO daily_participants (day, participant_id)
- SELECT $1 as day, UNNEST($2::INT[]) AS participant_id
- ON CONFLICT DO NOTHING
- `, [
- day,
- ids
- ])
-}
diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js
index 5cd3610..898bdfc 100644
--- a/stats/test/platform-routes.test.js
+++ b/stats/test/platform-routes.test.js
@@ -1,32 +1,26 @@
import http from 'node:http'
import { once } from 'node:events'
import assert from 'node:assert'
-import pg from 'pg'
import createDebug from 'debug'
+import { getPgPools } from '@filecoin-station/spark-stats-db'
import { assertResponseStatus } from './test-helpers.js'
import { createHandler } from '../lib/handler.js'
-import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js'
const debug = createDebug('test')
describe('Platform Routes HTTP request handler', () => {
- /** @type {pg.Pool} */
- let pgPoolEvaluateDb
- /** @type {pg.Pool} */
- let pgPoolStatsDb
- /** @type {http.Server} */
+ /** @type {import('@filecoin-station/spark-stats-db').pgPools} */
+  let pgPools
+  /** @type {http.Server} */
let server
/** @type {string} */
let baseUrl
before(async () => {
- pgPoolEvaluateDb = new pg.Pool({ connectionString: EVALUATE_DB_URL })
- pgPoolStatsDb = new pg.Pool({ connectionString: DATABASE_URL })
+ pgPools = await getPgPools()
const handler = createHandler({
- pgPoolEvaluateDb,
- pgPoolStatsDb,
+ pgPools,
logger: {
info: debug,
error: console.error,
@@ -43,28 +37,27 @@ describe('Platform Routes HTTP request handler', () => {
after(async () => {
server.closeAllConnections()
server.close()
- await pgPoolEvaluateDb.end()
- await pgPoolStatsDb.end()
+ await pgPools.end()
})
beforeEach(async () => {
- await pgPoolEvaluateDb.query('DELETE FROM daily_stations')
- await pgPoolStatsDb.query('DELETE FROM daily_reward_transfers')
+ await pgPools.evaluate.query('DELETE FROM daily_stations')
+ await pgPools.stats.query('DELETE FROM daily_reward_transfers')
})
describe('GET /stations/daily', () => {
it('returns daily station metrics for the given date range', async () => {
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
@@ -88,25 +81,25 @@ describe('Platform Routes HTTP request handler', () => {
describe('GET /stations/monthly', () => {
it('returns monthly station metrics for the given date range ignoring the day number', async () => {
// before the date range
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2023-12-31', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2023-12-31', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
// in the date range
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-02-13', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-02-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
// after the date range
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-03-01', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-03-01', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
@@ -129,17 +122,17 @@ describe('Platform Routes HTTP request handler', () => {
describe('GET /measurements/daily', () => {
it('returns daily total accepted measurement count for the given date range', async () => {
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [
{ stationId: 'station2', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [
{ stationId: 'station2', acceptedMeasurementCount: 2 },
{ stationId: 'station3', acceptedMeasurementCount: 1 }
])
- await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [
+ await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [
{ stationId: 'station1', acceptedMeasurementCount: 1 }
])
@@ -162,17 +155,17 @@ describe('Platform Routes HTTP request handler', () => {
describe('GET /transfers/daily', () => {
it('returns daily total Rewards sent for the given date range', async () => {
- await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-10', [
+ await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-10', [
{ toAddress: 'to1', amount: 100, lastCheckedBlock: 1 }
])
- await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-11', [
+ await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-11', [
{ toAddress: 'to2', amount: 150, lastCheckedBlock: 1 }
])
- await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-12', [
+ await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-12', [
{ toAddress: 'to2', amount: 300, lastCheckedBlock: 1 },
{ toAddress: 'to3', amount: 250, lastCheckedBlock: 1 }
])
- await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-13', [
+ await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-13', [
{ toAddress: 'to1', amount: 100, lastCheckedBlock: 1 }
])
@@ -194,8 +187,8 @@ describe('Platform Routes HTTP request handler', () => {
})
})
-const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => {
- await pgPoolEvaluateDb.query(`
+const givenDailyStationMetrics = async (pgPoolEvaluate, day, stationStats) => {
+ await pgPoolEvaluate.query(`
INSERT INTO daily_stations (day, station_id, accepted_measurement_count)
SELECT $1 AS day, UNNEST($2::text[]) AS station_id, UNNEST($3::int[]) AS accepted_measurement_count
ON CONFLICT DO NOTHING
@@ -206,8 +199,8 @@ const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) =>
])
}
-const givenDailyRewardTransferMetrics = async (pgPoolStatsDb, day, transferStats) => {
- await pgPoolStatsDb.query(`
+const givenDailyRewardTransferMetrics = async (pgPoolStats, day, transferStats) => {
+ await pgPoolStats.query(`
INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block)
SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount, UNNEST($4::int[]) AS last_checked_block
ON CONFLICT DO NOTHING