diff --git a/src/ordhook/server.ts b/src/ordhook/server.ts index b7fae7b0..21eac30f 100644 --- a/src/ordhook/server.ts +++ b/src/ordhook/server.ts @@ -55,7 +55,11 @@ export async function startOrdhookServer(args: { db: PgStore }): Promise { - logger.info(`OrdhookServer received payload from predicate ${uuid}`); + logger.info( + `OrdhookServer received ${ + payload.chainhook.is_streaming_blocks ? 'streamed' : 'replay' + } payload from predicate ${uuid}` + ); await args.db.updateInscriptions(payload); }); return server; diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 2a2a397c..c392ead9 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -135,7 +135,7 @@ export class PgStore extends BasePgStore { newBlockHeight: event.block_identifier.index, }); for (const writeChunk of batchIterate(writes, INSERT_BATCH_SIZE)) - await this.insertInscriptions(writeChunk); + await this.insertInscriptions(writeChunk, payload.chainhook.is_streaming_blocks); updatedBlockHeightMin = Math.min(updatedBlockHeightMin, event.block_identifier.index); logger.info( `PgStore ingested block ${event.block_identifier.index} in ${time.getElapsedSeconds()}s` @@ -487,7 +487,10 @@ export class PgStore extends BasePgStore { `; // roughly 35 days of blocks, assuming 10 minute block times on a full database } - private async insertInscriptions(reveals: InscriptionEventData[]): Promise { + private async insertInscriptions( + reveals: InscriptionEventData[], + streamed: boolean + ): Promise { if (reveals.length === 0) return; await this.sqlWriteTransaction(async sql => { const inscriptionInserts: InscriptionInsert[] = []; @@ -549,31 +552,30 @@ export class PgStore extends BasePgStore { updated_at = NOW() `; const pointers: DbLocationPointerInsert[] = []; - for (const batch of batchIterate(locationInserts, INSERT_BATCH_SIZE)) - pointers.push( - ...(await sql` - INSERT INTO locations ${sql(batch)} - ON CONFLICT ON CONSTRAINT locations_inscription_id_block_height_tx_index_unique DO UPDATE SET - genesis_id = EXCLUDED.genesis_id, - block_hash = EXCLUDED.block_hash, - tx_id = EXCLUDED.tx_id, - address = EXCLUDED.address, - value = EXCLUDED.value, - output = EXCLUDED.output, - "offset" = EXCLUDED.offset, - timestamp = EXCLUDED.timestamp - RETURNING inscription_id, id AS location_id, block_height, tx_index, address - `) - ); + for (const batch of batchIterate(locationInserts, INSERT_BATCH_SIZE)) { + const pointerBatch = await sql` + INSERT INTO locations ${sql(batch)} + ON CONFLICT ON CONSTRAINT locations_inscription_id_block_height_tx_index_unique DO UPDATE SET + genesis_id = EXCLUDED.genesis_id, + block_hash = EXCLUDED.block_hash, + tx_id = EXCLUDED.tx_id, + address = EXCLUDED.address, + value = EXCLUDED.value, + output = EXCLUDED.output, + "offset" = EXCLUDED.offset, + timestamp = EXCLUDED.timestamp + RETURNING inscription_id, id AS location_id, block_height, tx_index, address + `; + await this.updateInscriptionLocationPointers(pointerBatch); + pointers.push(...pointerBatch); + } await this.updateInscriptionRecursions(reveals); - if (transferredOrdinalNumbers.length) + if (streamed && transferredOrdinalNumbers.length) await sql` UPDATE inscriptions SET updated_at = NOW() WHERE sat_ordinal IN ${sql(transferredOrdinalNumbers)} `; - for (const batch of batchIterate(pointers, INSERT_BATCH_SIZE)) - await this.updateInscriptionLocationPointers(batch); for (const reveal of reveals) { const action = 'inscription' in reveal