Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: form committees and find majority results #304

Merged
merged 21 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions lib/committee.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import assert from 'node:assert'
import createDebug from 'debug'
import { getTaskId } from './retrieval-stats.js'

/** @import {Measurement} from './preprocess.js' */
/** @import {RetrievalResult, CommitteeCheckError} from './typings.js' */

const debug = createDebug('spark:committee')

/** @typedef {Map<string, Committee>} TaskIdToCommitteeMap */

/** @typedef {{
indexerResult: string | CommitteeCheckError;
retrievalResult: RetrievalResult
}} CommitteeEvaluation
*/
/**
 * A committee collects the accepted measurements submitted for a single
 * retrieval task (a `cid` + `minerId` pair) and decides which result
 * the absolute majority of those measurements agrees on.
 */
export class Committee {
  /**
   * Measurements added via `addMeasurement()`.
   * @type {Measurement[]}
   */
  #measurements

  /**
   * @param {Pick<Measurement, 'cid' | 'minerId'>} retrievalTask
   */
  constructor ({ cid, minerId }) {
    this.retrievalTask = { minerId, cid }

    this.#measurements = []

    /**
     * Stays `undefined` until `evaluate()` is called.
     * @type {CommitteeEvaluation | undefined}
     */
    this.evaluation = undefined
  }

  /** Number of measurements added to this committee. */
  get size () {
    return this.#measurements.length
  }

  /**
   * A frozen shallow copy of the committee's measurements. The array cannot
   * be modified, but the measurement objects are shared with the committee.
   */
  get measurements () {
    const ret = [...this.#measurements]
    Object.freeze(ret)
    return ret
  }

  /**
   * Add a measurement to the committee.
   *
   * @param {Measurement} m
   * @throws {assert.AssertionError} When the measurement belongs to a
   * different retrieval task or its `fraudAssessment` is not `'OK'`.
   */
  addMeasurement (m) {
    assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
    assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
    assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
    this.#measurements.push(m)
  }

  /**
   * Evaluate the committee and store the outcome in `this.evaluation`.
   *
   * Side effects on the committee's measurements:
   * - committee smaller than `requiredCommitteeSize`: every measurement gets
   *   `fraudAssessment = 'COMMITTEE_TOO_SMALL'`
   * - no absolute majority on the retrieval result: every measurement gets
   *   `fraudAssessment = 'MAJORITY_NOT_FOUND'`
   * - otherwise: measurements disagreeing with the majority get
   *   `fraudAssessment = 'MINORITY_RESULT'`
   *
   * @param {object} args
   * @param {number} args.requiredCommitteeSize
   * @returns {void}
   */
  evaluate ({ requiredCommitteeSize }) {
    debug(
      'Evaluating task %o with a committee of %s measurements',
      this.retrievalTask,
      this.#measurements.length
    )

    if (this.#measurements.length < requiredCommitteeSize) {
      debug('→ committee is too small (size=%s required=%s); retrievalResult=COMMITTEE_TOO_SMALL',
        this.#measurements.length,
        requiredCommitteeSize
      )
      this.evaluation = {
        indexerResult: 'COMMITTEE_TOO_SMALL',
        retrievalResult: 'COMMITTEE_TOO_SMALL'
      }
      for (const m of this.#measurements) m.fraudAssessment = 'COMMITTEE_TOO_SMALL'
      return
    }

    debug('- searching for majority in indexer results')
    /** @type {(keyof Measurement)[]} */
    const indexerResultProps = [
      'providerId',
      'indexerResult',
      'provider_address',
      'protocol'
    ]
    const indexerResultMajority = this.#findMajority(indexerResultProps)
    const indexerResult = indexerResultMajority
      ? indexerResultMajority.majorityValue.indexerResult
      : 'MAJORITY_NOT_FOUND'

    debug('- searching for majority in retrieval results')
    /** @type {(keyof Measurement)[]} */
    const retrievalResultProps = [
      ...indexerResultProps,
      // NOTE: We are not checking the fields that were used to calculate
      // the retrievalResult value:
      // - status_code
      // - timeout
      // - car_too_large
      // If there is an agreement on the retrieval result, then those fields
      // must have the same value too.
      'retrievalResult',
      'byte_length',
      'carChecksum'
    ]

    const retrievalResultMajority = this.#findMajority(retrievalResultProps)
    /** @type {CommitteeEvaluation['retrievalResult']} */
    let retrievalResult
    if (retrievalResultMajority) {
      retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
      for (const m of retrievalResultMajority.minorityMeasurements) {
        m.fraudAssessment = 'MINORITY_RESULT'
      }
    } else {
      retrievalResult = 'MAJORITY_NOT_FOUND'
      for (const m of this.#measurements) m.fraudAssessment = 'MAJORITY_NOT_FOUND'
    }

    this.evaluation = {
      indexerResult,
      retrievalResult
    }
  }

  /**
   * Group the measurements by the values of the given fields and look for
   * a group containing more than half of the committee.
   *
   * @param {(keyof Measurement)[]} measurementFields
   * @returns The majority value together with the majority and minority
   * measurements, or `undefined` when no group has an absolute majority
   * (including the case of a committee with no measurements).
   */
  #findMajority (measurementFields) {
    /** @param {Measurement} m */
    const getResult = m => pick(m, ...measurementFields)

    /** @type {Map<string, Measurement[]>} */
    const resultGroups = new Map()

    // 1. Group measurements using the result as the grouping key
    for (const m of this.#measurements) {
      const key = JSON.stringify(getResult(m))
      let list = resultGroups.get(key)
      if (!list) {
        list = []
        resultGroups.set(key, list)
      }
      list.push(m)
    }

    if (debug.enabled) {
      debug('- results found:')
      for (const k of resultGroups.keys()) {
        debug(' %o', JSON.parse(k))
      }
    }

    // 2. Sort the measurement groups by their size (smallest first).
    // Array.prototype.sort is stable, groups of equal size keep the order
    // in which they were first seen.
    const measurementGroups = Array.from(resultGroups.values())
    measurementGroups.sort((a, b) => a.length - b.length)

    // 3. Find the majority
    const majorityMeasurements = measurementGroups.pop()
    if (majorityMeasurements === undefined) {
      // There are no groups because the committee has no measurements,
      // e.g. when `requiredCommitteeSize` is zero. There is nothing to agree on.
      debug('→ no measurements to evaluate; result=MAJORITY_NOT_FOUND')
      return undefined
    }
    const majoritySize = majorityMeasurements.length
    const majorityValue = getResult(majorityMeasurements[0])

    debug('- majority=%s committee-size=%s value=%o', majoritySize, this.size, majorityValue)
    if (majoritySize <= this.size / 2) {
      debug('→ majority is not absolute; result=MAJORITY_NOT_FOUND')
      return undefined
    } else {
      debug('→ majority agrees on result=%o', majorityValue)
      return {
        majorityValue,
        majorityMeasurements,
        minorityMeasurements: measurementGroups.flat()
      }
    }
  }
}

/**
 * Build a new object holding only the listed properties of `obj`.
 * Every listed key is assigned on the result, even when `obj` does not
 * define it (the value is then `undefined`).
 *
 * @template T
 * @template {keyof T} K
 * @param {T} obj
 * @param {K[]} keys
 * @returns {Pick<T, K>}
 */
function pick (obj, ...keys) {
  /** @type {any} */
  const subset = {}
  for (const name of keys) {
    subset[name] = obj[name]
  }
  return subset
}

/**
 * Walk over the measurements once and assign each of them to the committee
 * of its retrieval task. Committees are keyed by the value returned by
 * `getTaskId()`; a committee is created from the first measurement seen
 * for the given task.
 *
 * @param {Iterable<Measurement>} measurements
 * @returns {TaskIdToCommitteeMap}
 */
export function groupMeasurementsToCommittees (measurements) {
  /** @type {TaskIdToCommitteeMap} */
  const byTaskId = new Map()
  for (const measurement of measurements) {
    const taskId = getTaskId(measurement)
    const existing = byTaskId.get(taskId)
    const committee = existing ?? new Committee(measurement)
    if (existing === undefined) {
      byTaskId.set(taskId, committee)
    }
    committee.addMeasurement(measurement)
  }
  return byTaskId
}
68 changes: 58 additions & 10 deletions lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import { getRandomnessForSparkRound } from './drand-client.js'
import { updatePublicStats } from './public-stats.js'
import { buildRetrievalStats, getTaskId, recordCommitteeSizes } from './retrieval-stats.js'
import { getTasksAllowedForStations } from './tasker.js'
import { groupMeasurementsToCommittees } from './committee.js'

/** @import {Measurement} from '../lib/preprocess.js' */
/** @import {Measurement} from './preprocess.js' */

const debug = createDebug('spark:evaluate')

export const MAX_SCORE = 1_000_000_000_000_000n
export const MAX_SET_SCORES_PARTICIPANTS = 700
export const REQUIRED_COMMITTEE_SIZE = 30

export class SetScoresBucket {
constructor () {
Expand Down Expand Up @@ -46,6 +48,7 @@ export const createSetScoresBuckets = participants => {
* @param {object} args
* @param {import('./round.js').RoundData} args.round
* @param {bigint} args.roundIndex
* @param {number} [args.requiredCommitteeSize]
* @param {any} args.ieContractWithSigner
* @param {import('./spark-api.js').fetchRoundDetails} args.fetchRoundDetails,
* @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry
Expand All @@ -55,12 +58,15 @@ export const createSetScoresBuckets = participants => {
export const evaluate = async ({
round,
roundIndex,
requiredCommitteeSize,
ieContractWithSigner,
fetchRoundDetails,
recordTelemetry,
createPgClient,
logger
}) => {
requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE

// Get measurements
/** @type {Measurement[]} */
const measurements = round.measurements || []
Expand All @@ -73,7 +79,13 @@ export const evaluate = async ({

const started = Date.now()

await runFraudDetection(roundIndex, measurements, sparkRoundDetails, logger)
const { committees } = await runFraudDetection({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
})
const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK')

// Calculate reward shares
Expand Down Expand Up @@ -207,7 +219,13 @@ export const evaluate = async ({
try {
recordTelemetry('committees', (point) => {
point.intField('round_index', roundIndex)
recordCommitteeSizes(honestMeasurements, point)
point.intField('committees_all', committees.length)
point.intField('committees_too_small',
committees
.filter(c => c.evaluation?.retrievalResult === 'COMMITTEE_TOO_SMALL')
.length
)
recordCommitteeSizes(committees, point)
})
} catch (err) {
console.error('Cannot record committees.', err)
Expand All @@ -217,7 +235,7 @@ export const evaluate = async ({

if (createPgClient) {
try {
await updatePublicStats({ createPgClient, honestMeasurements, allMeasurements: measurements })
await updatePublicStats({ createPgClient, committees, honestMeasurements, allMeasurements: measurements })
} catch (err) {
console.error('Cannot update public stats.', err)
ignoredErrors.push(err)
Expand All @@ -229,12 +247,20 @@ export const evaluate = async ({
}

/**
* @param {bigint} roundIndex
* @param {Measurement[]} measurements
* @param {import('./typings.js').RoundDetails} sparkRoundDetails
* @param {Pick<Console, 'log' | 'error'>} logger
* @param {object} args
* @param {bigint} args.roundIndex
* @param {Measurement[]} args.measurements
* @param {import('./typings.js').RoundDetails} args.sparkRoundDetails
* @param {number} args.requiredCommitteeSize
* @param {Pick<Console, 'log' | 'error'>} args.logger
*/
export const runFraudDetection = async (roundIndex, measurements, sparkRoundDetails, logger) => {
export const runFraudDetection = async ({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
}) => {
const randomness = await getRandomnessForSparkRound(Number(sparkRoundDetails.startEpoch))

const taskBuildingStarted = Date.now()
Expand Down Expand Up @@ -290,7 +316,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}

//
// 2. Reward only one participant in each inet group
// 2. Accept only maxTasksPerNode measurements from each inet group
//
/** @type {Map<string, Measurement[]>} */
const inetGroups = new Map()
Expand Down Expand Up @@ -359,6 +385,26 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}
}

//
// 3. Group measurements to per-task committees and find majority result
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
//

// PERFORMANCE: There are hundreds of thousands of measurements, so we do not
// build a filtered copy of the array. groupMeasurementsToCommittees only needs
// to iterate over the accepted measurements once, which this generator
// provides lazily.
const iterateAcceptedMeasurements = function * () {
  for (const measurement of measurements) {
    if (measurement.fraudAssessment === 'OK') yield measurement
  }
}

const committees = groupMeasurementsToCommittees(iterateAcceptedMeasurements())

for (const c of committees.values()) {
c.evaluate({ requiredCommitteeSize })
}

if (debug.enabled) {
for (const m of measurements) {
// Print round & participant address & CID together to simplify lookup when debugging
Expand All @@ -372,6 +418,8 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
m)
}
}

return { committees: Array.from(committees.values()) }
}

/**
Expand Down
5 changes: 4 additions & 1 deletion lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const debug = createDebug('spark:preprocess')
export class Measurement {
/**
* @param {Partial<import('./typings.js').RawMeasurement>} m
* @param {(string) => string} pointerize
* @param {<T extends string>(str: T) => T} pointerize
*/
constructor (m, pointerize = (v) => v) {
this.participantAddress = pointerize(parseParticipantAddress(m.participant_address))
Expand Down Expand Up @@ -208,6 +208,9 @@ export const parseMeasurements = str => {
return ret
}

/**
* @param {string} filWalletAddress
*/
export const parseParticipantAddress = filWalletAddress => {
// ETH addresses don't need any conversion
if (filWalletAddress.startsWith('0x')) {
Expand Down
Loading