diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 922c89a..f94eea9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,13 +54,16 @@ jobs: --health-retries 5 env: DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats + EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate NPM_CONFIG_WORKSPACE: observer steps: + - run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate" - uses: actions/checkout@v4 - uses: actions/setup-node@v4 with: node-version: 20 - run: npm ci + - run: npm run migrate - run: npm test lint-all: @@ -72,6 +75,38 @@ jobs: node-version: 20 - run: npm ci - run: npm run lint + + dry-run: + runs-on: ubuntu-latest + services: + postgres: + image: postgres:latest + env: + POSTGRES_DB: spark_stats + POSTGRES_PASSWORD: postgres + POSTGRES_USER: postgres + ports: + - 5432:5432 + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + env: + DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats + EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate + NPM_CONFIG_WORKSPACE: observer + steps: + - run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate" + - uses: actions/checkout@v4 + - uses: actions/setup-node@v4 + with: + node-version: 20 + - run: npm ci + - run: npm run migrate + - run: npm run dry-run + env: + GLIF_TOKEN: ${{ secrets.GLIF_TOKEN }} docker-build: runs-on: ubuntu-latest diff --git a/Dockerfile b/Dockerfile index 5ccccb1..0cfd66d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,7 @@ COPY --link package-lock.json package.json ./ # We cannot use a wildcard until `COPY --parents` is stabilised # See https://docs.docker.com/reference/dockerfile/#copy---parents -COPY --link migrations/package.json ./migrations/ +COPY --link db/package.json ./db/ COPY --link stats/package.json ./stats/ COPY --link observer/package.json ./observer/ diff --git a/README.md b/README.md index f64ec03..b389c83 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,10 @@ Base URL: http://stats.filspark.com/ http://stats.filspark.com/participants/change-rates +- `GET /participants/scheduled-rewards?address=
&from=&to=` + + http://stats.filspark.com/participants/scheduled-rewards + - `GET /miners/retrieval-success-rate/summary?from=&to=` http://stats.filspark.com/miners/retrieval-success-rate/summary diff --git a/db/index.js b/db/index.js new file mode 100644 index 0000000..2bac676 --- /dev/null +++ b/db/index.js @@ -0,0 +1,93 @@ +import { migrateWithPgClient as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js' +import pg from 'pg' +import { dirname, join } from 'node:path' +import { fileURLToPath } from 'node:url' +import Postgrator from 'postgrator' + +export { migrateEvaluateDB } + +const { + // DATABASE_URL points to `spark_stats` database managed by this monorepo + DATABASE_URL = 'postgres://localhost:5432/spark_stats', + + // EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo. + // Eventually, we should move the code updating stats from spark-evaluate to this repo + // and then we won't need two connection strings. + EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate' +} = process.env + +const migrationsDirectory = join( + dirname(fileURLToPath(import.meta.url)), + 'migrations' +) + +const poolConfig = { + // allow the pool to close all connections and become empty + min: 0, + // this values should correlate with service concurrency hard_limit configured in fly.toml + // and must take into account the connection limit of our PG server, see + // https://fly.io/docs/postgres/managing/configuration-tuning/ + max: 100, + // close connections that haven't been used for one second + idleTimeoutMillis: 1000, + // automatically close connections older than 60 seconds + maxLifetimeSeconds: 60 +} + +const onError = err => { + // Prevent crashing the process on idle client errors, the pool will recover + // itself. If all connections are lost, the process will still crash. + // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 + console.error('An idle client has experienced an error', err.stack) +} + +export const getStatsPgPool = async () => { + const stats = new pg.Pool({ + ...poolConfig, + connectionString: DATABASE_URL + }) + stats.on('error', onError) + await migrateStatsDB(stats) + return stats +} + +export const getEvaluatePgPool = async () => { + const evaluate = new pg.Pool({ + ...poolConfig, + connectionString: EVALUATE_DB_URL + }) + evaluate.on('error', onError) + await evaluate.query('SELECT 1') + return evaluate +} + +/** + * @returns {Promise} + */ +export const getPgPools = async () => { + const stats = await getStatsPgPool() + const evaluate = await getEvaluatePgPool() + const end = () => Promise.all([stats.end(), evaluate.end()]) + + return { stats, evaluate, end } +} + +/** + * @param {pg.Client} client + */ +export const migrateStatsDB = async (client) => { + const postgrator = new Postgrator({ + migrationPattern: join(migrationsDirectory, '*'), + driver: 'pg', + execQuery: (query) => client.query(query) + }) + console.log( + 'Migrating `spark-stats` DB schema from version %s to version %s', + await postgrator.getDatabaseVersion(), + await postgrator.getMaxVersion() + ) + + await postgrator.migrate() + + console.log('Migrated `spark-stats` DB schema to version', await postgrator.getDatabaseVersion()) +} diff --git a/migrations/001.do.sql b/db/migrations/001.do.sql similarity index 100% rename from migrations/001.do.sql rename to db/migrations/001.do.sql diff --git a/migrations/002.do.daily-reward-transfers.sql b/db/migrations/002.do.daily-reward-transfers.sql similarity index 100% rename from migrations/002.do.daily-reward-transfers.sql rename to db/migrations/002.do.daily-reward-transfers.sql diff --git a/db/migrations/003.do.scheduled-rewards.sql b/db/migrations/003.do.scheduled-rewards.sql new file mode 100644 index 0000000..b30d488 --- /dev/null +++ b/db/migrations/003.do.scheduled-rewards.sql @@ -0,0 +1,6 @@ +CREATE TABLE daily_scheduled_rewards ( + day DATE NOT NULL, + participant_address TEXT NOT NULL, + scheduled_rewards NUMERIC NOT NULL, + PRIMARY KEY (day, participant_address) +); diff --git a/migrations/package.json b/db/package.json similarity index 62% rename from migrations/package.json rename to db/package.json index eb0149b..a88b67e 100644 --- a/migrations/package.json +++ b/db/package.json @@ -1,15 +1,15 @@ { - "name": "@filecoin-station/spark-stats-db-migrations", + "name": "@filecoin-station/spark-stats-db", "version": "1.0.0", "type": "module", "main": "index.js", "private": true, "scripts": { - "lint": "standard", - "test": "mocha" + "lint": "standard" }, "devDependencies": { - "standard": "^17.1.0" + "standard": "^17.1.0", + "spark-evaluate": "filecoin-station/spark-evaluate#main" }, "dependencies": { "pg": "^8.12.0", diff --git a/db/typings.d.ts b/db/typings.d.ts new file mode 100644 index 0000000..56419bd --- /dev/null +++ b/db/typings.d.ts @@ -0,0 +1,6 @@ +import type { Pool } from 'pg' + +export interface pgPools { + stats: Pool; + evaluate: Pool; +} diff --git a/migrations/index.js b/migrations/index.js deleted file mode 100644 index e6e417b..0000000 --- a/migrations/index.js +++ /dev/null @@ -1,43 +0,0 @@ -import { dirname, join } from 'node:path' -import { fileURLToPath } from 'node:url' -import pg from 'pg' -import Postgrator from 'postgrator' - -const migrationsDirectory = join( - dirname(fileURLToPath(import.meta.url)), - '..', - 'migrations' -) - -/** - *@param {pg.ClientConfig} pgConfig - */ -export const migrateWithPgConfig = async (pgConfig) => { - const client = new pg.Client(pgConfig) - await client.connect() - try { - await migrateWithPgClient(client) - } finally { - await client.end() - } -} - -/** - * @param {pg.Client} client - */ -export const migrateWithPgClient = async (client) => { - const postgrator = new Postgrator({ - migrationPattern: join(migrationsDirectory, '*'), - driver: 'pg', - execQuery: (query) => client.query(query) - }) - console.log( - 'Migrating DB schema from version %s to version %s', - await postgrator.getDatabaseVersion(), - await postgrator.getMaxVersion() - ) - - await postgrator.migrate() - - console.log('Migrated DB schema to version', await postgrator.getDatabaseVersion()) -} diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js index 2617603..a1a15fc 100644 --- a/observer/bin/dry-run.js +++ b/observer/bin/dry-run.js @@ -2,11 +2,10 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { ethers } from 'ethers' import { RPC_URL, rpcHeaders } from '../lib/config.js' -import { observeTransferEvents } from '../lib/observer.js' -import { getPgPool } from '../lib/db.js' +import { observeTransferEvents, observeScheduledRewards } from '../lib/observer.js' +import { getPgPools } from '@filecoin-station/spark-stats-db' -/** @type {pg.Pool} */ -const pgPool = await getPgPool() +const pgPools = await getPgPools() const fetchRequest = new ethers.FetchRequest(RPC_URL) fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') @@ -14,8 +13,11 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) -await pgPool.query('DELETE FROM daily_reward_transfers') +await pgPools.stats.query('DELETE FROM daily_reward_transfers') -await observeTransferEvents(pgPool, ieContract, provider) +await Promise.all([ + observeTransferEvents(pgPools.stats, ieContract, provider), + observeScheduledRewards(pgPools, ieContract) +]) -await pgPool.end() +await pgPools.end() diff --git a/observer/bin/migrate.js b/observer/bin/migrate.js index 13654cb..71a14b1 100644 --- a/observer/bin/migrate.js +++ b/observer/bin/migrate.js @@ -1,5 +1,5 @@ -import { DATABASE_URL } from '../lib/config.js' -import { migrateWithPgConfig as migrateStatsDB } from '@filecoin-station/spark-stats-db-migrations' +import { migrateStatsDB, migrateEvaluateDB, getPgPools } from '@filecoin-station/spark-stats-db' -console.log('Migrating spark_stats database') -await migrateStatsDB({ connectionString: DATABASE_URL }) +const pgPools = await getPgPools() +await migrateStatsDB(pgPools.stats) +await migrateEvaluateDB(pgPools.evaluate) diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 66beec1..e4addb3 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -1,13 +1,17 @@ +import '../lib/instrument.js' import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { ethers } from 'ethers' import * as Sentry from '@sentry/node' import timers from 'node:timers/promises' -import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js' -import { getPgPool } from '../lib/db.js' -import { observeTransferEvents } from '../lib/observer.js' +import { RPC_URL, rpcHeaders } from '../lib/config.js' +import { getPgPools } from '@filecoin-station/spark-stats-db' +import { + observeTransferEvents, + observeScheduledRewards +} from '../lib/observer.js' -const pgPool = await getPgPool() +const pgPools = await getPgPools() const fetchRequest = new ethers.FetchRequest(RPC_URL) fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') @@ -15,13 +19,39 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) -// Listen for Transfer events from the IE contract -while (true) { - try { - await observeTransferEvents(pgPool, ieContract, provider) - } catch (e) { - console.error(e) - Sentry.captureException(e) +const ONE_HOUR = 60 * 60 * 1000 + +const loopObserveTransferEvents = async () => { + while (true) { + const start = new Date() + try { + await observeTransferEvents(pgPools, ieContract, provider) + } catch (e) { + console.error(e) + Sentry.captureException(e) + } + const dt = new Date() - start + console.log(`Observing Transfer events took ${dt}ms`) + await timers.setTimeout(ONE_HOUR - dt) } - await timers.setTimeout(OBSERVATION_INTERVAL_MS) } + +const loopObserveScheduledRewards = async () => { + while (true) { + const start = new Date() + try { + await observeScheduledRewards(pgPools, ieContract, provider) + } catch (e) { + console.error(e) + Sentry.captureException(e) + } + const dt = new Date() - start + console.log(`Observing scheduled rewards took ${dt}ms`) + await timers.setTimeout((24 * ONE_HOUR) - dt) + } +} + +await Promise.all([ + loopObserveTransferEvents(), + loopObserveScheduledRewards() +]) diff --git a/observer/lib/config.js b/observer/lib/config.js index 4bfb0f0..a6340b5 100644 --- a/observer/lib/config.js +++ b/observer/lib/config.js @@ -3,11 +3,7 @@ const { // supports rpc failover // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1', RPC_URLS = 'https://api.node.glif.io/rpc/v0', - GLIF_TOKEN, - // DATABASE_URL points to `spark_stats` database managed by this monorepo - DATABASE_URL = 'postgres://localhost:5432/spark_stats', - // Sleep one hour between observations - OBSERVATION_INTERVAL_MS = 1000 * 60 * 60 + GLIF_TOKEN } = process.env const rpcUrls = RPC_URLS.split(',') @@ -21,7 +17,5 @@ if (RPC_URL.includes('glif')) { export { RPC_URL, - DATABASE_URL, - rpcHeaders, - OBSERVATION_INTERVAL_MS + rpcHeaders } diff --git a/observer/lib/db.js b/observer/lib/db.js deleted file mode 100644 index 1a74dd1..0000000 --- a/observer/lib/db.js +++ /dev/null @@ -1,34 +0,0 @@ -import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' -import pg from 'pg' - -import { DATABASE_URL } from '../lib/config.js' - -export const getPgPool = async () => { - const pgPool = new pg.Pool({ - connectionString: DATABASE_URL, - // allow the pool to close all connections and become empty - min: 0, - // this values should correlate with service concurrency hard_limit configured in fly.toml - // and must take into account the connection limit of our PG server, see - // https://fly.io/docs/postgres/managing/configuration-tuning/ - max: 100, - // close connections that haven't been used for one second - idleTimeoutMillis: 1000, - // automatically close connections older than 60 seconds - maxLifetimeSeconds: 60 - }) - - pgPool.on('error', err => { - // Prevent crashing the process on idle client errors, the pool will recover - // itself. If all connections are lost, the process will still crash. - // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 - console.error('An idle client has experienced an error', err.stack) - }) - - await migrateWithPgClient(pgPool) - - // Check that we can talk to the database - await pgPool.query('SELECT 1') - - return pgPool -} diff --git a/observer/lib/instrument.js b/observer/lib/instrument.js new file mode 100644 index 0000000..e804d6f --- /dev/null +++ b/observer/lib/instrument.js @@ -0,0 +1,24 @@ +import * as Sentry from '@sentry/node' +import fs from 'node:fs/promises' +import { join, dirname } from 'node:path' +import { fileURLToPath } from 'node:url' + +const { SENTRY_ENVIRONMENT = 'development' } = process.env + +const pkg = JSON.parse( + await fs.readFile( + join( + dirname(fileURLToPath(import.meta.url)), + '..', + 'package.json' + ), + 'utf8' + ) +) + +Sentry.init({ + dsn: 'https://47b65848a6171ecd8bf9f5395a782b3f@o1408530.ingest.sentry.io/4506576125427712', + release: pkg.version, + environment: SENTRY_ENVIRONMENT, + tracesSampleRate: 0.1 +}) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index 7ff9e62..26bbf69 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -1,13 +1,14 @@ import { updateDailyTransferStats } from './platform-stats.js' +import * as Sentry from '@sentry/node' /** * Observe the transfer events on the Filecoin blockchain - * @param {import('pg').Pool} pgPool + * @param {import('pg').Pool} pgPoolStats * @param {import('ethers').Contract} ieContract * @param {import('ethers').Provider} provider */ -export const observeTransferEvents = async (pgPool, ieContract, provider) => { - const { rows } = await pgPool.query( +export const observeTransferEvents = async (pgPoolStats, ieContract, provider) => { + const { rows } = await pgPoolStats.query( 'SELECT MAX(last_checked_block) FROM daily_reward_transfers' ) let queryFromBlock = rows[0].last_checked_block @@ -28,6 +29,43 @@ export const observeTransferEvents = async (pgPool, ieContract, provider) => { amount: event.args.amount } console.log('Transfer event:', transferEvent) - await updateDailyTransferStats(pgPool, transferEvent, currentBlockNumber) + await updateDailyTransferStats(pgPoolStats, transferEvent, currentBlockNumber) + } +} + +/** + * Observe scheduled rewards on the Filecoin blockchain + * @param {import('@filecoin-station/spark-stats-db').pgPools} pgPools + * @param {import('ethers').Contract} ieContract + */ +export const observeScheduledRewards = async (pgPools, ieContract) => { + console.log('Querying scheduled rewards from impact evaluator') + const { rows } = await pgPools.evaluate.query(` + SELECT participant_address + FROM participants p + JOIN daily_participants d ON p.id = d.participant_id + WHERE d.day >= now() - interval '3 days' + `) + for (const { participant_address: address } of rows) { + let scheduledRewards + try { + scheduledRewards = await ieContract.rewardsScheduledFor(address) + } catch (err) { + Sentry.captureException(err) + console.error( + 'Error querying scheduled rewards for', + address, + { cause: err } + ) + continue + } + console.log('Scheduled rewards for', address, scheduledRewards) + await pgPools.stats.query(` + INSERT INTO daily_scheduled_rewards + (day, participant_address, scheduled_rewards) + VALUES (now(), $1, $2) + ON CONFLICT (day, participant_address) DO UPDATE SET + scheduled_rewards = EXCLUDED.scheduled_rewards + `, [address, scheduledRewards]) } } diff --git a/observer/package.json b/observer/package.json index 0fc4b7d..df9d5d9 100644 --- a/observer/package.json +++ b/observer/package.json @@ -3,6 +3,7 @@ "type": "module", "private": true, "scripts": { + "dry-run": "node bin/dry-run.js", "migrate": "node bin/migrate.js", "start": "node bin/spark-observer.js", "lint": "standard", @@ -10,11 +11,12 @@ }, "devDependencies": { "mocha": "^10.4.0", + "spark-evaluate": "github:filecoin-station/spark-evaluate#main", "standard": "^17.1.0" }, "dependencies": { "@filecoin-station/spark-impact-evaluator": "^1.1.1", - "@filecoin-station/spark-stats-db-migrations": "^1.0.0", + "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^8.9.1", "debug": "^4.3.5", "ethers": "^6.13.0", diff --git a/observer/test/observer.test.js b/observer/test/observer.test.js new file mode 100644 index 0000000..80c6c7e --- /dev/null +++ b/observer/test/observer.test.js @@ -0,0 +1,59 @@ +import assert from 'node:assert' +import { observeScheduledRewards } from '../lib/observer.js' +import { getPgPools } from '@filecoin-station/spark-stats-db' +import { givenDailyParticipants } from 'spark-evaluate/test/helpers/queries.js' + +const getDayAsISOString = d => d.toISOString().split('T')[0] +const today = () => getDayAsISOString(new Date()) + +describe('observer', () => { + describe('observeScheduledRewards', () => { + let pgPools + + before(async () => { + pgPools = await getPgPools() + }) + + beforeEach(async () => { + await pgPools.evaluate.query('DELETE FROM daily_participants') + await pgPools.evaluate.query('DELETE FROM participants') + await givenDailyParticipants(pgPools.evaluate, today(), ['0xCURRENT']) + await givenDailyParticipants(pgPools.evaluate, '2000-01-01', ['0xOLD']) + }) + + it('observes scheduled rewards', async () => { + const ieContract = { + rewardsScheduledFor: async (address) => { + if (address === '0xCURRENT') { + return 100n + } else { + throw new Error('Should never be called') + } + } + } + await observeScheduledRewards(pgPools, ieContract) + const { rows } = await pgPools.stats.query(` + SELECT participant_address, scheduled_rewards + FROM daily_scheduled_rewards + `) + assert.deepStrictEqual(rows, [{ + participant_address: '0xCURRENT', + scheduled_rewards: '100' + }]) + }) + it('updates scheduled rewards', async () => { + const ieContract = { + rewardsScheduledFor: async () => 200n + } + await observeScheduledRewards(pgPools, ieContract) + const { rows } = await pgPools.stats.query(` + SELECT participant_address, scheduled_rewards + FROM daily_scheduled_rewards + `) + assert.deepStrictEqual(rows, [{ + participant_address: '0xCURRENT', + scheduled_rewards: '200' + }]) + }) + }) +}) diff --git a/observer/test/platform-stats.test.js b/observer/test/platform-stats.test.js index 2809be3..074e4ef 100644 --- a/observer/test/platform-stats.test.js +++ b/observer/test/platform-stats.test.js @@ -1,24 +1,17 @@ import assert from 'node:assert' -import pg from 'pg' import { beforeEach, describe, it } from 'mocha' -import { DATABASE_URL } from '../lib/config.js' +import { getStatsPgPool, migrateStatsDB } from '@filecoin-station/spark-stats-db' import { updateDailyTransferStats } from '../lib/platform-stats.js' -import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' - -const createPgClient = async () => { - const pgClient = new pg.Client({ connectionString: DATABASE_URL }) - await pgClient.connect() - return pgClient -} describe('platform-stats-generator', () => { /** @type {pg.Client} */ let pgClient before(async () => { - pgClient = await createPgClient() - await migrateWithPgClient(pgClient) + const pgPool = await getStatsPgPool() + pgClient = await pgPool.connect() + await migrateStatsDB(pgClient) }) let today @@ -37,7 +30,7 @@ describe('platform-stats-generator', () => { }) after(async () => { - await pgClient.end() + await pgClient.release() }) describe('updateDailyTransferStats', () => { diff --git a/package-lock.json b/package-lock.json index 12882d8..34fb6d6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6,7 +6,7 @@ "": { "name": "@filecoin-station/spark-stats-monorepo", "workspaces": [ - "migrations", + "db", "observer", "stats" ], @@ -14,9 +14,22 @@ "standard": "^17.1.0" } }, + "db": { + "name": "@filecoin-station/spark-stats-db", + "version": "1.0.0", + "dependencies": { + "pg": "^8.12.0", + "postgrator": "^7.2.0" + }, + "devDependencies": { + "spark-evaluate": "filecoin-station/spark-evaluate#main", + "standard": "^17.1.0" + } + }, "migrations": { "name": "@filecoin-station/spark-stats-db-migrations", "version": "1.0.0", + "extraneous": true, "dependencies": { "pg": "^8.12.0", "postgrator": "^7.2.0" @@ -99,8 +112,8 @@ "resolved": "stats", "link": true }, - "node_modules/@filecoin-station/spark-stats-db-migrations": { - "resolved": "migrations", + "node_modules/@filecoin-station/spark-stats-db": { + "resolved": "db", "link": true }, "node_modules/@glif/filecoin-address": { @@ -196,10 +209,11 @@ "dev": true }, "node_modules/@ipld/car": { - "version": "5.3.0", - "resolved": "https://registry.npmjs.org/@ipld/car/-/car-5.3.0.tgz", - "integrity": "sha512-OB8LVvJeVAFFGluNIkZeDZ/aGeoekFKsuIvNT9I5sJIb5WekQuW5+lekjQ7Z7mZ7DBKuke/kI4jBT1j0/akU1w==", + "version": "5.3.1", + "resolved": "https://registry.npmjs.org/@ipld/car/-/car-5.3.1.tgz", + "integrity": "sha512-8fNkYAZvL9yX2zesF32k7tYqUDGG41felmmBnwjCZJto06QXCb0NOMPJc/mhNgnVa5gkKqxPO1ZdSoHuaYcVSw==", "dev": true, + "license": "Apache-2.0 OR MIT", "dependencies": { "@ipld/dag-cbor": "^9.0.7", "cborg": "^4.0.5", @@ -5388,23 +5402,24 @@ } }, "node_modules/spark-evaluate": { - "resolved": "git+ssh://git@github.com/filecoin-station/spark-evaluate.git#6d3d547e00ceed6fdc90d59578a3d3fdfa6fdc78", + "resolved": "git+ssh://git@github.com/filecoin-station/spark-evaluate.git#534454c9b2323b67d50b159a3504de8a06eedcb2", + "integrity": "sha512-KFIA5F8Lw/+I+2MzPKT6qsgtt5LNSqdbuk9/LUTUyduGSUpQj++TCI+PlvSs2mTRQz7dZCQcXhhY5cchj7n1vA==", "dev": true, "dependencies": { - "@filecoin-station/spark-impact-evaluator": "^1.1.0", + "@filecoin-station/spark-impact-evaluator": "^1.1.1", "@glif/filecoin-address": "^3.0.5", "@influxdata/influxdb-client": "^1.33.2", - "@ipld/car": "^5.3.0", - "@sentry/node": "^8.7.0", + "@ipld/car": "^5.3.1", + "@sentry/node": "^8.9.1", "@web3-storage/car-block-validator": "^1.2.0", - "debug": "^4.3.4", + "debug": "^4.3.5", "ethers": "^6.10.0", "ipfs-car": "^1.2.0", "ipfs-unixfs-exporter": "^13.5.0", "just-percentile": "^4.2.0", "p-map": "^7.0.2", "p-retry": "^6.2.0", - "pg": "^8.11.5", + "pg": "^8.12.0", "postgrator": "^7.2.0" } }, @@ -6127,7 +6142,7 @@ "name": "@filecoin-station/spark-observer", "dependencies": { "@filecoin-station/spark-impact-evaluator": "^1.1.1", - "@filecoin-station/spark-stats-db-migrations": "^1.0.0", + "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^8.9.1", "debug": "^4.3.5", "ethers": "^6.13.0", @@ -6135,12 +6150,14 @@ }, "devDependencies": { "mocha": "^10.4.0", + "spark-evaluate": "github:filecoin-station/spark-evaluate#534454c9b2323b67d50b159a3504de8a06eedcb2", "standard": "^17.1.0" } }, "stats": { "name": "@filecoin-station/spark-stats", "dependencies": { + "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^8.9.1", "debug": "^4.3.5", "http-assert": "^1.5.0", @@ -6148,7 +6165,7 @@ "pg": "^8.12.0" }, "devDependencies": { - "@filecoin-station/spark-stats-db-migrations": "^1.0.0", + "@filecoin-station/spark-stats-db": "^1.0.0", "mocha": "^10.4.0", "spark-evaluate": "filecoin-station/spark-evaluate#main", "standard": "^17.1.0" diff --git a/package.json b/package.json index a7ac74f..dc19535 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "private": true, "type": "module", "workspaces": [ - "migrations", + "db", "observer", "stats" ], diff --git a/stats/bin/migrate.js b/stats/bin/migrate.js index 29abdff..71a14b1 100644 --- a/stats/bin/migrate.js +++ b/stats/bin/migrate.js @@ -1,12 +1,5 @@ -import { - DATABASE_URL, - EVALUATE_DB_URL -} from '../lib/config.js' -import { migrateWithPgConfig as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js' -import { migrateWithPgConfig as migrateStatsDB } from '@filecoin-station/spark-stats-db-migrations' +import { migrateStatsDB, migrateEvaluateDB, getPgPools } from '@filecoin-station/spark-stats-db' -console.log('Migrating spark_evaluate database', EVALUATE_DB_URL) -await migrateEvaluateDB({ connectionString: EVALUATE_DB_URL }) - -console.log('Migrating spark_stats database', DATABASE_URL) -await migrateStatsDB({ connectionString: DATABASE_URL }) +const pgPools = await getPgPools() +await migrateStatsDB(pgPools.stats) +await migrateEvaluateDB(pgPools.evaluate) diff --git a/stats/bin/spark-stats.js b/stats/bin/spark-stats.js index f7f9b7e..1401d07 100644 --- a/stats/bin/spark-stats.js +++ b/stats/bin/spark-stats.js @@ -1,9 +1,8 @@ import '../lib/instrument.js' import http from 'node:http' import { once } from 'node:events' -import pg from 'pg' import { createHandler } from '../lib/handler.js' -import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js' +import { getPgPools } from '@filecoin-station/spark-stats-db' const { PORT = 8080, @@ -11,54 +10,14 @@ const { REQUEST_LOGGING = 'true' } = process.env -const pgPoolConfig = { - // allow the pool to close all connections and become empty - min: 0, - // this values should correlate with service concurrency hard_limit configured in fly.toml - // and must take into account the connection limit of our PG server, see - // https://fly.io/docs/postgres/managing/configuration-tuning/ - max: 100, - // close connections that haven't been used for one second - idleTimeoutMillis: 1000, - // automatically close connections older than 60 seconds - maxLifetimeSeconds: 60 -} -const pgPoolErrFn = err => { - // Prevent crashing the process on idle client errors, the pool will recover - // itself. If all connections are lost, the process will still crash. - // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 - console.error('An idle client has experienced an error', err.stack) -} - -// Connect and set up the Evaluate DB -const pgPoolEvaluateDb = new pg.Pool({ - connectionString: EVALUATE_DB_URL, - ...pgPoolConfig -}) -pgPoolEvaluateDb.on('error', pgPoolErrFn) -// Check that we can talk to the database -await pgPoolEvaluateDb.query('SELECT 1') - -// Connect and set up the Stats DB -const pgPoolStatsDb = new pg.Pool({ - connectionString: DATABASE_URL, - ...pgPoolConfig -}) -pgPoolStatsDb.on('error', pgPoolErrFn) -// Check that we can talk to the database -await pgPoolStatsDb.query('SELECT 1') - +const pgPools = await getPgPools() const logger = { error: console.error, info: console.info, request: ['1', 'true'].includes(REQUEST_LOGGING) ? console.info : () => {} } -const handler = createHandler({ - pgPoolEvaluateDb, - pgPoolStatsDb, - logger -}) +const handler = createHandler({ pgPools, logger }) const server = http.createServer(handler) console.log('Starting the http server on host %j port %s', HOST, PORT) server.listen(PORT, HOST) diff --git a/stats/lib/config.js b/stats/lib/config.js deleted file mode 100644 index ea99e8a..0000000 --- a/stats/lib/config.js +++ /dev/null @@ -1,9 +0,0 @@ -export const { - // DATABASE_URL points to `spark_stats` database managed by this monorepo - DATABASE_URL = 'postgres://localhost:5432/spark_stats', - - // EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo. - // Eventually, we should move the code updating stats from spark-evaluate to this repo - // and then we won't need two connection strings. - EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate' -} = process.env diff --git a/stats/lib/handler.js b/stats/lib/handler.js index d8b96a7..3dacd66 100644 --- a/stats/lib/handler.js +++ b/stats/lib/handler.js @@ -7,6 +7,7 @@ import { fetchMinersRSRSummary, fetchMonthlyParticipants, fetchParticipantChangeRates, + fetchParticipantScheduledRewards, fetchRetrievalSuccessRate } from './stats-fetchers.js' @@ -14,20 +15,19 @@ import { handlePlatformRoutes } from './platform-routes.js' /** * @param {object} args - * @param {import('pg').Pool} args.pgPoolEvaluateDb + * @param {import('@filecoin-station/spark-stats-db')} args.pgPools * @param {import('pg').Pool} args.pgPoolStatsDb * @param {import('./typings').Logger} args.logger * @returns */ export const createHandler = ({ - pgPoolEvaluateDb, - pgPoolStatsDb, + pgPools, logger }) => { return (req, res) => { const start = new Date() logger.request(`${req.method} ${req.url} ...`) - handler(req, res, pgPoolEvaluateDb, pgPoolStatsDb) + handler(req, res, pgPools) .catch(err => errorHandler(res, err, logger)) .then(() => { logger.request(`${req.method} ${req.url} ${res.statusCode} (${new Date() - start}ms)`) @@ -38,10 +38,9 @@ export const createHandler = ({ /** * @param {import('node:http').IncomingMessage} req * @param {import('node:http').ServerResponse} res - * @param {import('pg').Pool} pgPoolEvaluateDb - * @param {import('pg').Pool} pgPoolStatsDb + * @param {import('@filecoin-station/spark-stats-db')} args.pgPools */ -const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { +const handler = async (req, res, pgPools) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) @@ -51,6 +50,7 @@ const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { 'participants/daily': fetchDailyParticipants, 'participants/monthly': fetchMonthlyParticipants, 'participants/change-rates': fetchParticipantChangeRates, + 'participants/scheduled-rewards': fetchParticipantScheduledRewards, 'miners/retrieval-success-rate/summary': fetchMinersRSRSummary } @@ -60,10 +60,10 @@ const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { pathname, searchParams, res, - pgPoolEvaluateDb, + pgPools.evaluate, fetchStatsFn ) - } else if (await handlePlatformRoutes(req, res, pgPoolEvaluateDb, pgPoolStatsDb)) { + } else if (await handlePlatformRoutes(req, res, pgPools)) { // no-op, request was handled by handlePlatformRoute } else { notFound(res) diff --git a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js index e2fcf43..db7ad2e 100644 --- a/stats/lib/platform-routes.js +++ b/stats/lib/platform-routes.js @@ -6,7 +6,7 @@ import { fetchDailyStationAcceptedMeasurementCount } from './platform-stats-fetchers.js' -export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { +export const handlePlatformRoutes = async (req, res, pgPools) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) @@ -14,19 +14,19 @@ export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolSta const routeHandlerInfoMap = { 'stations/daily': { fetchFunction: fetchDailyStationCount, - pgPool: pgPoolEvaluateDb + pgPool: pgPools.evaluate }, 'stations/monthly': { fetchFunction: fetchMonthlyStationCount, - pgPool: pgPoolEvaluateDb + pgPool: pgPools.evaluate }, 'measurements/daily': { fetchFunction: fetchDailyStationAcceptedMeasurementCount, - pgPool: pgPoolEvaluateDb + pgPool: pgPools.evaluate }, 'transfers/daily': { fetchFunction: fetchDailyRewardTransfers, - pgPool: pgPoolStatsDb + pgPool: pgPools.stats } } diff --git a/stats/lib/stats-fetchers.js b/stats/lib/stats-fetchers.js index 2cddd19..ef64b92 100644 --- a/stats/lib/stats-fetchers.js +++ b/stats/lib/stats-fetchers.js @@ -118,6 +118,23 @@ export const fetchParticipantChangeRates = async (pgPool, filter) => { * @param {import('pg').Pool} pgPool * @param {import('./typings').DateRangeFilter} filter */ +export const fetchParticipantScheduledRewards = async (pgPool, filter) => { + const { rows } = await pgPool.query(` + SELECT scheduled_rewards + FROM daily_scheduled_rewards + WHERE address = $1 AND day >= $2 AND day <= $3 + `, [ + filter.address, + filter.from, + filter.to + ]) + return rows +} + +/** + * @param {import('pg').Pool} pgPool + * @param {import('./typings').Filter} filter + */ export const fetchMinersRSRSummary = async (pgPool, filter) => { const { rows } = await pgPool.query(` SELECT miner_id, SUM(total) as total, SUM(successful) as successful diff --git a/stats/package.json b/stats/package.json index 85d9340..90f3b30 100644 --- a/stats/package.json +++ b/stats/package.json @@ -9,12 +9,13 @@ "test": "mocha" }, "devDependencies": { - "@filecoin-station/spark-stats-db-migrations": "^1.0.0", + "@filecoin-station/spark-stats-db": "^1.0.0", "mocha": "^10.4.0", "spark-evaluate": "filecoin-station/spark-evaluate#main", "standard": "^17.1.0" }, "dependencies": { + "@filecoin-station/spark-stats-db": "^1.0.0", "@sentry/node": "^8.9.1", "debug": "^4.3.5", "http-assert": "^1.5.0", diff --git a/stats/test/handler.test.js b/stats/test/handler.test.js index c7cc79c..9cc6663 100644 --- a/stats/test/handler.test.js +++ b/stats/test/handler.test.js @@ -1,19 +1,18 @@ import http from 'node:http' import { once } from 'node:events' import assert from 'node:assert' -import pg from 'pg' import createDebug from 'debug' -import { mapParticipantsToIds } from 'spark-evaluate/lib/platform-stats.js' +import { givenDailyParticipants } from 'spark-evaluate/test/helpers/queries.js' +import { getEvaluatePgPool } from '@filecoin-station/spark-stats-db' import { assertResponseStatus } from './test-helpers.js' import { createHandler } from '../lib/handler.js' import { today } from '../lib/request-helpers.js' -import { EVALUATE_DB_URL } from '../lib/config.js' const debug = createDebug('test') describe('HTTP request handler', () => { - /** @type {pg.Pool} */ + /** @type {import('pg').Pool} */ let pgPool /** @type {http.Server} */ let server @@ -22,11 +21,13 @@ describe('HTTP request handler', () => { before(async () => { // handler doesn't use Stats DB - pgPool = new pg.Pool({ connectionString: EVALUATE_DB_URL }) + pgPool = await getEvaluatePgPool() const handler = createHandler({ - pgPoolEvaluateDb: pgPool, - pgPoolStatsDb: undefined, + pgPools: { + stats: null, + evaluate: pgPool + }, logger: { info: debug, error: console.error, @@ -388,15 +389,3 @@ const givenRetrievalStats = async (pgPool, { day, minerId, total, successful }) [day, minerId ?? 'f1test', total, successful] ) } - -const givenDailyParticipants = async (pgPool, day, participantAddresses) => { - const ids = await mapParticipantsToIds(pgPool, new Set(participantAddresses)) - await pgPool.query(` - INSERT INTO daily_participants (day, participant_id) - SELECT $1 as day, UNNEST($2::INT[]) AS participant_id - ON CONFLICT DO NOTHING - `, [ - day, - ids - ]) -} diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index 5cd3610..898bdfc 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -1,32 +1,26 @@ import http from 'node:http' import { once } from 'node:events' import assert from 'node:assert' -import pg from 'pg' import createDebug from 'debug' +import { getPgPools } from '@filecoin-station/spark-stats-db' import { assertResponseStatus } from './test-helpers.js' import { createHandler } from '../lib/handler.js' -import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js' const debug = createDebug('test') describe('Platform Routes HTTP request handler', () => { - /** @type {pg.Pool} */ - let pgPoolEvaluateDb - /** @type {pg.Pool} */ - let pgPoolStatsDb - /** @type {http.Server} */ + /** @type {import('@filecoin-station/spark-stats-db').pgPools} */ + let pgPools let server /** @type {string} */ let baseUrl before(async () => { - pgPoolEvaluateDb = new pg.Pool({ connectionString: EVALUATE_DB_URL }) - pgPoolStatsDb = new pg.Pool({ connectionString: DATABASE_URL }) + pgPools = await getPgPools() const handler = createHandler({ - pgPoolEvaluateDb, - pgPoolStatsDb, + pgPools, logger: { info: debug, error: console.error, @@ -43,28 +37,27 @@ describe('Platform Routes HTTP request handler', () => { after(async () => { server.closeAllConnections() server.close() - await pgPoolEvaluateDb.end() - await pgPoolStatsDb.end() + await pgPools.end() }) beforeEach(async () => { - await pgPoolEvaluateDb.query('DELETE FROM daily_stations') - await pgPoolStatsDb.query('DELETE FROM daily_reward_transfers') + await pgPools.evaluate.query('DELETE FROM daily_stations') + await pgPools.stats.query('DELETE FROM daily_reward_transfers') }) describe('GET /stations/daily', () => { it('returns daily station metrics for the given date range', async () => { - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [ { stationId: 'station2', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [ { stationId: 'station2', acceptedMeasurementCount: 2 }, { stationId: 'station3', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) @@ -88,25 +81,25 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /stations/monthly', () => { it('returns monthly station metrics for the given date range ignoring the day number', async () => { // before the date range - await givenDailyStationMetrics(pgPoolEvaluateDb, '2023-12-31', [ + await givenDailyStationMetrics(pgPools.evaluate, '2023-12-31', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) // in the date range - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [ { stationId: 'station2', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [ { stationId: 'station2', acceptedMeasurementCount: 2 }, { stationId: 'station3', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-02-13', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-02-13', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) // after the date range - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-03-01', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-03-01', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) @@ -129,17 +122,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /measurements/daily', () => { it('returns daily total accepted measurement count for the given date range', async () => { - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-10', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-11', [ { stationId: 'station2', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-12', [ { stationId: 'station2', acceptedMeasurementCount: 2 }, { stationId: 'station3', acceptedMeasurementCount: 1 } ]) - await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [ + await givenDailyStationMetrics(pgPools.evaluate, '2024-01-13', [ { stationId: 'station1', acceptedMeasurementCount: 1 } ]) @@ -162,17 +155,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /transfers/daily', () => { it('returns daily total Rewards sent for the given date range', async () => { - await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-10', [ + await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-10', [ { toAddress: 'to1', amount: 100, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-11', [ + await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-11', [ { toAddress: 'to2', amount: 150, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-12', [ + await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-12', [ { toAddress: 'to2', amount: 300, lastCheckedBlock: 1 }, { toAddress: 'to3', amount: 250, lastCheckedBlock: 1 } ]) - await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-13', [ + await givenDailyRewardTransferMetrics(pgPools.stats, '2024-01-13', [ { toAddress: 'to1', amount: 100, lastCheckedBlock: 1 } ]) @@ -194,8 +187,8 @@ describe('Platform Routes HTTP request handler', () => { }) }) -const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => { - await pgPoolEvaluateDb.query(` +const givenDailyStationMetrics = async (pgPoolEvaluate, day, stationStats) => { + await pgPoolEvaluate.query(` INSERT INTO daily_stations (day, station_id, accepted_measurement_count) SELECT $1 AS day, UNNEST($2::text[]) AS station_id, UNNEST($3::int[]) AS accepted_measurement_count ON CONFLICT DO NOTHING @@ -206,8 +199,8 @@ const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => ]) } -const givenDailyRewardTransferMetrics = async (pgPoolStatsDb, day, transferStats) => { - await pgPoolStatsDb.query(` +const givenDailyRewardTransferMetrics = async (pgPoolStats, day, transferStats) => { + await pgPoolStats.query(` INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount, UNNEST($4::int[]) AS last_checked_block ON CONFLICT DO NOTHING