Skip to content

Commit

Permalink
Created daily_node_metrics table and received/stored station_id
Browse files Browse the repository at this point in the history
  • Loading branch information
PatrickNercessian committed Apr 11, 2024
1 parent 971848a commit 38a3f2a
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 2 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ Set up [PostgreSQL](https://www.postgresql.org/) with default settings:
- Database: spark_stats

Alternatively, set the environment variable `$DATABASE_URL` with
`postgres://${USER}:${PASS}@${HOST}:${POST}/${DATABASE}`.
`postgres://${USER}:${PASS}@${HOST}:${PORT}/${DATABASE}`.

The Postgres user and database need to exist already, and the user
needs full management permissions for the database.
Expand Down
18 changes: 18 additions & 0 deletions lib/platform-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import createDebug from 'debug'

const debug = createDebug('spark:platform-stats')

/**
* @param {import('pg').Client} pgClient
* @param {import('./preprocess').Measurement[]} honestMeasurements
*/
export const updateDailyNodeMetrics = async (pgClient, honestMeasurements) => {
debug('Updating daily node metrics, count=%s', honestMeasurements.length)
for (const m of honestMeasurements) {
await pgClient.query(`
INSERT INTO daily_node_metrics (station_id, metric_date)
VALUES ($1, now()::date)
ON CONFLICT (station_id, metric_date) DO NOTHING
`, [m.station_id]) // TODO: when we add more fields, we should update the ON CONFLICT clause to update the fields
}
}
1 change: 1 addition & 0 deletions lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class Measurement {
this.end_at = parseDateTime(m.end_at)
this.status_code = m.status_code
this.indexerResult = pointerize(m.indexer_result)
this.station_id = pointerize(m.station_id)
}
}

Expand Down
2 changes: 2 additions & 0 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import assert from 'node:assert'
import createDebug from 'debug'

import { updateDailyNodeMetrics } from './platform-stats.js'
import { getTaskId } from './retrieval-stats.js'

const debug = createDebug('spark:public-stats')
Expand Down Expand Up @@ -31,6 +32,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements })
}
await updateDailyParticipants(pgClient, participants)
await updateIndexerQueryStats(pgClient, honestMeasurements)
await updateDailyNodeMetrics(pgClient, honestMeasurements)
} finally {
await pgClient.end()
}
Expand Down
5 changes: 5 additions & 0 deletions migrations/006.do.daily-node-metrics.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE daily_node_metrics (
metric_date DATE NOT NULL,
station_id TEXT NOT NULL,
PRIMARY KEY (metric_date, station_id)
)
3 changes: 2 additions & 1 deletion test/helpers/test-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ export const VALID_MEASUREMENT = {
finished_at: new Date('2023-11-01T09:00:10.000Z').getTime(),
byte_length: 1024,
retrievalResult: 'OK',
indexerResult: 'OK'
indexerResult: 'OK',
station_id: 'station1'
}

// Fraud detection is mutating the measurements parsed from JSON
Expand Down
62 changes: 62 additions & 0 deletions test/platform-stats.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import assert from 'node:assert'
import pg from 'pg'
import { beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { migrateWithPgClient } from '../lib/migrate.js'
import { VALID_MEASUREMENT } from './helpers/test-data.js'
import { updateDailyNodeMetrics } from '../lib/platform-stats.js'

const createPgClient = async () => {
const pgClient = new pg.Client({ connectionString: DATABASE_URL })
await pgClient.connect()
return pgClient
}

describe('platform-stats', () => {
let pgClient

before(async () => {
pgClient = await createPgClient()
await migrateWithPgClient(pgClient)
})

beforeEach(async () => {
await pgClient.query('DELETE FROM daily_node_metrics')
await pgClient.query('BEGIN TRANSACTION')
})

afterEach(async () => {
await pgClient.query('END TRANSACTION')
})

after(async () => {
await pgClient.end()
})

it('updates daily node metrics with new measurements', async () => {
const honestMeasurements = [
{ ...VALID_MEASUREMENT, station_id: 'station1' },
{ ...VALID_MEASUREMENT, station_id: 'station2' }
]

await updateDailyNodeMetrics(pgClient, honestMeasurements)

const { rows } = await pgClient.query('SELECT station_id FROM daily_node_metrics')
assert.strictEqual(rows.length, 2)
assert.deepStrictEqual(rows.map(row => row.station_id).sort(), ['station1', 'station2'])
})

it('ignores duplicate measurements for the same station on the same day', async () => {
const honestMeasurements = [
{ ...VALID_MEASUREMENT, station_id: 'station1' },
{ ...VALID_MEASUREMENT, station_id: 'station1' } // Duplicate station_id
]

await updateDailyNodeMetrics(pgClient, honestMeasurements)

const { rows } = await pgClient.query('SELECT station_id FROM daily_node_metrics')
assert.strictEqual(rows.length, 1)
assert.strictEqual(rows[0].station_id, 'station1')
})
})

0 comments on commit 38a3f2a

Please sign in to comment.