Skip to content

Commit

Permalink
fix: detect block gaps when streaming from ordhook (#349)
Browse files Browse the repository at this point in the history
* fix: detect gaps when streaming

* fix: tests
  • Loading branch information
rafaelcr authored Apr 26, 2024
1 parent 5422156 commit 3c1480f
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 13 deletions.
2 changes: 2 additions & 0 deletions src/pg/block-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export class BlockCache {
locations: DbLocationInsert[] = [];
currentLocations = new Map<string, DbCurrentLocationInsert>();
recursiveRefs = new Map<string, string[]>();
revealedNumbers: number[] = [];

mimeTypeCounts = new Map<string, number>();
satRarityCounts = new Map<string, number>();
Expand Down Expand Up @@ -72,6 +73,7 @@ export class BlockCache {
parent: reveal.parent,
timestamp: this.timestamp,
});
this.revealedNumbers.push(reveal.inscription_number.jubilee);
this.increaseMimeTypeCount(mime_type);
this.increaseSatRarityCount(satoshi.rarity);
this.increaseInscriptionTypeCount(reveal.inscription_number.classic < 0 ? 'cursed' : 'blessed');
Expand Down
64 changes: 52 additions & 12 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import {
runMigrations,
stopwatch,
} from '@hirosystems/api-toolkit';
import { BitcoinEvent, BitcoinPayload } from '@hirosystems/chainhook-client';
import {
BadPayloadRequestError,
BitcoinEvent,
BitcoinPayload,
} from '@hirosystems/chainhook-client';
import * as path from 'path';
import * as postgres from 'postgres';
import { Order, OrderBy } from '../api/schemas';
Expand All @@ -35,6 +39,8 @@ export const INSERT_BATCH_SIZE = 4000;

type InscriptionIdentifier = { genesis_id: string } | { number: number };

class BlockAlreadyIngestedError extends Error {}

export class PgStore extends BasePgStore {
readonly brc20: Brc20PgStore;
readonly counts: CountsPgStore;
Expand Down Expand Up @@ -90,14 +96,17 @@ export class PgStore extends BasePgStore {
);
}
for (const event of payload.apply) {
if (await this.isBlockIngested(event)) {
logger.warn(`PgStore skipping previously seen block ${event.block_identifier.index}`);
continue;
}
logger.info(`PgStore apply block ${event.block_identifier.index}`);
const time = stopwatch();
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
await this.brc20.updateBrc20Operations(sql, event, 'apply');
try {
await this.updateInscriptionsEvent(sql, event, 'apply', streamed);
await this.brc20.updateBrc20Operations(sql, event, 'apply');
} catch (error) {
if (error instanceof BlockAlreadyIngestedError) {
logger.warn(error);
continue;
} else throw error;
}
await this.updateChainTipBlockHeight(sql, event.block_identifier.index);
logger.info(
`PgStore apply block ${
Expand All @@ -119,6 +128,7 @@ export class PgStore extends BasePgStore {
normalizedHexString(event.block_identifier.hash),
event.timestamp
);
if (direction === 'apply') await this.assertNextBlockIsNotIngested(sql, event);
for (const tx of event.transactions) {
const tx_id = normalizedHexString(tx.transaction_identifier.hash);
for (const operation of tx.metadata.ordinal_operations) {
Expand All @@ -138,6 +148,7 @@ export class PgStore extends BasePgStore {
}
switch (direction) {
case 'apply':
if (streamed) await this.assertNextBlockIsContiguous(sql, event, cache);
await this.applyInscriptions(sql, cache, streamed);
break;
case 'rollback':
Expand Down Expand Up @@ -348,15 +359,44 @@ export class PgStore extends BasePgStore {
}
}

private async isBlockIngested(event: BitcoinEvent): Promise<boolean> {
const currentBlockHeight = await this.getChainTipBlockHeight();
private async assertNextBlockIsNotIngested(sql: PgSqlClient, event: BitcoinEvent) {
const result = await sql<{ block_height: number }[]>`
SELECT block_height::int FROM chain_tip
`;
if (!result.count) return false;
const currentHeight = result[0].block_height;
if (
event.block_identifier.index <= currentBlockHeight &&
event.block_identifier.index <= currentHeight &&
event.block_identifier.index !== ORDINALS_GENESIS_BLOCK
) {
return true;
throw new BlockAlreadyIngestedError(
`Block ${event.block_identifier.index} is already ingested, chain tip is at ${currentHeight}`
);
}
}

private async assertNextBlockIsContiguous(
sql: PgSqlClient,
event: BitcoinEvent,
cache: BlockCache
) {
if (!cache.revealedNumbers.length) {
// TODO: How do we check blocks with only transfers?
return;
}
return false;
const result = await sql<{ max: number | null; block_height: number }[]>`
WITH tip AS (SELECT block_height::int FROM chain_tip)
SELECT MAX(number)::int AS max, (SELECT block_height FROM tip)
FROM inscriptions WHERE number >= 0
`;
if (!result.count) return;
const data = result[0];
const firstReveal = cache.revealedNumbers.sort()[0];
if (data.max === null && firstReveal === 0) return;
if ((data.max ?? 0) + 1 != firstReveal)
throw new BadPayloadRequestError(
`Streamed block ${event.block_identifier.index} is non-contiguous, attempting to reveal #${firstReveal} when current max is #${data.max} at block height ${data.block_height}`
);
}

private async updateChainTipBlockHeight(sql: PgSqlClient, block_height: number): Promise<void> {
Expand Down
10 changes: 10 additions & 0 deletions tests/api/cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ describe('ETag cache', () => {

test('inscription cache control', async () => {
const block = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775617 })
.transaction({ hash: '0x38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc' })
Expand Down Expand Up @@ -88,6 +89,7 @@ describe('ETag cache', () => {
// Perform transfer and check cache
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775618, timestamp: 1678122360 })
.transaction({
Expand Down Expand Up @@ -125,6 +127,7 @@ describe('ETag cache', () => {
// Perform transfer GAP FILL and check cache
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 775619, timestamp: 1678122360 })
.transaction({
Expand Down Expand Up @@ -161,6 +164,7 @@ describe('ETag cache', () => {

test('inscriptions index cache control', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575 })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -194,6 +198,7 @@ describe('ETag cache', () => {
.build();
await db.updateInscriptions(block1);
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576 })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down Expand Up @@ -246,6 +251,7 @@ describe('ETag cache', () => {

// New location
const block3 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778577 })
.transaction({ hash: 'ae9d273a10e899f0d2cad47ee2b0e77ab8a9addd9dd5bb5e4b03d6971c060d52' })
Expand Down Expand Up @@ -274,6 +280,7 @@ describe('ETag cache', () => {

test('inscriptions stats per block cache control', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575, hash: randomHash() })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -326,6 +333,7 @@ describe('ETag cache', () => {

// New block
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576, hash: randomHash() })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down Expand Up @@ -370,6 +378,7 @@ describe('ETag cache', () => {

test('status etag changes with new block', async () => {
const block1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778575, hash: randomHash() })
.transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' })
Expand Down Expand Up @@ -422,6 +431,7 @@ describe('ETag cache', () => {

// New block
const block2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({ height: 778576, hash: randomHash() })
.transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' })
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class TestChainhookPayloadBuilder {
operation: 'inscription_feed',
meta_protocols: ['brc-20'],
},
is_streaming_blocks: true,
is_streaming_blocks: false,
},
};
private action: 'apply' | 'rollback' = 'apply';
Expand Down
1 change: 1 addition & 0 deletions tests/ordhook/replay.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ describe('Replay', () => {

test('shuts down when streaming on replay mode', async () => {
const payload1 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({
height: 767430,
Expand Down
126 changes: 126 additions & 0 deletions tests/ordhook/server.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,132 @@ describe('EventServer', () => {
});

describe('gap detection', () => {
test('server rejects payload with first inscription gap when streaming', async () => {
await db.updateInscriptions(
new TestChainhookPayloadBuilder()
.streamingBlocks(false)
.apply()
.block({
height: 778575,
hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 0, jubilee: 0 },
inscription_fee: 705,
inscription_id: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201i0',
inscription_output_value: 10000,
inscriber_address: 'bc1pscktlmn99gyzlvymvrezh6vwd0l4kg06tg5rvssw0czg8873gz5sdkteqj',
ordinal_number: 257418248345364,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build()
);
const errorPayload1 = new TestChainhookPayloadBuilder()
.streamingBlocks(false)
.apply()
.block({
height: 778576,
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 5, jubilee: 5 }, // Gap at 5 but block is not streamed
inscription_fee: 705,
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
inscription_output_value: 10000,
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
ordinal_number: 1050000000000000,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build();
// Not streamed, accepts block.
await expect(db.updateInscriptions(errorPayload1)).resolves.not.toThrow(
BadPayloadRequestError
);

const errorPayload2 = new TestChainhookPayloadBuilder()
.streamingBlocks(true)
.apply()
.block({
height: 778579,
hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d',
timestamp: 1676913207,
})
.transaction({
hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc',
})
.inscriptionRevealed({
content_bytes: '0x48656C6C6F',
content_type: 'text/plain;charset=utf-8',
content_length: 5,
inscription_number: { classic: 10, jubilee: 10 }, // Gap at 10
inscription_fee: 705,
inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0',
inscription_output_value: 10000,
inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td',
ordinal_number: 1050000000000000,
ordinal_block_height: 650000,
ordinal_offset: 0,
satpoint_post_inscription:
'38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0',
inscription_input_index: 0,
transfers_pre_inscription: 0,
tx_index: 0,
curse_type: null,
inscription_pointer: null,
delegate: null,
metaprotocol: null,
metadata: null,
parent: null,
})
.build();
await expect(db.updateInscriptions(errorPayload2)).rejects.toThrow(BadPayloadRequestError);
const response = await server['fastify'].inject({
method: 'POST',
url: `/payload`,
headers: { authorization: `Bearer ${ENV.ORDHOOK_NODE_AUTH_TOKEN}` },
payload: errorPayload2,
});
expect(response.statusCode).toBe(400);
});

test('server ignores past blocks', async () => {
const payload = new TestChainhookPayloadBuilder()
.apply()
Expand Down

0 comments on commit 3c1480f

Please sign in to comment.