Skip to content

Commit

Permalink
feat: form committees and find majority results (#304)
Browse files Browse the repository at this point in the history
Group accepted measurements into committees on a per-retrieval-task basis.
Evaluate each committee to find an absolute majority result. Reject
results that are in the minority.

When the committee is too small to give us confidence in a majority
being honest, or if we cannot find an absolute majority, then reject
all measurements in such a committee.

Signed-off-by: Miroslav Bajtoš <oss@bajtos.net>
Co-authored-by: Julian Gruber <julian@juliangruber.com>
  • Loading branch information
bajtos and juliangruber authored Aug 28, 2024
1 parent fc7e4c4 commit 4925298
Show file tree
Hide file tree
Showing 11 changed files with 706 additions and 86 deletions.
214 changes: 214 additions & 0 deletions lib/committee.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import assert from 'node:assert'
import createDebug from 'debug'
import { getTaskId } from './retrieval-stats.js'

/** @import {Measurement} from './preprocess.js' */
/** @import {RetrievalResult, CommitteeCheckError} from './typings.js' */

const debug = createDebug('spark:committee')

/** @typedef {Map<string, Committee>} TaskIdToCommitteeMap */

/** @typedef {{
indexerResult: string | CommitteeCheckError;
retrievalResult: RetrievalResult
}} CommitteeEvaluation
*/
/**
 * A group of accepted measurements collected for a single retrieval task,
 * i.e. for the same `(cid, minerId)` pair.
 *
 * Fill the committee via `addMeasurement()` and then call `evaluate()` to find
 * the result reported by an absolute majority of the committee members.
 */
export class Committee {
  /** @type {Measurement[]} */
  #measurements

  /**
   * @param {Pick<Measurement, 'cid' | 'minerId'>} retrievalTask
   */
  constructor ({ cid, minerId }) {
    this.retrievalTask = { minerId, cid }

    this.#measurements = []

    /** @type {CommitteeEvaluation | undefined} */
    this.evaluation = undefined
  }

  /** Number of measurements added to this committee. */
  get size () {
    return this.#measurements.length
  }

  /**
   * A frozen shallow copy of the committee's measurements. The array cannot be
   * modified, but the measurement objects are shared with the committee.
   */
  get measurements () {
    const ret = [...this.#measurements]
    Object.freeze(ret)
    return ret
  }

  /**
   * Add a measurement to the committee.
   *
   * Throws an assertion error when the measurement belongs to a different
   * retrieval task or when its `fraudAssessment` is not `OK`.
   *
   * @param {Measurement} m
   */
  addMeasurement (m) {
    assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
    assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
    assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
    this.#measurements.push(m)
  }

  /**
   * Evaluate the committee and store the outcome in `this.evaluation`.
   *
   * Side effects on the measurements in this committee:
   * - committee smaller than `requiredCommitteeSize`: all measurements are
   *   marked as `COMMITTEE_TOO_SMALL`
   * - no absolute majority on the retrieval result: all measurements are
   *   marked as `MAJORITY_NOT_FOUND`
   * - otherwise, measurements disagreeing with the majority are marked as
   *   `MINORITY_RESULT`
   *
   * @param {object} args
   * @param {number} args.requiredCommitteeSize
   * @returns {void}
   */
  evaluate ({ requiredCommitteeSize }) {
    debug(
      'Evaluating task %o with a committee of %s measurements',
      this.retrievalTask,
      this.#measurements.length
    )

    if (this.#measurements.length < requiredCommitteeSize) {
      debug('→ committee is too small (size=%s required=%s); retrievalResult=COMMITTEE_TOO_SMALL',
        this.#measurements.length,
        requiredCommitteeSize
      )
      this.evaluation = {
        indexerResult: 'COMMITTEE_TOO_SMALL',
        retrievalResult: 'COMMITTEE_TOO_SMALL'
      }
      for (const m of this.#measurements) m.fraudAssessment = 'COMMITTEE_TOO_SMALL'
      return
    }

    debug('- searching for majority in indexer results')
    /** @type {(keyof Measurement)[]} */
    const indexerResultProps = [
      'providerId',
      'indexerResult',
      'provider_address',
      'protocol'
    ]
    const indexerResultMajority = this.#findMajority(indexerResultProps)
    const indexerResult = indexerResultMajority
      ? indexerResultMajority.majorityValue.indexerResult
      : 'MAJORITY_NOT_FOUND'

    debug('- searching for majority in retrieval results')
    /** @type {(keyof Measurement)[]} */
    const retrievalResultProps = [
      ...indexerResultProps,
      // NOTE: We are not checking the fields that were used to calculate
      // the retrievalResult value:
      // - status_code
      // - timeout
      // - car_too_large
      // If there is an agreement on the retrieval result, then those fields
      // must have the same value too.
      'retrievalResult',
      'byte_length',
      'carChecksum'
    ]

    const retrievalResultMajority = this.#findMajority(retrievalResultProps)
    /** @type {CommitteeEvaluation['retrievalResult']} */
    let retrievalResult
    if (retrievalResultMajority) {
      retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
      for (const m of retrievalResultMajority.minorityMeasurements) {
        m.fraudAssessment = 'MINORITY_RESULT'
      }
    } else {
      retrievalResult = 'MAJORITY_NOT_FOUND'
      for (const m of this.#measurements) m.fraudAssessment = 'MAJORITY_NOT_FOUND'
    }

    this.evaluation = {
      indexerResult,
      retrievalResult
    }
  }

  /**
   * Find the value of the given fields that is shared by an absolute majority
   * (more than half) of the measurements in this committee.
   *
   * Returns `undefined` when the committee is empty or when no value is shared
   * by more than half of the measurements.
   *
   * @param {(keyof Measurement)[]} measurementFields
   */
  #findMajority (measurementFields) {
    // An empty committee has nothing to agree on. Without this guard, the code
    // below would try to read the first measurement of a non-existent group.
    if (this.#measurements.length === 0) {
      debug('→ committee is empty; result=MAJORITY_NOT_FOUND')
      return undefined
    }

    /** @param {Measurement} m */
    const getResult = m => pick(m, ...measurementFields)

    /** @type {Map<string, Measurement[]>} */
    const resultGroups = new Map()

    // 1. Group measurements using the result as the grouping key
    for (const m of this.#measurements) {
      const key = JSON.stringify(getResult(m))
      let list = resultGroups.get(key)
      if (!list) {
        list = []
        resultGroups.set(key, list)
      }
      list.push(m)
    }

    if (debug.enabled) {
      debug('- results found:')
      for (const k of resultGroups.keys()) {
        debug(' %o', JSON.parse(k))
      }
    }

    // 2. Find the largest group in a single pass. Using `>=` picks the group
    // inserted last when several groups share the largest size. Such ties never
    // form an absolute majority, so this only affects the debug output below.
    /** @type {Measurement[]} */
    let majorityMeasurements = []
    for (const group of resultGroups.values()) {
      if (group.length >= majorityMeasurements.length) majorityMeasurements = group
    }

    // 3. Check whether the largest group forms an absolute majority
    const majoritySize = majorityMeasurements.length
    const majorityValue = getResult(majorityMeasurements[0])

    debug('- majority=%s committee-size=%s value=%o', majoritySize, this.size, majorityValue)
    if (majoritySize <= this.size / 2) {
      debug('→ majority is not absolute; result=MAJORITY_NOT_FOUND')
      return undefined
    }

    debug('→ majority agrees on result=%o', majorityValue)
    const minorityMeasurements = Array.from(resultGroups.values())
      .filter(group => group !== majorityMeasurements)
      .flat()

    return {
      majorityValue,
      majorityMeasurements,
      minorityMeasurements
    }
  }
}

/**
 * Build a new object holding only the listed properties of `obj`.
 *
 * Every requested key is set on the result, even when the source value
 * is `undefined`.
 *
 * @template T
 * @template {keyof T} K
 * @param {T} obj
 * @param {K[]} keys
 * @returns {Pick<T, K>}
 */
function pick (obj, ...keys) {
  /** @type {any} */
  const picked = {}
  for (const name of keys) {
    picked[name] = obj[name]
  }
  return picked
}

/**
 * Split measurements into committees, one committee per retrieval task.
 *
 * The input is consumed in a single pass. A committee is created from the
 * first measurement seen for each task id, and every measurement is added
 * to the committee of its task.
 *
 * @param {Iterable<Measurement>} measurements
 * @returns {TaskIdToCommitteeMap}
 */
export function groupMeasurementsToCommittees (measurements) {
  /** @type {TaskIdToCommitteeMap} */
  const committeesByTaskId = new Map()
  for (const measurement of measurements) {
    const taskId = getTaskId(measurement)
    if (!committeesByTaskId.has(taskId)) {
      committeesByTaskId.set(taskId, new Committee(measurement))
    }
    committeesByTaskId.get(taskId).addMeasurement(measurement)
  }
  return committeesByTaskId
}
68 changes: 58 additions & 10 deletions lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { getRandomnessForSparkRound } from './drand-client.js'
import { updatePublicStats } from './public-stats.js'
import { buildRetrievalStats, getTaskId, recordCommitteeSizes } from './retrieval-stats.js'
import { getTasksAllowedForStations } from './tasker.js'
import { groupMeasurementsToCommittees } from './committee.js'

/** @import {Measurement} from '../lib/preprocess.js' */
/** @import {Measurement} from './preprocess.js' */

const debug = createDebug('spark:evaluate')

export const MAX_SCORE = 1_000_000_000_000_000n
export const MAX_SET_SCORES_PARTICIPANTS = 700
export const REQUIRED_COMMITTEE_SIZE = 30

export class SetScoresBucket {
constructor () {
Expand Down Expand Up @@ -46,6 +48,7 @@ export const createSetScoresBuckets = participants => {
* @param {object} args
* @param {import('./round.js').RoundData} args.round
* @param {bigint} args.roundIndex
* @param {number} [args.requiredCommitteeSize]
* @param {any} args.ieContractWithSigner
* @param {import('./spark-api.js').fetchRoundDetails} args.fetchRoundDetails,
* @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry
Expand All @@ -55,12 +58,15 @@ export const createSetScoresBuckets = participants => {
export const evaluate = async ({
round,
roundIndex,
requiredCommitteeSize,
ieContractWithSigner,
fetchRoundDetails,
recordTelemetry,
createPgClient,
logger
}) => {
requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE

// Get measurements
/** @type {Measurement[]} */
const measurements = round.measurements || []
Expand All @@ -73,7 +79,13 @@ export const evaluate = async ({

const started = Date.now()

await runFraudDetection(roundIndex, measurements, sparkRoundDetails, logger)
const { committees } = await runFraudDetection({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
})
const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK')

// Calculate reward shares
Expand Down Expand Up @@ -207,7 +219,13 @@ export const evaluate = async ({
try {
recordTelemetry('committees', (point) => {
point.intField('round_index', roundIndex)
recordCommitteeSizes(honestMeasurements, point)
point.intField('committees_all', committees.length)
point.intField('committees_too_small',
committees
.filter(c => c.evaluation?.retrievalResult === 'COMMITTEE_TOO_SMALL')
.length
)
recordCommitteeSizes(committees, point)
})
} catch (err) {
console.error('Cannot record committees.', err)
Expand All @@ -217,7 +235,7 @@ export const evaluate = async ({

if (createPgClient) {
try {
await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements: measurements })
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements: measurements })
} catch (err) {
console.error('Cannot update public stats.', err)
ignoredErrors.push(err)
Expand All @@ -229,12 +247,20 @@ export const evaluate = async ({
}

/**
* @param {bigint} roundIndex
* @param {Measurement[]} measurements
* @param {import('./typings.js').RoundDetails} sparkRoundDetails
* @param {Pick<Console, 'log' | 'error'>} logger
* @param {object} args
* @param {bigint} args.roundIndex
* @param {Measurement[]} args.measurements
* @param {import('./typings.js').RoundDetails} args.sparkRoundDetails
* @param {number} args.requiredCommitteeSize
* @param {Pick<Console, 'log' | 'error'>} args.logger
*/
export const runFraudDetection = async (roundIndex, measurements, sparkRoundDetails, logger) => {
export const runFraudDetection = async ({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
}) => {
const randomness = await getRandomnessForSparkRound(Number(sparkRoundDetails.startEpoch))

const taskBuildingStarted = Date.now()
Expand Down Expand Up @@ -290,7 +316,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}

//
// 2. Reward only one participant in each inet group
// 2. Accept only maxTasksPerNode measurements from each inet group
//
/** @type {Map<string, Measurement[]>} */
const inetGroups = new Map()
Expand Down Expand Up @@ -359,6 +385,26 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}
}

//
// 3. Group measurements to per-task committees and find majority result
//

// PERFORMANCE: Avoid duplicating the array of measurements because there are
// hundreds of thousands of them. All the function groupMeasurementsToCommittees
// needs is to iterate over the accepted measurements once.
// Lazily yields the measurements whose fraudAssessment is 'OK', in their
// original order, without building an intermediate array.
const iterateAcceptedMeasurements = function * () {
  for (const measurement of measurements) {
    const isAccepted = measurement.fraudAssessment === 'OK'
    if (isAccepted) yield measurement
  }
}

const committees = groupMeasurementsToCommittees(iterateAcceptedMeasurements())

for (const c of committees.values()) {
c.evaluate({ requiredCommitteeSize })
}

if (debug.enabled) {
for (const m of measurements) {
// Print round & participant address & CID together to simplify lookup when debugging
Expand All @@ -372,6 +418,8 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
m)
}
}

return { committees: Array.from(committees.values()) }
}

/**
Expand Down
5 changes: 4 additions & 1 deletion lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const debug = createDebug('spark:preprocess')
export class Measurement {
/**
* @param {Partial<import('./typings.js').RawMeasurement>} m
* @param {(string) => string} pointerize
* @param {<T extends string>(str: T) => T} pointerize
*/
constructor (m, pointerize = (v) => v) {
this.participantAddress = pointerize(parseParticipantAddress(m.participant_address))
Expand Down Expand Up @@ -208,6 +208,9 @@ export const parseMeasurements = str => {
return ret
}

/**
* @param {string} filWalletAddress
*/
export const parseParticipantAddress = filWalletAddress => {
// ETH addresses don't need any conversion
if (filWalletAddress.startsWith('0x')) {
Expand Down
Loading

0 comments on commit 4925298

Please sign in to comment.