Skip to content

Commit

Permalink
api: Add fixed # tasks per round. Closes space-meridian/roadmap#82 (#385)
Browse files Browse the repository at this point in the history

* api: Add fixed # tasks per round. Closes space-meridian/roadmap#82

* docs

* fix tests

* disable dynamic task count in tests

* fix more tests

* fix more tests

* fix more tests

* use new `measurement_count` tracked internally

* docs

* fix publish query

* fix test

* try fix query

* fix 0 measurements case

* publish: fix include contract address in query

* fix test

* wip

* implement ratio between node and round tasks

* docs

* select spark round id, not meridian round number

* fix

* fix

* fix

* convert to int

* add telemetry

* fix lint

* fix no previous round case

* handle 0 (not just null)

* fix query

* cast to int

* telemetry: handle no prev round case

* mock one telemetry call

* fix lint

* fix missing export

* fix test

* fix test

* fix lint

* tests

* tests

* tests

* tests

* tests

* tests

* tests

* fix signature

* tests

* add passing test

* add passing test

* add passing test

* refactor

* NODE_TASKS_TO_ROUND_TASKS_RATIO -> ROUND_TASKS_TO_NODE_TASKS_RATIO

* fixup! refactor
  • Loading branch information
juliangruber authored Sep 19, 2024
1 parent 8d1dd9d commit 5f7ab44
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 52 deletions.
6 changes: 5 additions & 1 deletion api/bin/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import pg from 'pg'
import { startRoundTracker } from '../lib/round-tracker.js'
import { migrate } from '../../migrations/index.js'
import { clearNetworkInfoStationIdsSeen } from '../lib/network-info-logger.js'
import { recordNetworkInfoTelemetry } from '../../common/telemetry.js'

const {
PORT = 8080,
Expand Down Expand Up @@ -41,7 +42,10 @@ console.log('Initializing round tracker...')
const start = Date.now()

try {
const currentRound = await startRoundTracker({ pgPool: client })
const currentRound = await startRoundTracker({
pgPool: client,
recordTelemetry: recordNetworkInfoTelemetry
})
console.log(
'Initialized round tracker in %sms. SPARK round number at service startup: %s',
Date.now() - start,
Expand Down
94 changes: 69 additions & 25 deletions api/lib/round-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,24 @@ import assert from 'node:assert'
import * as Sentry from '@sentry/node'
import { createMeridianContract } from './ie-contract.js'

// The number of tasks per round is proportionate to the SPARK round length - longer rounds require
// more tasks per round.
//
// See https://www.notion.so/pl-strflt/SPARK-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643
// for more details, this constant represents TC (tasks per committee).
//
// We will need to tweak this value based on measurements; that's why I put it here as a constant.
export const TASKS_PER_ROUND = 1000

// How many tasks is each SPARK checker node expected to complete every round (at most).
export const MAX_TASKS_PER_NODE = 15
// Tweak this to control the network's overall task count.
export const TASKS_EXECUTED_PER_ROUND = 500_000

// Baseline values for how many tasks should be completed every round, and how
// many tasks each SPARK checker node is expected to complete (every round, at
// most). The actual value will be set dynamically based on
// TASKS_EXECUTED_PER_ROUND and the number of tasks executed in the last round.
export const BASELINE_TASKS_PER_ROUND = 1000
export const BASELINE_TASKS_PER_NODE = 15

export const ROUND_TASKS_TO_NODE_TASKS_RATIO = BASELINE_TASKS_PER_ROUND / BASELINE_TASKS_PER_NODE

/** @typedef {Awaited<ReturnType<import('./ie-contract.js').createMeridianContract>>} MeridianContract */

/**
* @param {object} args
* @param {import('pg').Pool} args.pgPool
* @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry
* @param {AbortSignal} [args.signal]
* @returns {
* sparkRoundNumber: bigint;
Expand All @@ -27,7 +28,7 @@ export const MAX_TASKS_PER_NODE = 15
* roundStartEpoch: bigint;
* }
*/
export async function startRoundTracker ({ pgPool, signal }) {
export async function startRoundTracker ({ pgPool, signal, recordTelemetry }) {
const contract = await createMeridianContract()

const onRoundStart = (newRoundIndex, ...args) => {
Expand All @@ -41,7 +42,7 @@ export async function startRoundTracker ({ pgPool, signal }) {
)
}

updateSparkRound(pgPool, contract, newRoundIndex, blockNumber).catch(err => {
updateSparkRound(pgPool, contract, newRoundIndex, recordTelemetry, blockNumber).catch(err => {
console.error('Cannot handle RoundStart:', err)
Sentry.captureException(err)
})
Expand All @@ -53,17 +54,18 @@ export async function startRoundTracker ({ pgPool, signal }) {
})
}

const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex())
const currentRound = await updateSparkRound(pgPool, contract, await contract.currentRoundIndex(), recordTelemetry)
return currentRound
}

/**
* @param {import('pg').Pool} pgPool
* @param {MeridianContract} contract
* @param {bigint} newRoundIndex
* @param {import('../../common/typings.js').RecordTelemetryFn} recordTelemetry
* @param {number} [roundStartEpoch]
*/
async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoch) {
async function updateSparkRound (pgPool, contract, newRoundIndex, recordTelemetry, roundStartEpoch) {
const meridianRoundIndex = BigInt(newRoundIndex)
const meridianContractAddress = await contract.getAddress()

Expand All @@ -78,7 +80,8 @@ async function updateSparkRound (pgPool, contract, newRoundIndex, roundStartEpoc
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
pgClient
pgClient,
recordTelemetry
})
await pgClient.query('COMMIT')
console.log('SPARK round started: %s (epoch: %s)', sparkRoundNumber, roundStartEpoch)
Expand Down Expand Up @@ -169,7 +172,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
pgClient
pgClient,
recordTelemetry
}) {
let sparkRoundNumber

Expand Down Expand Up @@ -223,7 +227,8 @@ export async function mapCurrentMeridianRoundToSparkRound ({
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch
roundStartEpoch,
recordTelemetry
})

return sparkRoundNumber
Expand All @@ -233,29 +238,68 @@ export async function maybeCreateSparkRound (pgClient, {
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch
roundStartEpoch,
recordTelemetry
}) {
const { rowCount } = await pgClient.query(`
// maxTasksPerNode(round(n)) =
// BASELINE_TASKS_PER_NODE
// if n=0
// BASELINE_TASKS_PER_NODE
// if measurementCount(round(n-1)) = 0
// maxTasksPerNode(round(n-1)) * (TASKS_EXECUTED_PER_ROUND / measurementCount(round(n-1)))
// otherwise
const { rows: [previousRound] } = await pgClient.query(`
SELECT measurement_count, max_tasks_per_node
FROM spark_rounds
WHERE id = $1 - 1::bigint
`, [
sparkRoundNumber
])
const { rows, rowCount } = await pgClient.query(`
INSERT INTO spark_rounds
(id, created_at, meridian_address, meridian_round, start_epoch, max_tasks_per_node)
VALUES ($1, now(), $2, $3, $4, $5)
VALUES (
$1,
now(),
$2,
$3,
$4,
(
$5::int /* previousRound.max_tasks_per_node || BASELINE_TASKS_PER_NODE */
* $6::int /* TASKS_EXECUTED_PER_ROUND */
/ $7::int /* previousRound.measurement_count || TASKS_EXECUTED_PER_ROUND */
)
)
ON CONFLICT DO NOTHING
RETURNING max_tasks_per_node
`, [
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex,
roundStartEpoch,
MAX_TASKS_PER_NODE
previousRound?.max_tasks_per_node || BASELINE_TASKS_PER_NODE,
TASKS_EXECUTED_PER_ROUND,
previousRound?.measurement_count || TASKS_EXECUTED_PER_ROUND
])

if (rowCount) {
// We created a new SPARK round. Let's define retrieval tasks for this new round.
// This is a short- to medium-term solution until we move to fully decentralized tasking
await defineTasksForRound(pgClient, sparkRoundNumber)
const taskCount = Math.floor(
rows[0].max_tasks_per_node * ROUND_TASKS_TO_NODE_TASKS_RATIO
)
await defineTasksForRound(pgClient, sparkRoundNumber, taskCount)
recordTelemetry('round', point => {
point.intField('current_round_measurement_count_target', TASKS_EXECUTED_PER_ROUND)
point.intField('current_round_task_count', taskCount)
point.intField('current_round_node_max_task_count', rows[0].max_tasks_per_node)
point.intField('previous_round_measurement_count', previousRound?.measurement_count ?? 0)
point.intField('previous_round_node_max_task_count', previousRound?.max_tasks_per_node ?? 0)
})
}
}

async function defineTasksForRound (pgClient, sparkRoundNumber) {
async function defineTasksForRound (pgClient, sparkRoundNumber, taskCount) {
await pgClient.query(`
INSERT INTO retrieval_tasks (round_id, cid, miner_id, clients)
WITH selected AS (
Expand All @@ -273,7 +317,7 @@ async function defineTasksForRound (pgClient, sparkRoundNumber) {
GROUP BY selected.cid, selected.miner_id;
`, [
sparkRoundNumber,
TASKS_PER_ROUND
taskCount
])
}

Expand Down
Loading

0 comments on commit 5f7ab44

Please sign in to comment.