feat: handle eligible deal stats via spark-api (#206)
Rework the endpoints `{miner/client/allocator}/:id/deals/eligible/summary`
to return a (temporary) redirect to spark-api.

---------

Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>
Co-authored-by: Julian Gruber <julian@juliangruber.com>
bajtos and juliangruber authored Sep 2, 2024
1 parent 31313ec commit d8b7e91
Showing 10 changed files with 35 additions and 235 deletions.
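The reworked endpoints no longer compute eligible-deal summaries from a local copy of the spark-api database; they answer with a temporary redirect to spark-api instead. A minimal sketch of what a consumer now sees, assuming a deployment reachable at `stats.filspark.com` (the host and the miner id below are illustrative, not part of this diff):

```js
// Inspect the redirect without following it (Node 18+ fetch).
const res = await fetch(
  'https://stats.filspark.com/miner/f0814049/deals/eligible/summary',
  { redirect: 'manual' }
)
console.log(res.status) // expected: 302
console.log(res.headers.get('location'))
// expected: https://api.filspark.com/miner/f0814049/deals/eligible/summary
```

Clients that follow redirects automatically (the default `redirect: 'follow'`) should keep working unchanged; they simply receive the JSON body from spark-api.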
6 changes: 0 additions & 6 deletions .github/workflows/ci.yml
@@ -25,11 +25,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: stats
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
@@ -57,11 +55,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
@@ -100,11 +96,9 @@ jobs:
env:
DATABASE_URL: postgres://postgres:postgres@localhost:5432/spark_stats
EVALUATE_DB_URL: postgres://postgres:postgres@localhost:5432/spark_evaluate
API_DB_URL: postgres://postgres:postgres@localhost:5432/spark
NPM_CONFIG_WORKSPACE: observer
steps:
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark_evaluate"
- run: psql "${DATABASE_URL}" -c "CREATE DATABASE spark"
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
28 changes: 3 additions & 25 deletions db/index.js
@@ -1,5 +1,4 @@
import { migrateWithPgClient as migrateEvaluateDB } from 'spark-evaluate/lib/migrate.js'
import { migrate as migrateApiDB } from 'spark-api/migrations/index.js'
import pg from 'pg'
import { dirname, join } from 'node:path'
import { fileURLToPath } from 'node:url'
@@ -9,18 +8,14 @@ import Postgrator from 'postgrator'
/** @typedef {import('./typings.js').PgPools} PgPools */
/** @typedef {import('./typings.js').PgPoolStats} PgPoolStats */
/** @typedef {import('./typings.js').PgPoolEvaluate} PgPoolEvaluate */
/** @typedef {import('./typings.js').PgPoolApi} PgPoolApi */
/** @typedef {import('./typings.js').Queryable} Queryable */

export { migrateEvaluateDB, migrateApiDB }
export { migrateEvaluateDB }

const {
// DATABASE_URL points to `spark_stats` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_stats',

// API_DB_URL points to `spark` database managed by spark-api repo.
API_DB_URL = 'postgres://localhost:5432/spark',

// EVALUATE_DB_URL points to `spark_evaluate` database managed by spark-evaluate repo.
// Eventually, we should move the code updating stats from spark-evaluate to this repo
// and then we won't need two connection strings.
@@ -84,32 +79,15 @@ export const getEvaluatePgPool = async () => {
return evaluate
}

/**
* @returns {Promise<PgPoolApi>}
*/
export const getApiPgPool = async () => {
const stats = Object.assign(
new pg.Pool({
...poolConfig,
connectionString: API_DB_URL
}),
/** @type {const} */({ db: 'api' })
)
stats.on('error', onError)
await stats.query('SELECT 1')
return stats
}

/**
* @returns {Promise<PgPools>}
*/
export const getPgPools = async () => {
const stats = await getStatsPgPool()
const evaluate = await getEvaluatePgPool()
const api = await getApiPgPool()
const end = async () => { await Promise.all([stats.end(), evaluate.end(), api.end()]) }
const end = async () => { await Promise.all([stats.end(), evaluate.end()]) }

return { stats, evaluate, api, end }
return { stats, evaluate, end }
}

/**
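With the api pool gone, `getPgPools()` now returns only the two databases owned by this monorepo. A small usage sketch under that assumption:

```js
import { getPgPools } from '@filecoin-station/spark-stats-db'

const pgPools = await getPgPools()
// `pgPools.api` no longer exists; only `stats` and `evaluate` remain.
const { rows } = await pgPools.stats.query('SELECT 1')
console.log(rows)
await pgPools.end() // closes both the stats and evaluate pools
```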
1 change: 0 additions & 1 deletion db/package.json
@@ -14,7 +14,6 @@
"dependencies": {
"pg": "^8.12.0",
"postgrator": "^7.2.0",
"spark-api": "https://github.com/filecoin-station/spark-api/archive/7075fb55b253d48d5d5eb4846f13a3f688d80437.tar.gz",
"spark-evaluate": "filecoin-station/spark-evaluate#main"
},
"standard": {
6 changes: 0 additions & 6 deletions db/typings.d.ts
@@ -8,19 +8,13 @@ export interface PgPoolStats extends pg.Pool {
db: 'stats'
}

export interface PgPoolApi extends pg.Pool {
db: 'api'
}

export type PgPool =
| PgPoolEvaluate
| PgPoolStats
| PgPoolApi

export interface PgPools {
stats: PgPoolStats;
evaluate: PgPoolEvaluate;
api: PgPoolApi;
end(): Promise<void>
}

10 changes: 0 additions & 10 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions stats/bin/migrate.js
@@ -1,11 +1,9 @@
import {
getPgPools,
migrateApiDB,
migrateEvaluateDB,
migrateStatsDB
} from '@filecoin-station/spark-stats-db'

const pgPools = await getPgPools()
await migrateStatsDB(pgPools.stats)
await migrateEvaluateDB(pgPools.evaluate)
await migrateApiDB(pgPools.api)
3 changes: 2 additions & 1 deletion stats/bin/spark-stats.js
@@ -7,6 +7,7 @@ import { getPgPools } from '@filecoin-station/spark-stats-db'
const {
PORT = '8080',
HOST = '127.0.0.1',
SPARK_API_BASE_URL = 'https://api.filspark.com/',
REQUEST_LOGGING = 'true'
} = process.env

@@ -17,7 +18,7 @@ const logger = {
request: ['1', 'true'].includes(REQUEST_LOGGING) ? console.info : () => {}
}

const handler = createHandler({ pgPools, logger })
const handler = createHandler({ SPARK_API_BASE_URL, pgPools, logger })
const server = http.createServer(handler)
console.log('Starting the http server on host %j port %s', HOST, PORT)
server.listen(Number(PORT), HOST)
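The redirect target is configurable through the new `SPARK_API_BASE_URL` environment variable, defaulting to the production spark-api. A sketch of wiring the handler against a locally running spark-api; the import path of `createHandler`, the local port, and the logger shape are assumptions, not part of this diff:

```js
import http from 'node:http'
import { getPgPools } from '@filecoin-station/spark-stats-db'
import { createHandler } from '../lib/handler.js' // assumed import path

const pgPools = await getPgPools()
const handler = createHandler({
  SPARK_API_BASE_URL: 'http://127.0.0.1:8090/', // assumed local spark-api port
  pgPools,
  logger: { info: console.info, error: console.error, request: () => {} }
})
http.createServer(handler).listen(8080, '127.0.0.1')
```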
97 changes: 14 additions & 83 deletions stats/lib/handler.js
@@ -1,5 +1,5 @@
import * as Sentry from '@sentry/node'
import { json } from 'http-responders'
import { redirect } from 'http-responders'

import { getStatsWithFilterAndCaching } from './request-helpers.js'

@@ -22,18 +22,20 @@ import { handlePlatformRoutes } from './platform-routes.js'

/**
* @param {object} args
* @param {string} args.SPARK_API_BASE_URL
* @param {import('@filecoin-station/spark-stats-db').PgPools} args.pgPools
* @param {import('./typings.d.ts').Logger} args.logger
* @returns
*/
export const createHandler = ({
SPARK_API_BASE_URL,
pgPools,
logger
}) => {
return (req, res) => {
const start = Date.now()
logger.request(`${req.method} ${req.url} ...`)
handler(req, res, pgPools)
handler(req, res, pgPools, SPARK_API_BASE_URL)
.catch(err => errorHandler(res, err, logger))
.then(() => {
logger.request(`${req.method} ${req.url} ${res.statusCode} (${Date.now() - start}ms)`)
@@ -73,8 +75,9 @@ const createRespondWithFetchFn =
* @param {import('node:http').IncomingMessage} req
* @param {import('node:http').ServerResponse} res
* @param {import('@filecoin-station/spark-stats-db').PgPools} pgPools
* @param {string} SPARK_API_BASE_URL
*/
const handler = async (req, res, pgPools) => {
const handler = async (req, res, pgPools, SPARK_API_BASE_URL) => {
// Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want!
const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`)
const segs = pathname.split('/').filter(Boolean)
@@ -102,11 +105,11 @@ const handler = async (req, res, pgPools) => {
} else if (req.method === 'GET' && url === '/miners/retrieval-success-rate/summary') {
await respond(fetchMinersRSRSummary)
} else if (req.method === 'GET' && segs[0] === 'miner' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForMiner(req, res, pgPools.api, segs[1])
redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (req.method === 'GET' && segs[0] === 'client' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForClient(req, res, pgPools.api, segs[1])
redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (req.method === 'GET' && segs[0] === 'allocator' && segs[1] && segs[2] === 'deals' && segs[3] === 'eligible' && segs[4] === 'summary') {
await getRetrievableDealsForAllocator(req, res, pgPools.api, segs[1])
redirectToSparkApi(req, res, SPARK_API_BASE_URL)
} else if (await handlePlatformRoutes(req, res, pgPools)) {
// no-op, request was handled by handlePlatformRoute
} else if (req.method === 'GET' && url === '/') {
@@ -141,86 +144,14 @@ const notFound = (res) => {
}

/**
* @param {import('node:http').IncomingMessage} _req
* @param {import('node:http').IncomingMessage} req
* @param {import('node:http').ServerResponse} res
* @param {PgPools['api']} client
* @param {string} minerId
* @param {string} SPARK_API_BASE_URL
*/
const getRetrievableDealsForMiner = async (_req, res, client, minerId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT client_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
WHERE miner_id = $1 AND expires_at > now()
GROUP BY client_id
ORDER BY deal_count DESC, client_id ASC
`, [
minerId
])

// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
minerId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
clients:
rows.map(
// eslint-disable-next-line camelcase
({ client_id, deal_count }) => ({ clientId: client_id, dealCount: deal_count })
)
}

json(res, body)
}

const getRetrievableDealsForClient = async (_req, res, client, clientId) => {
/** @type {{rows: {miner_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT miner_id, COUNT(cid)::INTEGER as deal_count FROM retrievable_deals
WHERE client_id = $1 AND expires_at > now()
GROUP BY miner_id
ORDER BY deal_count DESC, miner_id ASC
`, [
clientId
])

const redirectToSparkApi = (req, res, SPARK_API_BASE_URL) => {
// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
clientId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
providers: rows.map(
// eslint-disable-next-line camelcase
({ miner_id, deal_count }) => ({ minerId: miner_id, dealCount: deal_count })
)
}
json(res, body)
}

const getRetrievableDealsForAllocator = async (_req, res, client, allocatorId) => {
/** @type {{rows: {client_id: string; deal_count: number}[]}} */
const { rows } = await client.query(`
SELECT ac.client_id, COUNT(cid)::INTEGER as deal_count
FROM allocator_clients ac
LEFT JOIN retrievable_deals rd ON ac.client_id = rd.client_id
WHERE ac.allocator_id = $1 AND expires_at > now()
GROUP BY ac.client_id
ORDER BY deal_count DESC, ac.client_id ASC
`, [
allocatorId
])

// Cache the response for 6 hours
res.setHeader('cache-control', `max-age=${6 * 3600}`)

const body = {
allocatorId,
dealCount: rows.reduce((sum, row) => sum + row.deal_count, 0),
clients: rows.map(
// eslint-disable-next-line camelcase
({ client_id, deal_count }) => ({ clientId: client_id, dealCount: deal_count })
)
}
json(res, body)
const location = new URL(req.url, SPARK_API_BASE_URL).toString()
redirect(req, res, location, 302)
}
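The `Location` header is produced by resolving the incoming request URL against the configured base, so the path and any query string carry over to spark-api. A quick sketch of that resolution (the request path and query parameter are illustrative):

```js
// `req.url` is an absolute path, so it replaces the base URL's path
// while keeping its origin; query parameters are preserved.
const SPARK_API_BASE_URL = 'https://api.filspark.com/'
const reqUrl = '/client/f0123/deals/eligible/summary?foo=bar' // illustrative

const location = new URL(reqUrl, SPARK_API_BASE_URL).toString()
console.log(location)
// -> https://api.filspark.com/client/f0123/deals/eligible/summary?foo=bar
```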