Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: form committees and find majority results #304

Merged
merged 21 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions lib/committee.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
import assert from 'node:assert'
import createDebug from 'debug'
import { getTaskId } from './retrieval-stats.js'

/** @import {Measurement} from './preprocess.js' */
/** @import {RetrievalResult, CommitteeCheckError} from './typings.js' */

const debug = createDebug('spark:committee')

/** @typedef {Map<string, Committee>} TaskIdToCommitteeMap */

export class Committee {
/** @type {Measurement[]} */
#measurements

/**
* @param {Pick<Measurement, 'cid' | 'minerId'>} retrievalTask
*/
constructor ({ cid, minerId }) {
this.retrievalTask = { minerId, cid }

this.#measurements = []

/** @type {string | CommitteeCheckError} */
this.indexerResult = 'MAJORITY_NOT_FOUND'

/** @type {RetrievalResult} */
this.retrievalResult = 'MAJORITY_NOT_FOUND'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think there can be a case where Committee is initialized, then there's an error during evaluate(), then it's state is read and it's falsely interpreted as 'MAJORITY_NOT_FOUND', while it should rather be something like 'UNKNOWN_ERROR'?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you are right that this design (a mutable Committee class) allows that to happen in theory. I think it should not happen in practice right now because if there is an error, then an exception is thrown and the entire evaluation is aborted.

I propose to move these two "*Result" properties into a nested object stored in Commitee.evaluation property, and let this property be undefined until the evaluation completes.

See 029db0d


A possible next refactoring is to remove the Committee class entirely and modify Committee.evaluate into a pure function accepting ({requiredCommitteeSize, retrievalTask, measurements}) and returning CommitteeEvaluation.

Such change would push the complexity to runFraudDetection, which would need to create the list of committees - an array of {retrievalTask, measurements, evaluation} objects. This list of committees is later used to extract stats for InfluxDB and spark-stats API.

In that light, such refactoring does not seem to be an improvement.

Let me know what you think!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to having the result be undefined or something else, which we can differentiate from the previous default state

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I'm not sure whether removing the class does help, let's see how it goes?

}

get size () {
return this.#measurements.length
}

get measurements () {
const ret = [...this.#measurements]
Object.freeze(ret)
return ret
}

/**
* @param {Measurement} m
*/
addMeasurement (m) {
assert.strictEqual(m.cid, this.retrievalTask.cid, 'cid must match')
assert.strictEqual(m.minerId, this.retrievalTask.minerId, 'minerId must match')
assert.strictEqual(m.fraudAssessment, 'OK', 'only accepted measurements can be added')
this.#measurements.push(m)
}

/**
* @param {object} args
* @param {number} args.requiredCommitteeSize
* @returns
*/
evaluate ({ requiredCommitteeSize }) {
debug(
'Evaluating task %o with a committee of %s measurements',
this.retrievalTask,
this.#measurements.length
)

if (this.#measurements.length < requiredCommitteeSize) {
debug('→ committee is too small (size=%s required=%s); retrievalResult=COMMITTEE_TOO_SMALL',
this.#measurements.length,
requiredCommitteeSize
)
this.indexerResult = 'COMMITTEE_TOO_SMALL'
this.retrievalResult = 'COMMITTEE_TOO_SMALL'
this.#measurements.forEach(m => { m.fraudAssessment = 'COMMITTEE_TOO_SMALL' })
bajtos marked this conversation as resolved.
Show resolved Hide resolved
return
}

debug('- searching for majority in indexer results')
/** @type {(keyof Measurement)[]} */
const indexerResultProps = [
'providerId',
'indexerResult',
'provider_address',
'protocol'
]
const indexerResultMajority = this.#findMajority(indexerResultProps)
if (indexerResultMajority) {
this.indexerResult = indexerResultMajority.majorityValue.indexerResult
}

debug('- searching for majority in retrieval results')
/** @type {(keyof Measurement)[]} */
const retrievalResultProps = [
...indexerResultProps,
'retrievalResult'
// TODO
// - status_code,
// - timeout,
// - car size
// - car checksum
// - car too large
]

const retrievalResultMajority = this.#findMajority(retrievalResultProps)
if (retrievalResultMajority) {
this.retrievalResult = retrievalResultMajority.majorityValue.retrievalResult
retrievalResultMajority.minorityMeasurements.forEach(m => {
m.fraudAssessment = 'MINORITY_RESULT'
})
bajtos marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.retrievalResult = 'MAJORITY_NOT_FOUND'
this.#measurements.forEach(m => { m.fraudAssessment = 'MAJORITY_NOT_FOUND' })
bajtos marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* @param {(keyof Measurement)[]} measurementFields
*/
#findMajority (measurementFields) {
/** @param {Measurement} m */
const getResult = m => pick(m, ...measurementFields)

/** @type {Map<string, Measurement[]>} */
const resultGroups = new Map()

// 1. Group measurements using the result as the grouping key
for (const m of this.#measurements) {
const key = JSON.stringify(getResult(m))
let list = resultGroups.get(key)
if (!list) {
list = []
resultGroups.set(key, list)
}
list.push(m)
}

if (debug.enabled) {
debug('- results found:')
for (const k of resultGroups.keys()) {
debug(' %o', JSON.parse(k))
}
}

// 2. Sort the measurement groups by their size
const keys = Array.from(resultGroups.keys())
keys.sort(
(a, b) => (resultGroups.get(a)?.length ?? 0) - (resultGroups.get(b)?.length ?? 0)
)
const measurementGroups = keys.map(k => resultGroups.get(k))

// 3. Find the majority
const majorityMeasurements = measurementGroups.pop()
const majoritySize = majorityMeasurements.length
const majorityValue = getResult(majorityMeasurements[0])

debug('- majority=%s committee-size=%s value=%o', majoritySize, this.size, majorityValue)
if (majoritySize <= this.size / 2) {
debug('→ majority is not absolute; result=MAJORITY_NOT_FOUND')
return undefined
} else {
debug('→ majority agrees on result=%o', majorityValue)
return {
majorityValue,
majorityMeasurements,
minorityMeasurements: measurementGroups.flat()
}
}
}
}

/**
* @template T
* @template {keyof T} K
* @param {T} obj
* @param {K[]} keys
* @returns {Pick<T, K>}
*/
function pick (obj, ...keys) {
/** @type {any} */
const ret = {}
keys.forEach(key => {
ret[key] = obj[key]
})
return ret
}

/**
* @param {Iterable<Measurement>} measurements
* @returns {TaskIdToCommitteeMap}
*/
export function groupMeasurementsToCommittees (measurements) {
/** @type {TaskIdToCommitteeMap} */
const committeesMap = new Map()
for (const m of measurements) {
const key = getTaskId(m)
let c = committeesMap.get(key)
if (!c) {
c = new Committee(m)
committeesMap.set(key, c)
}
c.addMeasurement(m)
}
return committeesMap
}
62 changes: 53 additions & 9 deletions lib/evaluate.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import { getRandomnessForSparkRound } from './drand-client.js'
import { updatePublicStats } from './public-stats.js'
import { buildRetrievalStats, getTaskId, recordCommitteeSizes } from './retrieval-stats.js'
import { getTasksAllowedForStations } from './tasker.js'
import { groupMeasurementsToCommittees } from './committee.js'

/** @import {Measurement} from '../lib/preprocess.js' */
/** @import {Measurement} from './preprocess.js' */

const debug = createDebug('spark:evaluate')

export const MAX_SCORE = 1_000_000_000_000_000n
export const MAX_SET_SCORES_PARTICIPANTS = 700
export const REQUIRED_COMMITTEE_SIZE = 30

export class SetScoresBucket {
constructor () {
Expand Down Expand Up @@ -47,6 +49,7 @@ export const createSetScoresBuckets = participants => {
* @param {object} args
* @param {import('./round.js').RoundData} args.round
* @param {bigint} args.roundIndex
* @param {number} [args.requiredCommitteeSize]
* @param {any} args.ieContractWithSigner
* @param {import('./spark-api.js').fetchRoundDetails} args.fetchRoundDetails,
* @param {import('./typings.js').RecordTelemetryFn} args.recordTelemetry
Expand All @@ -56,12 +59,15 @@ export const createSetScoresBuckets = participants => {
export const evaluate = async ({
round,
roundIndex,
requiredCommitteeSize,
ieContractWithSigner,
fetchRoundDetails,
recordTelemetry,
createPgClient,
logger
}) => {
requiredCommitteeSize ??= REQUIRED_COMMITTEE_SIZE

// Get measurements
/** @type {Measurement[]} */
const measurements = round.measurements || []
Expand All @@ -74,7 +80,13 @@ export const evaluate = async ({

const started = Date.now()

await runFraudDetection(roundIndex, measurements, sparkRoundDetails, logger)
const { committees } = await runFraudDetection({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
})
const honestMeasurements = measurements.filter(m => m.fraudAssessment === 'OK')

// Calculate reward shares
Expand Down Expand Up @@ -216,7 +228,13 @@ export const evaluate = async ({
try {
recordTelemetry('committees', (point) => {
point.intField('round_index', roundIndex)
recordCommitteeSizes(honestMeasurements, point)
point.intField('committees_all', committees.length)
point.intField('committees_too_small',
committees
.filter(c => c.retrievalResult === 'COMMITTEE_TOO_SMALL')
.length
)
recordCommitteeSizes(committees, point)
})
} catch (err) {
console.error('Cannot record committees.', err)
Expand All @@ -238,12 +256,20 @@ export const evaluate = async ({
}

/**
* @param {bigint} roundIndex
* @param {Measurement[]} measurements
* @param {import('./typings.js').RoundDetails} sparkRoundDetails
* @param {Pick<Console, 'log' | 'error'>} logger
* @param {object} args
* @param {bigint} args.roundIndex
* @param {Measurement[]} args.measurements
* @param {import('./typings.js').RoundDetails} args.sparkRoundDetails
* @param {number} args.requiredCommitteeSize
* @param {Pick<Console, 'log' | 'error'>} args.logger
*/
export const runFraudDetection = async (roundIndex, measurements, sparkRoundDetails, logger) => {
export const runFraudDetection = async ({
roundIndex,
measurements,
sparkRoundDetails,
requiredCommitteeSize,
logger
}) => {
const randomness = await getRandomnessForSparkRound(Number(sparkRoundDetails.startEpoch))

const taskBuildingStarted = Date.now()
Expand Down Expand Up @@ -299,7 +325,7 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}

//
// 2. Reward only one participant in each inet group
// 2. Accept only maxTasksPerNode measurements from each inet group
//
/** @type {Map<string, Measurement[]>} */
const inetGroups = new Map()
Expand Down Expand Up @@ -368,6 +394,22 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
}
}

//
// 3. Group measurements to per-task committees and find majority result
juliangruber marked this conversation as resolved.
Show resolved Hide resolved
//
const iterateHonestMeasurements = function * () {
for (const m of measurements) {
if (m.fraudAssessment !== 'OK') continue
yield m
}
}

const committees = groupMeasurementsToCommittees(iterateHonestMeasurements())
juliangruber marked this conversation as resolved.
Show resolved Hide resolved

for (const c of committees.values()) {
c.evaluate({ requiredCommitteeSize })
}

if (debug.enabled) {
for (const m of measurements) {
// Print round & participant address & CID together to simplify lookup when debugging
Expand All @@ -381,6 +423,8 @@ export const runFraudDetection = async (roundIndex, measurements, sparkRoundDeta
m)
}
}

return { committees: Array.from(committees.values()) }
}

/**
Expand Down
5 changes: 4 additions & 1 deletion lib/preprocess.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const debug = createDebug('spark:preprocess')
export class Measurement {
/**
* @param {Partial<import('./typings.js').RawMeasurement>} m
* @param {(string) => string} pointerize
* @param {<T extends string>(str: T) => T} pointerize
*/
constructor (m, pointerize = (v) => v) {
this.participantAddress = pointerize(parseParticipantAddress(m.participant_address))
Expand Down Expand Up @@ -208,6 +208,9 @@ export const parseMeasurements = str => {
return ret
}

/**
* @param {string} filWalletAddress
*/
export const parseParticipantAddress = filWalletAddress => {
// ETH addresses don't need any conversion
if (filWalletAddress.startsWith('0x')) {
Expand Down
4 changes: 4 additions & 0 deletions lib/public-stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const updateRetrievalStats = async (pgClient, minerId, { total, successful }) =>
}

const updateIndexerQueryStats = async (pgClient, honestMeasurements) => {
// FIXME(bajtos) use committees to build these stats

/** @type {Set<string>} */
const dealsWithHttpAdvertisement = new Set()
/** @type {Set<string>} */
Expand Down Expand Up @@ -93,6 +95,8 @@ const updateIndexerQueryStats = async (pgClient, honestMeasurements) => {
* @param {import('./preprocess.js').Measurement[]} acceptedMeasurements
*/
const updateDailyDealsStats = async (pgClient, acceptedMeasurements) => {
// FIXME(bajtos) use committees to build these stats

/** @type {Set<string>} */
const dealsAll = new Set()
/** @type {Set<string>} */
Expand Down
Loading
Loading