Skip to content

Commit

Permalink
feat: draft queue mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
Ludo Galabru committed Oct 12, 2023
1 parent fe56ee9 commit 78b666b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
39 changes: 39 additions & 0 deletions Ordhook.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[storage]
working_dir = "/tmp/ordinals/sdk"

# The Http Api allows you to register / deregister
# dynamically predicates.
# Disable by default.
#
# [http_api]
# http_port = 20456
# database_uri = "redis://localhost:6379/"

[network]
mode = "mainnet"
bitcoind_rpc_url = "http://0.0.0.0:8332"
bitcoind_rpc_username = "devnet"
bitcoind_rpc_password = "devnet"
# Bitcoin block events can be received by Chainhook
# either through a Bitcoin node's ZeroMQ interface,
# or through the Stacks node. Zmq is being
# used by default:
bitcoind_zmq_url = "tcp://0.0.0.0:18543"
# but stacks can also be used:
# stacks_node_rpc_url = "http://0.0.0.0:20443"

[limits]
max_number_of_bitcoin_predicates = 100
max_number_of_concurrent_bitcoin_scans = 100
max_number_of_processing_threads = 16
bitcoin_concurrent_http_requests_max = 16
max_caching_memory_size_mb = 32000

# Disable the following section if the state
# must be built locally
[bootstrap]
download_url = "https://archive.hiro.so/mainnet/ordhook/mainnet-ordhook-sqlite-latest"

[logs]
ordinals_internals = true
chainhook_internals = true
18 changes: 18 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"node-pg-migrate": "^6.2.2",
"pino": "^8.10.0",
"postgres": "^3.3.4",
"queue": "^6.0.0",
"undici": "^5.8.0"
}
}
14 changes: 10 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,23 @@ import { ApiMetrics } from './metrics/metrics';
import { PgStore } from './pg/pg-store';
import { buildAdminRpcServer } from './admin-rpc/init';
import { buildOrdhookIndexer } from './ordhook';
import Queue from 'queue';

function numberRange(start: number, end: number) {
return Array.from(Array(end - start + 1).keys()).map(x => x + start);
}

async function initBackgroundServices(db: PgStore) {
async function initBackgroundServices(jobQueue: Queue, db: PgStore) {
logger.info('Initializing background services...');
const ordhook = buildOrdhookIndexer({ db });
const ordhook = buildOrdhookIndexer(jobQueue, db);
registerShutdownConfig({
name: 'Ordhook Indexer',
forceKillable: false,
handler: async () => {
await ordhook.terminate();
},
});
await ordhook.streamBlocks();
await ordhook.replayBlockRange(767430, 807750);
// await ordhook.replayBlocks(numberRange(767430, 809386));
}

Expand Down Expand Up @@ -57,8 +58,13 @@ async function initApp() {
logger.info(`Initializing in ${ENV.RUN_MODE} run mode...`);
const db = await PgStore.connect({ skipMigrations: ENV.RUN_MODE === 'readonly' });

const jobQueue = new Queue({
concurrency: 1,
autostart: true,
});

if (['default', 'writeonly'].includes(ENV.RUN_MODE)) {
await initBackgroundServices(db);
await initBackgroundServices(jobQueue, db);
}
if (['default', 'readonly'].includes(ENV.RUN_MODE)) {
await initApiService(db);
Expand Down
36 changes: 31 additions & 5 deletions src/ordhook/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,50 @@ import { OrdinalsIndexer } from '@hirosystems/ordhook-sdk-js';
import { ENV } from '../env';
import { PgStore } from '../pg/pg-store';
import { BitcoinEvent } from '@hirosystems/chainhook-client';
import Queue from 'queue';

export interface OrdhookBlock {
block: BitcoinEvent;
}

export function buildOrdhookIndexer(args: { db: PgStore }): OrdinalsIndexer {
export function buildOrdhookIndexer(jobQueue: Queue, db: PgStore): OrdinalsIndexer {
const ordhook = new OrdinalsIndexer({
bitcoinRpcUrl: ENV.BITCOIN_RPC_URL,
bitcoinRpcUsername: ENV.BITCOIN_RPC_USERNAME,
bitcoinRpcPassword: ENV.BITCOIN_RPC_PASSWORD,
workingDirectory: ENV.ORDHOOK_WORKING_DIR,
logs: true,
});
ordhook.onBlock(async block => {
await args.db.insertBlock(block as OrdhookBlock);
ordhook.onBlock(block => {
console.log(`Queue size: ${jobQueue.length}`);

// Early return: if the queue is full, reject the block
if (jobQueue.length > 10) {
console.log("Blocking");
return false;
}

// Enqueue
jobQueue.push(async () => {
await db.insertBlock(block as OrdhookBlock)
})
return true;
});
ordhook.onBlockRollBack(async block => {
await args.db.rollBackBlock(block as OrdhookBlock);

ordhook.onBlockRollBack(block => {
console.log(`Queue size: ${jobQueue.length}`);

// Early return: if the queue is full, reject the block
if (jobQueue.length > 10) {
console.log("Blocking");
return false;
}

// Enqueue
jobQueue.push(async () => {
await db.rollBackBlock(block as OrdhookBlock);
})
return true;
});
return ordhook;
}

0 comments on commit 78b666b

Please sign in to comment.