diff --git a/src/pg/block-cache.ts b/src/pg/block-cache.ts index 2dc9f249..128dcba7 100644 --- a/src/pg/block-cache.ts +++ b/src/pg/block-cache.ts @@ -28,6 +28,7 @@ export class BlockCache { locations: DbLocationInsert[] = []; currentLocations = new Map(); recursiveRefs = new Map(); + revealedNumbers: number[] = []; mimeTypeCounts = new Map(); satRarityCounts = new Map(); @@ -72,6 +73,7 @@ export class BlockCache { parent: reveal.parent, timestamp: this.timestamp, }); + this.revealedNumbers.push(reveal.inscription_number.jubilee); this.increaseMimeTypeCount(mime_type); this.increaseSatRarityCount(satoshi.rarity); this.increaseInscriptionTypeCount(reveal.inscription_number.classic < 0 ? 'cursed' : 'blessed'); diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 0c9267b4..0dbf25a2 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -8,7 +8,11 @@ import { runMigrations, stopwatch, } from '@hirosystems/api-toolkit'; -import { BitcoinEvent, BitcoinPayload } from '@hirosystems/chainhook-client'; +import { + BadPayloadRequestError, + BitcoinEvent, + BitcoinPayload, +} from '@hirosystems/chainhook-client'; import * as path from 'path'; import * as postgres from 'postgres'; import { Order, OrderBy } from '../api/schemas'; @@ -35,6 +39,8 @@ export const INSERT_BATCH_SIZE = 4000; type InscriptionIdentifier = { genesis_id: string } | { number: number }; +class BlockAlreadyIngestedError extends Error {} + export class PgStore extends BasePgStore { readonly brc20: Brc20PgStore; readonly counts: CountsPgStore; @@ -90,14 +96,17 @@ export class PgStore extends BasePgStore { ); } for (const event of payload.apply) { - if (await this.isBlockIngested(event)) { - logger.warn(`PgStore skipping previously seen block ${event.block_identifier.index}`); - continue; - } logger.info(`PgStore apply block ${event.block_identifier.index}`); const time = stopwatch(); - await this.updateInscriptionsEvent(sql, event, 'apply', streamed); - await this.brc20.updateBrc20Operations(sql, event, 'apply'); + try { + await this.updateInscriptionsEvent(sql, event, 'apply', streamed); + await this.brc20.updateBrc20Operations(sql, event, 'apply'); + } catch (error) { + if (error instanceof BlockAlreadyIngestedError) { + logger.warn(error); + continue; + } else throw error; + } await this.updateChainTipBlockHeight(sql, event.block_identifier.index); logger.info( `PgStore apply block ${ @@ -119,6 +128,7 @@ export class PgStore extends BasePgStore { normalizedHexString(event.block_identifier.hash), event.timestamp ); + if (direction === 'apply') await this.assertNextBlockIsNotIngested(sql, event); for (const tx of event.transactions) { const tx_id = normalizedHexString(tx.transaction_identifier.hash); for (const operation of tx.metadata.ordinal_operations) { @@ -138,6 +148,7 @@ export class PgStore extends BasePgStore { } switch (direction) { case 'apply': + if (streamed) await this.assertNextBlockIsContiguous(sql, event, cache); await this.applyInscriptions(sql, cache, streamed); break; case 'rollback': @@ -348,15 +359,44 @@ export class PgStore extends BasePgStore { } } - private async isBlockIngested(event: BitcoinEvent): Promise { - const currentBlockHeight = await this.getChainTipBlockHeight(); + private async assertNextBlockIsNotIngested(sql: PgSqlClient, event: BitcoinEvent) { + const result = await sql<{ block_height: number }[]>` + SELECT block_height::int FROM chain_tip + `; + if (!result.count) return false; + const currentHeight = result[0].block_height; if ( - event.block_identifier.index <= currentBlockHeight && + event.block_identifier.index <= currentHeight && event.block_identifier.index !== ORDINALS_GENESIS_BLOCK ) { - return true; + throw new BlockAlreadyIngestedError( + `Block ${event.block_identifier.index} is already ingested, chain tip is at ${currentHeight}` + ); + } + } + + private async assertNextBlockIsContiguous( + sql: PgSqlClient, + event: BitcoinEvent, + cache: BlockCache + ) { + if (!cache.revealedNumbers.length) { + // TODO: How do we check blocks with only transfers? + return; } - return false; + const result = await sql<{ max: number | null; block_height: number }[]>` + WITH tip AS (SELECT block_height::int FROM chain_tip) + SELECT MAX(number)::int AS max, (SELECT block_height FROM tip) + FROM inscriptions WHERE number >= 0 + `; + if (!result.count) return; + const data = result[0]; + const firstReveal = cache.revealedNumbers.sort()[0]; + if (data.max === null && firstReveal === 0) return; + if ((data.max ?? 0) + 1 != firstReveal) + throw new BadPayloadRequestError( + `Streamed block ${event.block_identifier.index} is non-contiguous, attempting to reveal #${firstReveal} when current max is #${data.max} at block height ${data.block_height}` + ); } private async updateChainTipBlockHeight(sql: PgSqlClient, block_height: number): Promise { diff --git a/tests/api/cache.test.ts b/tests/api/cache.test.ts index 9eca102e..9c2fc564 100644 --- a/tests/api/cache.test.ts +++ b/tests/api/cache.test.ts @@ -21,6 +21,7 @@ describe('ETag cache', () => { test('inscription cache control', async () => { const block = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 775617 }) .transaction({ hash: '0x38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc' }) @@ -88,6 +89,7 @@ describe('ETag cache', () => { // Perform transfer and check cache await db.updateInscriptions( new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 775618, timestamp: 1678122360 }) .transaction({ @@ -125,6 +127,7 @@ describe('ETag cache', () => { // Perform transfer GAP FILL and check cache await db.updateInscriptions( new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 775619, timestamp: 1678122360 }) .transaction({ @@ -161,6 +164,7 @@ describe('ETag cache', () => { test('inscriptions index cache control', async () => { const block1 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778575 }) .transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' }) @@ -194,6 +198,7 @@ describe('ETag cache', () => { .build(); await db.updateInscriptions(block1); const block2 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778576 }) .transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' }) @@ -246,6 +251,7 @@ describe('ETag cache', () => { // New location const block3 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778577 }) .transaction({ hash: 'ae9d273a10e899f0d2cad47ee2b0e77ab8a9addd9dd5bb5e4b03d6971c060d52' }) @@ -274,6 +280,7 @@ describe('ETag cache', () => { test('inscriptions stats per block cache control', async () => { const block1 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778575, hash: randomHash() }) .transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' }) @@ -326,6 +333,7 @@ describe('ETag cache', () => { // New block const block2 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778576, hash: randomHash() }) .transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' }) @@ -370,6 +378,7 @@ describe('ETag cache', () => { test('status etag changes with new block', async () => { const block1 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778575, hash: randomHash() }) .transaction({ hash: '0x9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201' }) @@ -422,6 +431,7 @@ describe('ETag cache', () => { // New block const block2 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 778576, hash: randomHash() }) .transaction({ hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d' }) diff --git a/tests/helpers.ts b/tests/helpers.ts index 84750cf4..9450f56c 100644 --- a/tests/helpers.ts +++ b/tests/helpers.ts @@ -30,7 +30,7 @@ export class TestChainhookPayloadBuilder { operation: 'inscription_feed', meta_protocols: ['brc-20'], }, - is_streaming_blocks: true, + is_streaming_blocks: false, }, }; private action: 'apply' | 'rollback' = 'apply'; diff --git a/tests/ordhook/replay.test.ts b/tests/ordhook/replay.test.ts index 1e07d625..e6eef06e 100644 --- a/tests/ordhook/replay.test.ts +++ b/tests/ordhook/replay.test.ts @@ -22,6 +22,7 @@ describe('Replay', () => { test('shuts down when streaming on replay mode', async () => { const payload1 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) .apply() .block({ height: 767430, diff --git a/tests/ordhook/server.test.ts b/tests/ordhook/server.test.ts index a09a217c..4f8a4d23 100644 --- a/tests/ordhook/server.test.ts +++ b/tests/ordhook/server.test.ts @@ -572,6 +572,132 @@ describe('EventServer', () => { }); describe('gap detection', () => { + test('server rejects payload with first inscription gap when streaming', async () => { + await db.updateInscriptions( + new TestChainhookPayloadBuilder() + .streamingBlocks(false) + .apply() + .block({ + height: 778575, + hash: '0x00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d', + timestamp: 1676913207, + }) + .transaction({ + hash: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201', + }) + .inscriptionRevealed({ + content_bytes: '0x48656C6C6F', + content_type: 'text/plain;charset=utf-8', + content_length: 5, + inscription_number: { classic: 0, jubilee: 0 }, + inscription_fee: 705, + inscription_id: '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201i0', + inscription_output_value: 10000, + inscriber_address: 'bc1pscktlmn99gyzlvymvrezh6vwd0l4kg06tg5rvssw0czg8873gz5sdkteqj', + ordinal_number: 257418248345364, + ordinal_block_height: 650000, + ordinal_offset: 0, + satpoint_post_inscription: + '9f4a9b73b0713c5da01c0a47f97c6c001af9028d6bdd9e264dfacbc4e6790201:0:0', + inscription_input_index: 0, + transfers_pre_inscription: 0, + tx_index: 0, + curse_type: null, + inscription_pointer: null, + delegate: null, + metaprotocol: null, + metadata: null, + parent: null, + }) + .build() + ); + const errorPayload1 = new TestChainhookPayloadBuilder() + .streamingBlocks(false) + .apply() + .block({ + height: 778576, + hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d', + timestamp: 1676913207, + }) + .transaction({ + hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc', + }) + .inscriptionRevealed({ + content_bytes: '0x48656C6C6F', + content_type: 'text/plain;charset=utf-8', + content_length: 5, + inscription_number: { classic: 5, jubilee: 5 }, // Gap at 5 but block is not streamed + inscription_fee: 705, + inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0', + inscription_output_value: 10000, + inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td', + ordinal_number: 1050000000000000, + ordinal_block_height: 650000, + ordinal_offset: 0, + satpoint_post_inscription: + '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0', + inscription_input_index: 0, + transfers_pre_inscription: 0, + tx_index: 0, + curse_type: null, + inscription_pointer: null, + delegate: null, + metaprotocol: null, + metadata: null, + parent: null, + }) + .build(); + // Not streamed, accepts block. + await expect(db.updateInscriptions(errorPayload1)).resolves.not.toThrow( + BadPayloadRequestError + ); + + const errorPayload2 = new TestChainhookPayloadBuilder() + .streamingBlocks(true) + .apply() + .block({ + height: 778579, + hash: '00000000000000000002a90330a99f67e3f01eb2ce070b45930581e82fb7a91d', + timestamp: 1676913207, + }) + .transaction({ + hash: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc', + }) + .inscriptionRevealed({ + content_bytes: '0x48656C6C6F', + content_type: 'text/plain;charset=utf-8', + content_length: 5, + inscription_number: { classic: 10, jubilee: 10 }, // Gap at 10 + inscription_fee: 705, + inscription_id: '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dci0', + inscription_output_value: 10000, + inscriber_address: 'bc1p3cyx5e2hgh53w7kpxcvm8s4kkega9gv5wfw7c4qxsvxl0u8x834qf0u2td', + ordinal_number: 1050000000000000, + ordinal_block_height: 650000, + ordinal_offset: 0, + satpoint_post_inscription: + '38c46a8bf7ec90bc7f6b797e7dc84baa97f4e5fd4286b92fe1b50176d03b18dc:0:0', + inscription_input_index: 0, + transfers_pre_inscription: 0, + tx_index: 0, + curse_type: null, + inscription_pointer: null, + delegate: null, + metaprotocol: null, + metadata: null, + parent: null, + }) + .build(); + await expect(db.updateInscriptions(errorPayload2)).rejects.toThrow(BadPayloadRequestError); + const response = await server['fastify'].inject({ + method: 'POST', + url: `/payload`, + headers: { authorization: `Bearer ${ENV.ORDHOOK_NODE_AUTH_TOKEN}` }, + payload: errorPayload2, + }); + expect(response.statusCode).toBe(400); + }); + test('server ignores past blocks', async () => { const payload = new TestChainhookPayloadBuilder() .apply()