import assert from 'node:assert'
import createDebug from 'debug'
import { getTaskId } from './retrieval-stats.js'

/** @import {Measurement} from './preprocess.js' */
/** @import {RetrievalResult, CommitteeCheckError} from './typings.js' */

const debug = createDebug('spark:committee')

/** @typedef {Map<string, Committee>} TaskIdToCommitteeMap */

/** @typedef {{
  indexerResult: string | CommitteeCheckError;
  retrievalResult: RetrievalResult
}} CommitteeEvaluation
*/

/**
 * All accepted measurements submitted for a single retrieval task
 * (the pair `cid` + `minerId`), together with the result the committee
 * agreed on once `evaluate()` has been called.
 */
export class Committee {
  /** @type {Measurement[]} */
  #measurements

  /**
   * @param {Pick<Measurement, 'cid' | 'minerId'>} retrievalTask
   */
  constructor ({ cid, minerId }) {
    this.retrievalTask = { minerId, cid }

    this.#measurements = []

    /**
     * Stays `undefined` until `evaluate()` is called.
     * @type {CommitteeEvaluation | undefined}
     */
    this.evaluation = undefined
  }

  /** Number of measurements added to this committee. */
  get size () {
    return this.#measurements.length
  }

  /**
   * A frozen shallow copy of the measurement list. The array cannot be
   * modified, but the measurement objects are shared with the committee.
   */
  get measurements () {
    return Object.freeze([...this.#measurements])
  }

  /**
   * Add an accepted measurement to the committee.
   *
   * @param {Measurement} m
   * @throws {assert.AssertionError} when the measurement belongs to a different
   * task or when its `fraudAssessment` is not `'OK'`
   */
  addMeasurement (m) {
    assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
    assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
    assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
    this.#measurements.push(m)
  }

  /**
   * Find the result the absolute majority of the committee agrees on and store
   * it in `this.evaluation`.
   *
   * Side effects on the measurements:
   * - committee smaller than `requiredCommitteeSize`: every measurement is
   *   marked `COMMITTEE_TOO_SMALL`
   * - no absolute majority for the retrieval result: every measurement is
   *   marked `MAJORITY_NOT_FOUND`
   * - otherwise: measurements outside of the retrieval-result majority are
   *   marked `MINORITY_RESULT`
   *
   * The indexer-result majority only determines `evaluation.indexerResult`;
   * it does not change any `fraudAssessment` on its own.
   *
   * @param {object} args
   * @param {number} args.requiredCommitteeSize
   * @returns {void}
   */
  evaluate ({ requiredCommitteeSize }) {
    debug(
      'Evaluating task %o with a committee of %s measurements',
      this.retrievalTask,
      this.#measurements.length
    )

    if (this.#measurements.length < requiredCommitteeSize) {
      debug('→ committee is too small (size=%s required=%s); retrievalResult=COMMITTEE_TOO_SMALL',
        this.#measurements.length,
        requiredCommitteeSize
      )
      this.evaluation = {
        indexerResult: 'COMMITTEE_TOO_SMALL',
        retrievalResult: 'COMMITTEE_TOO_SMALL'
      }
      for (const m of this.#measurements) m.fraudAssessment = 'COMMITTEE_TOO_SMALL'
      return
    }

    debug('- searching for majority in indexer results')
    /** @type {(keyof Measurement)[]} */
    const indexerResultProps = [
      'providerId',
      'indexerResult',
      'provider_address',
      'protocol'
    ]
    const indexerResultMajority = this.#findMajority(indexerResultProps)
    const indexerResult = indexerResultMajority
      ? indexerResultMajority.majorityValue.indexerResult
      : 'MAJORITY_NOT_FOUND'

    debug('- searching for majority in retrieval results')
    /** @type {(keyof Measurement)[]} */
    const retrievalResultProps = [
      ...indexerResultProps,
      // NOTE: We are not checking the fields that were used to calculate
      // the retrievalResult value:
      // - status_code
      // - timeout
      // - car_too_large
      // If there is an agreement on the retrieval result, then those fields
      // must have the same value too.
      'retrievalResult',
      'byte_length',
      'carChecksum'
    ]

    const retrievalResultMajority = this.#findMajority(retrievalResultProps)
    /** @type {CommitteeEvaluation['retrievalResult']} */
    let retrievalResult
    if (retrievalResultMajority) {
      retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
      for (const m of retrievalResultMajority.minorityMeasurements) {
        m.fraudAssessment = 'MINORITY_RESULT'
      }
    } else {
      retrievalResult = 'MAJORITY_NOT_FOUND'
      for (const m of this.#measurements) m.fraudAssessment = 'MAJORITY_NOT_FOUND'
    }

    this.evaluation = {
      indexerResult,
      retrievalResult
    }
  }

  /**
   * Group the measurements by the values of the given fields and return the
   * group that forms an absolute majority (more than half of the committee).
   *
   * @param {(keyof Measurement)[]} measurementFields
   * @returns {{
   *   majorityValue: Pick<Measurement, keyof Measurement>,
   *   majorityMeasurements: Measurement[],
   *   minorityMeasurements: Measurement[]
   * } | undefined} `undefined` when no group has an absolute majority,
   * including the case of a committee with no measurements
   */
  #findMajority (measurementFields) {
    // An empty committee has no groups to pick from. Without this guard the
    // code below would read `.length` of `undefined` and throw a TypeError.
    if (this.#measurements.length === 0) {
      debug('→ committee has no measurements; result=MAJORITY_NOT_FOUND')
      return undefined
    }

    /** @param {Measurement} m */
    const getResult = m => pick(m, ...measurementFields)

    /** @type {Map<string, Measurement[]>} */
    const resultGroups = new Map()

    // 1. Group measurements using the serialized result as the grouping key
    for (const m of this.#measurements) {
      const key = JSON.stringify(getResult(m))
      const group = resultGroups.get(key)
      if (group) {
        group.push(m)
      } else {
        resultGroups.set(key, [m])
      }
    }

    if (debug.enabled) {
      debug('- results found:')
      for (const k of resultGroups.keys()) {
        debug('  %o', JSON.parse(k))
      }
    }

    // 2. Find the largest group. A single pass is enough, there is no need to
    // sort all groups. On a tie, the group created last wins; a tie can never
    // be an absolute majority, so this only affects the debug output below.
    /** @type {Measurement[]} */
    let majorityMeasurements = []
    for (const group of resultGroups.values()) {
      if (group.length >= majorityMeasurements.length) majorityMeasurements = group
    }

    // 3. Check that the largest group is an absolute majority
    const majoritySize = majorityMeasurements.length
    const majorityValue = getResult(majorityMeasurements[0])

    debug('- majority=%s committee-size=%s value=%o', majoritySize, this.size, majorityValue)
    if (majoritySize <= this.size / 2) {
      debug('→ majority is not absolute; result=MAJORITY_NOT_FOUND')
      return undefined
    }

    debug('→ majority agrees on result=%o', majorityValue)
    /** @type {Measurement[]} */
    const minorityMeasurements = []
    for (const group of resultGroups.values()) {
      if (group === majorityMeasurements) continue
      for (const m of group) minorityMeasurements.push(m)
    }
    return {
      majorityValue,
      majorityMeasurements,
      minorityMeasurements
    }
  }
}

/**
 * Create a new object containing only the listed properties of `obj`.
 * A listed key that is missing on `obj` is copied as `undefined`.
 *
 * @template T
 * @template {keyof T} K
 * @param {T} obj
 * @param {K[]} keys
 * @returns {Pick<T, K>}
 */
function pick (obj, ...keys) {
  /** @type {any} */
  const ret = {}
  for (const key of keys) {
    ret[key] = obj[key]
  }
  return ret
}

/**
 * Group measurements into committees, one committee per retrieval task.
 * The input is iterated exactly once, so a generator can be passed in.
 *
 * @param {Iterable<Measurement>} measurements Accepted measurements only;
 * `Committee.addMeasurement()` throws for any measurement whose
 * `fraudAssessment` is not `'OK'`
 * @returns {TaskIdToCommitteeMap} committees keyed by the task id
 */
export function groupMeasurementsToCommittees (measurements) {
  /** @type {TaskIdToCommitteeMap} */
  const committeesMap = new Map()
  for (const m of measurements) {
    const key = getTaskId(m)
    let c = committeesMap.get(key)
    if (!c) {
      c = new Committee(m)
      committeesMap.set(key, c)
    }
    c.addMeasurement(m)
  }
  return committeesMap
}
100644 --- a/lib/evaluate.js +++ b/lib/evaluate.js @@ -5,13 +5,15 @@ import { getRandomnessForSparkRound } from './drand-client.js' import { updatePublicStats } from './public-stats.js' import { buildRetrievalStats, getTaskId, recordCommitteeSizes } from './retrieval-stats.js' import { getTasksAllowedForStations } from './tasker.js' +import { groupMeasurementsToCommittees } from './committee.js' -/** @import {Measurement} from '../lib/preprocess.js' */ +/** @import {Measurement} from './preprocess.js' */ const debug = createDebug('spark:evaluate') export const MAX_SCORE = 1_000_000_000_000_000n export const MAX_SET_SCORES_PARTICIPANTS = 700 +export const REQUIRED_COMMITTEE_SIZE = 30 export class SetScoresBucket { constructor () { @@ -46,6 +48,7 @@ export const createSetScoresBuckets = participants => { * @param {object} args * @param {import('./round.js').RoundData} args.round * @param {bigint} args.roundIndex + * @param {number} [args.requiredCommitteeSize] * @param {any} args.ieContractWithSigner * @param {import('./spark-api.js').fetchRoundDetails} args.fetchRoundDetails, * @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry @@ -55,12 +58,15 @@ export const createSetScoresBuckets = participants => { export const evaluate = async ({ round, roundIndex, + requiredCommitteeSize, ieContractWithSigner, fetchRoundDetails, recordTelemetry, createPgClient, logger }) => { + requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE + // Get measurements /** @type {Measurement[]} */ const measurements = round.measurements || [] @@ -73,7 +79,13 @@ export const evaluate = async ({ const started = Date.now() - await runFraudDetection(roundIndex, measurements, sparkRoundDetails, logger) + const { committees } = await runFraudDetection({ + roundIndex, + measurements, + sparkRoundDetails, + requiredCommitteeSize, + logger + }) const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK') // Calculate reward shares @@ -207,7 +219,13 @@ export const 
evaluate = async ({ try { recordTelemetry('committees', (point) => { point.intField('round_index', roundIndex) - recordCommitteeSizes(honestMeasurements, point) + point.intField('committees_all', committees.length) + point.intField('committees_too_small', + committees + .filter(c => c.evaluation?.retrievalResult === 'COMMITTEE_TOO_SMALL') + .length + ) + recordCommitteeSizes(committees, point) }) } catch (err) { console.error('Cannot record committees.', err) @@ -217,7 +235,7 @@ export const evaluate = async ({ if (createPgClient) { try { - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements: measurements }) + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements: measurements }) } catch (err) { console.error('Cannot update public stats.', err) ignoredErrors.push(err) @@ -229,12 +247,20 @@ export const evaluate = async ({ } /** - * @param {bigint} roundIndex - * @param {Measurement[]} measurements - * @param {import('./typings.js').RoundDetails} sparkRoundDetails - * @param {Pick} logger + * @param {object} args + * @param {bigint} args.roundIndex + * @param {Measurement[]} args.measurements + * @param {import('./typings.js').RoundDetails} args.sparkRoundDetails + * @param {number} args.requiredCommitteeSize + * @param {Pick} args.logger */ -export const runFraudDetection = async (roundIndex, measurements, sparkRoundDetails, logger) => { +export const runFraudDetection = async ({ + roundIndex, + measurements, + sparkRoundDetails, + requiredCommitteeSize, + logger +}) => { const randomness = await getRandomnessForSparkRound(Number(sparkRoundDetails.startEpoch)) const taskBuildingStarted = Date.now() @@ -290,7 +316,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta } // - // 2. Reward only one participant in each inet group + // 2. 
Accept only maxTasksPerNode measurements from each inet group // /** @type {Map} */ const inetGroups = new Map() @@ -359,6 +385,26 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta } } + // + // 3. Group measurements to per-task committees and find majority result + // + + // PERFORMANCE: Avoid duplicating the array of measurements because there are + // hundreds of thousands of them. All the function groupMeasurementsToCommittees + // needs is to iterate over the accepted measurements once. + const iterateAcceptedMeasurements = function * () { + for (const m of measurements) { + if (m.fraudAssessment !== 'OK') continue + yield m + } + } + + const committees = groupMeasurementsToCommittees(iterateAcceptedMeasurements()) + + for (const c of committees.values()) { + c.evaluate({ requiredCommitteeSize }) + } + if (debug.enabled) { for (const m of measurements) { // Print round & participant address & CID together to simplify lookup when debugging @@ -372,6 +418,8 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta m) } } + + return { committees: Array.from(committees.values()) } } /** diff --git a/lib/preprocess.js b/lib/preprocess.js index 84bcc70..051664b 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -12,7 +12,7 @@ const debug = createDebug('spark:preprocess') export class Measurement { /** * @param {Partial} m - * @param {(string) => string} pointerize + * @param {(str: T) => T} pointerize */ constructor (m, pointerize = (v) => v) { this.participantAddress = pointerize(parseParticipantAddress(m.participant_address)) @@ -208,6 +208,9 @@ export const parseMeasurements = str => { return ret } +/** + * @param {string} filWalletAddress + */ export const parseParticipantAddress = filWalletAddress => { // ETH addresses don't need any conversion if (filWalletAddress.startsWith('0x')) { diff --git a/lib/public-stats.js b/lib/public-stats.js index f621378..fef81fa 100644 --- a/lib/public-stats.js 
+++ b/lib/public-stats.js @@ -3,15 +3,19 @@ import createDebug from 'debug' import { updatePlatformStats } from './platform-stats.js' import { getTaskId } from './retrieval-stats.js' +/** @import pg from 'pg' */ +/** @import { Committee } from './committee.js' */ + const debug = createDebug('spark:public-stats') /** * @param {object} args * @param {import('./typings.js').CreatePgClient} args.createPgClient + * @param {Iterable} args.committees * @param {import('./preprocess.js').Measurement[]} args.honestMeasurements * @param {import('./preprocess.js').Measurement[]} args.allMeasurements */ -export const updatePublicStats = async ({ createPgClient, honestMeasurements, allMeasurements }) => { +export const updatePublicStats = async ({ createPgClient, committees, honestMeasurements, allMeasurements }) => { /** @type {Map} */ const minerRetrievalStats = new Map() for (const m of honestMeasurements) { @@ -27,8 +31,8 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements, al for (const [minerId, retrievalStats] of minerRetrievalStats.entries()) { await updateRetrievalStats(pgClient, minerId, retrievalStats) } - await updateIndexerQueryStats(pgClient, honestMeasurements) - await updateDailyDealsStats(pgClient, honestMeasurements) + await updateIndexerQueryStats(pgClient, committees) + await updateDailyDealsStats(pgClient, committees) await updatePlatformStats(pgClient, honestMeasurements, allMeasurements) } finally { await pgClient.end() @@ -36,7 +40,7 @@ export const updatePublicStats = async ({ createPgClient, honestMeasurements, al } /** - * @param {import('pg').Client} pgClient + * @param {pg.Client} pgClient * @param {string} minerId * @param {object} stats * @param {number} stats.total @@ -59,16 +63,22 @@ const updateRetrievalStats = async (pgClient, minerId, { total, successful }) => ]) } -const updateIndexerQueryStats = async (pgClient, honestMeasurements) => { +/** + * @param {pg.Client} pgClient + * @param {Iterable} committees + */ 
+const updateIndexerQueryStats = async (pgClient, committees) => { /** @type {Set} */ const dealsWithHttpAdvertisement = new Set() /** @type {Set} */ const dealsWithIndexerResults = new Set() - for (const m of honestMeasurements) { - const dealId = getTaskId(m) - if (m.indexerResult) dealsWithIndexerResults.add(dealId) - if (m.indexerResult === 'OK') dealsWithHttpAdvertisement.add(dealId) + for (const c of committees) { + const dealId = getTaskId(c.retrievalTask) + const evaluation = c.evaluation + if (!evaluation) continue + if (evaluation.indexerResult) dealsWithIndexerResults.add(dealId) + if (evaluation.indexerResult === 'OK') dealsWithHttpAdvertisement.add(dealId) } const tested = dealsWithIndexerResults.size @@ -89,32 +99,26 @@ const updateIndexerQueryStats = async (pgClient, honestMeasurements) => { } /** - * @param {import('pg').Client} pgClient - * @param {import('./preprocess.js').Measurement[]} acceptedMeasurements + * @param {pg.Client} pgClient + * @param {Iterable} committees */ -const updateDailyDealsStats = async (pgClient, acceptedMeasurements) => { - /** @type {Set} */ - const dealsAll = new Set() - /** @type {Set} */ - const dealsIndexed = new Set() - /** @type {Set} */ - const dealsRetrievable = new Set() +const updateDailyDealsStats = async (pgClient, committees) => { + let total = 0 + let indexed = 0 + let retrievable = 0 + for (const c of committees) { + total++ - for (const m of acceptedMeasurements) { - const dealId = getTaskId(m) - dealsAll.add(dealId) - // TODO: Use the majority to decide whether a deal is indexed and retrievable. - // At the moment, we assume a deal is indexed/retrievable if at least one measurement - // indicates that. Once we implement "honest majority", minority results will be rejected, - // we won't receive them in the `acceptedMeasurements` array and the logic below will keep - // working unchanged. 
/**
 * Build the identifier of a retrieval task by joining the CID and the miner id
 * with `::`.
 *
 * @param {Pick<Measurement, 'cid' | 'minerId'>} m
 * @returns {string}
 */
export const getTaskId = ({ cid, minerId }) => `${cid}::${minerId}`
const stats = new Map() + for (const c of committees) { + const key = getTaskId(c.retrievalTask) + let data = stats.get(key) if (!data) { data = { subnets: new Set(), @@ -202,14 +207,16 @@ export const recordCommitteeSizes = (measurements, point) => { nodes: new Set(), measurements: 0 } - tasks.set(key, data) + stats.set(key, data) + } + for (const m of c.measurements) { + data.subnets.add(m.inet_group) + data.participants.add(m.participantAddress) + // We don't have Station instance identifier in the measurement. + // The pair (inet_group, participant_address) is a good approximation. + data.nodes.add(`${m.inet_group}::${m.participantAddress}`) + data.measurements++ } - data.subnets.add(m.inet_group) - data.participants.add(m.participantAddress) - // We don't have Station instance identifier in the measurement. - // The pair (inet_group, participant_address) is a good approximation. - data.nodes.add(`${m.inet_group}::${m.participantAddress}`) - data.measurements++ } /** @type {Array} */ @@ -220,7 +227,7 @@ export const recordCommitteeSizes = (measurements, point) => { const nodeCounts = [] /** @type {Array} */ const measurementCounts = [] - for (const { subnets, participants, nodes, measurements } of tasks.values()) { + for (const { subnets, participants, nodes, measurements } of stats.values()) { subnetCounts.push(subnets.size) participantCounts.push(participants.size) nodeCounts.push(nodes.size) diff --git a/lib/typings.d.ts b/lib/typings.d.ts index 3f806ea..873c91f 100644 --- a/lib/typings.d.ts +++ b/lib/typings.d.ts @@ -28,6 +28,11 @@ export type RecordTelemetryFn = ( fn: (point: Point) => void ) => void +export type CommitteeCheckError = +| 'COMMITTEE_TOO_SMALL' +| 'MAJORITY_NOT_FOUND' +| 'MINORITY_RESULT' + // When adding a new enum value, remember to update the summary initializer inside `evaluate()` export type FraudAssesment = | 'OK' @@ -36,6 +41,7 @@ export type FraudAssesment = | 'DUP_INET_GROUP' | 'TOO_MANY_TASKS' | 'IPNI_NOT_QUERIED' + | 
import assert from 'node:assert'
import { VALID_TASK, VALID_MEASUREMENT as VALID_MEASUREMENT_BEFORE_ASSESSMENT } from './helpers/test-data.js'
import { Committee } from '../lib/committee.js'

/** @import {Measurement} from '../lib/preprocess.js' */

/** @type {Measurement} */
const VALID_MEASUREMENT = {
  ...VALID_MEASUREMENT_BEFORE_ASSESSMENT,
  fraudAssessment: 'OK'
}
Object.freeze(VALID_MEASUREMENT)

/**
 * Create a committee for VALID_TASK, add one measurement per item in
 * `overrides` (VALID_MEASUREMENT with the given fields replaced) and evaluate it.
 *
 * @param {Partial<Measurement>[]} overrides
 * @param {number} [requiredCommitteeSize]
 * @returns {Committee}
 */
const evaluateCommitteeOf = (overrides, requiredCommitteeSize = 2) => {
  const c = new Committee(VALID_TASK)
  for (const fields of overrides) {
    c.addMeasurement({ ...VALID_MEASUREMENT, ...fields })
  }
  c.evaluate({ requiredCommitteeSize })
  return c
}

/**
 * @param {Committee} c
 * @returns {string[]} fraud assessments in the order the measurements were added
 */
const fraudAssessmentsOf = c => c.measurements.map(m => m.fraudAssessment)

const MAJORITY_AND_ONE_MINORITY = ['OK', 'OK', 'MINORITY_RESULT']
const NO_MAJORITY = ['MAJORITY_NOT_FOUND', 'MAJORITY_NOT_FOUND', 'MAJORITY_NOT_FOUND']

describe('Committee', () => {
  // All cases below exercise `evaluate()`, so they all live in this one group.
  describe('evaluate', () => {
    it('produces OK result when the absolute majority agrees', () => {
      const c = evaluateCommitteeOf([
        { retrievalResult: 'OK' },
        { retrievalResult: 'OK' },
        // minority result
        { retrievalResult: 'CONTENT_VERIFICATION_FAILED' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'OK'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })

    it('rejects committees that are too small', () => {
      const c = evaluateCommitteeOf([{}], 10)

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'COMMITTEE_TOO_SMALL',
        retrievalResult: 'COMMITTEE_TOO_SMALL'
      })
      assert.strictEqual(c.measurements[0].fraudAssessment, 'COMMITTEE_TOO_SMALL')
    })

    it('rejects committees without absolute majority for providerId', () => {
      const c = evaluateCommitteeOf([
        { providerId: 'pubkey1' },
        { providerId: 'pubkey2' },
        { providerId: 'pubkey3' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'MAJORITY_NOT_FOUND',
        retrievalResult: 'MAJORITY_NOT_FOUND'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), NO_MAJORITY)
    })

    it('finds majority for providerId', () => {
      const c = evaluateCommitteeOf([
        { providerId: 'pubkey1' },
        { providerId: 'pubkey1' },
        // minority result
        { providerId: 'pubkey3' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'OK'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })

    it('rejects committees without absolute majority for retrievalResult', () => {
      const c = evaluateCommitteeOf([
        { retrievalResult: 'OK' },
        { retrievalResult: 'IPNI_ERROR_404' },
        { retrievalResult: 'ERROR_502' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'MAJORITY_NOT_FOUND'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), NO_MAJORITY)
    })

    it('finds majority for retrievalResult', () => {
      const c = evaluateCommitteeOf([
        { retrievalResult: 'CONTENT_VERIFICATION_FAILED' },
        { retrievalResult: 'CONTENT_VERIFICATION_FAILED' },
        // minority result
        { retrievalResult: 'OK' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'CONTENT_VERIFICATION_FAILED'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })

    it('rejects committees without absolute majority for indexerResult', () => {
      const c = evaluateCommitteeOf([
        { indexerResult: 'OK' },
        { indexerResult: 'ERROR_404' },
        { indexerResult: 'HTTP_NOT_ADVERTISED' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'MAJORITY_NOT_FOUND',
        retrievalResult: 'MAJORITY_NOT_FOUND'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), NO_MAJORITY)
    })

    it('finds majority for indexerResult', () => {
      const c = evaluateCommitteeOf([
        { indexerResult: 'HTTP_NOT_ADVERTISED' },
        { indexerResult: 'HTTP_NOT_ADVERTISED' },
        // minority result
        { indexerResult: 'OK' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'HTTP_NOT_ADVERTISED',
        retrievalResult: 'OK'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })

    it('rejects committees without absolute majority for byte_length', () => {
      const c = evaluateCommitteeOf([
        { byte_length: 0 },
        { byte_length: 256 },
        { byte_length: 1024 }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'MAJORITY_NOT_FOUND'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), NO_MAJORITY)
    })

    it('finds majority for byte_length', () => {
      const c = evaluateCommitteeOf([
        { byte_length: 1024 },
        { byte_length: 1024 },
        // minority result
        { byte_length: 256 }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'OK'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })

    it('rejects committees without absolute majority for carChecksum', () => {
      const c = evaluateCommitteeOf([
        { carChecksum: 'hashone' },
        { carChecksum: 'hash2' },
        { carChecksum: 'hash3' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'MAJORITY_NOT_FOUND'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), NO_MAJORITY)
    })

    it('finds majority for carChecksum', () => {
      const c = evaluateCommitteeOf([
        { carChecksum: 'hashone' },
        { carChecksum: 'hashone' },
        // minority result
        { carChecksum: 'hash2' }
      ])

      assert.deepStrictEqual(c.evaluation, {
        indexerResult: 'OK',
        retrievalResult: 'OK'
      })
      assert.deepStrictEqual(fraudAssessmentsOf(c), MAJORITY_AND_ONE_MINORITY)
    })
  })
})
1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) assert.deepStrictEqual( measurements.map(m => m.fraudAssessment), ['OK', 'DUP_INET_GROUP'] @@ -419,7 +437,13 @@ describe('fraud detection', function () { } } - await runFraudDetection(1n, measurements, sparkRoundDetails, logger) + await runFraudDetection({ + roundIndex: 1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) assert.deepStrictEqual( measurements.map(m => `${m.participantAddress}::${m.fraudAssessment}`), [ @@ -468,7 +492,13 @@ describe('fraud detection', function () { const start = Date.now() measurements.forEach((m, ix) => { m.finished_at = start + ix * 1_000 }) - await runFraudDetection(1n, measurements, sparkRoundDetails, logger) + await runFraudDetection({ + roundIndex: 1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) assert.strictEqual( measurements.filter(m => m.fraudAssessment === 'OK').length, @@ -531,7 +561,14 @@ describe('fraud detection', function () { } } - await runFraudDetection(1n, measurements, sparkRoundDetails, logger) + await runFraudDetection({ + roundIndex: 1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) + assert.deepStrictEqual( measurements.map(m => `${m.participantAddress}::${m.fraudAssessment}`), [ @@ -593,7 +630,14 @@ describe('fraud detection', function () { } ] - await runFraudDetection(1n, measurements, sparkRoundDetails, logger) + await runFraudDetection({ + roundIndex: 1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) + assert.deepStrictEqual( measurements.map(m => m.fraudAssessment), [ @@ -633,7 +677,14 @@ describe('fraud detection', function () { } ] - await runFraudDetection(1n, measurements, sparkRoundDetails, logger) + await runFraudDetection({ + roundIndex: 1n, + measurements, + sparkRoundDetails, + requiredCommitteeSize: 1, + logger + }) + assert.deepStrictEqual( measurements.map(m => m.fraudAssessment), 
/**
 * Test helper: mark all given measurements as accepted, group them into
 * committees and give each committee an evaluation copied from its first
 * measurement (no majority voting is performed).
 *
 * NOTE: The measurements are modified in place, `fraudAssessment` is set to 'OK'.
 *
 * @param {Iterable<import('../../lib/preprocess.js').Measurement>} acceptedMeasurements
 * Any iterable, including one-shot iterables like generators
 * @returns {import('../../lib/committee.js').Committee[]}
 */
export const buildEvaluatedCommitteesFromMeasurements = (acceptedMeasurements) => {
  // Materialize the input first. We need to walk the measurements twice (here
  // and inside groupMeasurementsToCommittees); a generator would be exhausted
  // by the first pass and the grouping would silently produce no committees.
  const measurements = Array.from(acceptedMeasurements)
  for (const m of measurements) m.fraudAssessment = 'OK'

  const committees = [...groupMeasurementsToCommittees(measurements).values()]
  for (const c of committees) {
    // `c.measurements` creates a new array on every access, read it only once
    const [first] = c.measurements
    c.evaluation = {
      indexerResult: first.indexerResult,
      retrievalResult: first.retrievalResult
    }
  }
  return committees
}
b/test/public-stats.test.js @@ -3,7 +3,7 @@ import pg from 'pg' import { DATABASE_URL } from '../lib/config.js' import { migrateWithPgClient } from '../lib/migrate.js' -import { VALID_MEASUREMENT } from './helpers/test-data.js' +import { buildEvaluatedCommitteesFromMeasurements, VALID_MEASUREMENT } from './helpers/test-data.js' import { updatePublicStats } from '../lib/public-stats.js' import { beforeEach } from 'mocha' @@ -48,11 +48,13 @@ describe('public-stats', () => { it('creates or updates the row for today - one miner only', async () => { /** @type {Measurement[]} */ const honestMeasurements = [ - { ...VALID_MEASUREMENT, retrievalResult: 'OK' }, - { ...VALID_MEASUREMENT, retrievalResult: 'TIMEOUT' } + { ...VALID_MEASUREMENT, cid: 'cidone', retrievalResult: 'OK' }, + { ...VALID_MEASUREMENT, cid: 'cidtwo', retrievalResult: 'TIMEOUT' } ] const allMeasurements = honestMeasurements - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + const committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) + + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: created } = await pgClient.query( 'SELECT day::TEXT, total, successful FROM retrieval_stats' @@ -62,7 +64,7 @@ describe('public-stats', () => { ]) honestMeasurements.push({ ...VALID_MEASUREMENT, retrievalResult: 'UNKNOWN_ERROR' }) - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: updated } = await pgClient.query( 'SELECT day::TEXT, total, successful FROM retrieval_stats' @@ -80,7 +82,9 @@ describe('public-stats', () => { { ...VALID_MEASUREMENT, minerId: 'f1second', retrievalResult: 'OK' } ] const allMeasurements = honestMeasurements - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + const committees = 
buildEvaluatedCommitteesFromMeasurements(honestMeasurements) + + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: created } = await pgClient.query( 'SELECT day::TEXT, miner_id, total, successful FROM retrieval_stats' @@ -92,7 +96,7 @@ describe('public-stats', () => { honestMeasurements.push({ ...VALID_MEASUREMENT, minerId: 'f1first', retrievalResult: 'UNKNOWN_ERROR' }) honestMeasurements.push({ ...VALID_MEASUREMENT, minerId: 'f1second', retrievalResult: 'UNKNOWN_ERROR' }) - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: updated } = await pgClient.query( 'SELECT day::TEXT, miner_id, total, successful FROM retrieval_stats' @@ -113,7 +117,9 @@ describe('public-stats', () => { { ...VALID_MEASUREMENT, cid: 'bafy3', indexerResult: 'ERROR_404' } ] const allMeasurements = honestMeasurements - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + let committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) + + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: created } = await pgClient.query( 'SELECT day::TEXT, deals_tested, deals_advertising_http FROM indexer_query_stats' @@ -127,7 +133,9 @@ describe('public-stats', () => { honestMeasurements.push({ ...VALID_MEASUREMENT, indexerResult: 'UNKNOWN_ERROR' }) // This is a measurement for a new task. 
honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy4', indexerResult: 'UNKNOWN_ERROR' }) - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) + + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: updated } = await pgClient.query( 'SELECT day::TEXT, deals_tested, deals_advertising_http FROM indexer_query_stats' @@ -149,7 +157,9 @@ describe('public-stats', () => { { ...VALID_MEASUREMENT, cid: 'bafy4', status_code: 502, retrievalResult: 'ERROR_502' } ] const allMeasurements = honestMeasurements - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + let committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) + + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: created } = await pgClient.query( 'SELECT day::TEXT, total, indexed, retrievable FROM daily_deals' @@ -162,10 +172,11 @@ describe('public-stats', () => { // effectively ignored as the other measurement was successful. honestMeasurements.push({ ...VALID_MEASUREMENT, status_code: 502 }) // These are measurements for a new task. 
+ honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy5', indexerResult: 'OK', status_code: 502, retrievalResult: 'ERROR_502' }) honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy5', indexerResult: 'UNKNOWN_ERROR', retrievalResult: 'IPNI_UNKNOWN_ERROR' }) - honestMeasurements.push({ ...VALID_MEASUREMENT, cid: 'bafy5', status_code: 502, retrievalResult: 'ERROR_502' }) + committees = buildEvaluatedCommitteesFromMeasurements(honestMeasurements) - await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements }) + await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements }) const { rows: updated } = await pgClient.query( 'SELECT day::TEXT, total, indexed, retrievable FROM daily_deals' diff --git a/test/retrieval-stats.test.js b/test/retrieval-stats.test.js index dcec093..00bbf4a 100644 --- a/test/retrieval-stats.test.js +++ b/test/retrieval-stats.test.js @@ -8,6 +8,7 @@ import { } from '../lib/retrieval-stats.js' import { VALID_MEASUREMENT } from './helpers/test-data.js' import { assertPointFieldValue, getPointName } from './helpers/assertions.js' +import { groupMeasurementsToCommittees } from '../lib/committee.js' /** @typedef {import('../lib/preprocess.js').Measurement} Measurement */ @@ -15,6 +16,7 @@ const debug = createDebug('test') describe('retrieval statistics', () => { it('reports all stats', async () => { + /** @type {Measurement[]} */ const measurements = [ { ...VALID_MEASUREMENT @@ -246,9 +248,11 @@ describe('recordCommitteeSizes', () => { cid: 'bafyanother' } ] + measurements.forEach(m => { m.fraudAssessment = 'OK' }) const point = new Point('committees') - recordCommitteeSizes(measurements, point) + const committees = groupMeasurementsToCommittees(measurements).values() + recordCommitteeSizes(committees, point) debug(getPointName(point), point.fields) assertPointFieldValue(point, 'subnets_min', '1i') @@ -284,11 +288,12 @@ describe('recordCommitteeSizes', () => { ...VALID_MEASUREMENT, cid: 
'bafyanother' } - ] + measurements.forEach(m => { m.fraudAssessment = 'OK' }) const point = new Point('committees') - recordCommitteeSizes(measurements, point) + const committees = groupMeasurementsToCommittees(measurements).values() + recordCommitteeSizes(committees, point) debug(getPointName(point), point.fields) assertPointFieldValue(point, 'participants_min', '1i') @@ -326,11 +331,12 @@ describe('recordCommitteeSizes', () => { ...VALID_MEASUREMENT, cid: 'bafyanother' } - ] + measurements.forEach(m => { m.fraudAssessment = 'OK' }) const point = new Point('committees') - recordCommitteeSizes(measurements, point) + const committees = groupMeasurementsToCommittees(measurements).values() + recordCommitteeSizes(committees, point) debug(getPointName(point), point.fields) assertPointFieldValue(point, 'nodes_min', '1i') @@ -358,11 +364,12 @@ describe('recordCommitteeSizes', () => { ...VALID_MEASUREMENT, cid: 'bafyanother' } - ] + measurements.forEach(m => { m.fraudAssessment = 'OK' }) const point = new Point('committees') - recordCommitteeSizes(measurements, point) + const committees = groupMeasurementsToCommittees(measurements).values() + recordCommitteeSizes(committees, point) debug(getPointName(point), point.fields) assertPointFieldValue(point, 'measurements_min', '1i')