diff --git a/packages/fuel-indexer-database/postgres/src/lib.rs b/packages/fuel-indexer-database/postgres/src/lib.rs index 110af2e69..b02d1da8a 100644 --- a/packages/fuel-indexer-database/postgres/src/lib.rs +++ b/packages/fuel-indexer-database/postgres/src/lib.rs @@ -498,6 +498,7 @@ pub async fn register_indexer( }) } +/// Save `BlockData` to the database. pub async fn save_block_data( conn: &mut PoolConnection, blockdata: &[BlockData], @@ -520,12 +521,30 @@ pub async fn save_block_data( Ok(()) } +/// Load `BlockData` from the database. pub async fn load_block_data( conn: &mut PoolConnection, start_block: u32, end_block: Option<u32>, limit: usize, ) -> sqlx::Result<Vec<BlockData>> { + let raw = load_raw_block_data(conn, start_block, end_block, limit).await?; + + let mut blocks = Vec::new(); + for bytes in raw { + let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); + blocks.push(blockdata); + } + Ok(blocks) +} + +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut PoolConnection, + start_block: u32, + end_block: Option<u32>, + limit: usize, +) -> sqlx::Result<Vec<Vec<u8>>> { let end_condition = end_block .map(|x| format!("AND block_height <= {x}")) .unwrap_or("".to_string()); @@ -536,8 +555,7 @@ pub async fn load_block_data( let mut blocks = Vec::new(); for row in rows { let bytes = row.get::<Vec<u8>, usize>(0); - let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); - blocks.push(blockdata); + blocks.push(bytes); } Ok(blocks) } @@ -982,6 +1000,8 @@ pub async fn put_many_to_many_record( Ok(()) } +/// Create a database trigger on indexmetadataentity table in the indexers +/// schema which ensures that indexed blocks must be consecutive. 
pub async fn create_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, @@ -1027,6 +1047,8 @@ pub async fn create_ensure_block_height_consecutive_trigger( Ok(()) } +/// Remove the database trigger which ensures that the indexed blocks must be +/// consecutive. pub async fn remove_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 30efafc05..efed1a3a9 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -246,6 +246,20 @@ pub async fn load_block_data( } } +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut IndexerConnection, + start_block: u32, + end_block: Option<u32>, + limit: usize, +) -> sqlx::Result<Vec<Vec<u8>>> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::load_raw_block_data(c, start_block, end_block, limit).await + } + } +} + /// Remove all stored `BlockData` from the database. 
pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result<usize> { match conn { diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap index 2a5063c1b..cc1e42023 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap @@ -42,3 +42,4 @@ block_page_size: 20 enable_block_store: false remove_stored_blocks: false allow_non_sequential_blocks: false + diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index 2f78bf5c0..a7c5b2c13 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -107,16 +107,9 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { info!("Removing stored blocks."); let mut conn = pool.acquire().await?; let count = queries::remove_block_data(&mut conn).await?; - info!("Succesfully removed {count} blocks."); + info!("Successfully removed {count} blocks."); } - if config.enable_block_store { - subsystems.spawn(crate::service::create_block_sync_task( - config.clone(), - pool.clone(), - )); - }; - #[allow(unused)] let (tx, rx) = channel::<ServiceRequest>(defaults::SERVICE_REQUEST_CHANNEL_SIZE); @@ -125,6 +118,15 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { queries::run_migration(&mut c).await?; } + // Block Sync must be started after migrations to ensure that the database + // table has been created. 
+ if config.enable_block_store { + subsystems.spawn(crate::service::create_block_sync_task( + config.clone(), + pool.clone(), + )); + }; + let mut service = IndexerService::new(config.clone(), pool.clone(), rx).await?; match manifest.map(|p| { diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index acdd05f4b..c4cb59f48 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -4,6 +4,7 @@ use crate::{ }; use async_std::sync::{Arc, Mutex}; use async_std::{fs::File, io::ReadExt}; +use fuel_core_client::client::FuelClient; use fuel_indexer_database::{ queries, types::IndexerAssetType, IndexerConnection, IndexerConnectionPool, }; @@ -410,6 +411,14 @@ pub(crate) async fn create_block_sync_task( let mut conn = pool.acquire().await.unwrap(); + let client = + fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string()) + .unwrap_or_else(|e| panic!("Client node connection failed: {e}.")); + + check_stored_block_data(pool.clone(), task_id, &config, &client) + .await + .unwrap_or_else(|_| panic!("{task_id} stored blocks verification failed.")); + let last_height = queries::last_block_height_for_stored_blocks(&mut conn) .await .unwrap_or_else(|_| panic!("{task_id} was unable to determine the last block height for stored blocks.")); @@ -418,10 +427,6 @@ pub(crate) async fn create_block_sync_task( info!("{task_id}: starting from Block#{}", last_height + 1); - let client = - fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string()) - .unwrap_or_else(|e| panic!("Client node connection failed: {e}.")); - loop { // Get the next page of blocks, and the starting cursor for the subsequent page let (block_info, next_cursor, _has_next_page) = @@ -438,7 +443,7 @@ pub(crate) async fn create_block_sync_task( if !block_info.is_empty() { let first = block_info[0].height; let last = block_info.last().unwrap().height; - info!("{task_id}: retrieved blocks: {}-{}.", first, last); + 
info!("{task_id}: retrieved blocks {}-{}.", first, last); } (block_info, next_cursor, _has_next_page) } @@ -470,3 +475,51 @@ pub(crate) async fn create_block_sync_task( } } } + +// We store serialized `BlockData` in the database. Since it is not versioned, +// we need a mechanism to detect whether the format of `BlockData` has changed. +// This function fetches some blocks from the client, serializes them, and then +// compares to those stored in the database. If they are not the same, the +// blocks in the database are purged. +async fn check_stored_block_data( + pool: IndexerConnectionPool, + task_id: &str, + config: &IndexerConfig, + client: &FuelClient, +) -> IndexerResult<()> { + let (block_data_client, _, _) = crate::executor::retrieve_blocks_from_node( + client, + config.block_page_size, + &Some("0".to_string()), + None, + task_id, + ) + .await?; + + let block_data_client: Vec<Vec<u8>> = block_data_client + .iter() + .map(|bd| fuel_indexer_lib::utils::serialize(bd)) + .collect(); + + let mut conn = pool.acquire().await?; + + let block_data_database = queries::load_raw_block_data( + &mut conn, + 1, + Some(config.block_page_size as u32), + config.block_page_size, + ) + .await?; + + if block_data_database.is_empty() { + return Ok(()); + } + + if block_data_client != block_data_database { + warn!("{task_id} detected serialization format change. Removing stored blocks. {task_id} will re-sync blocks from the client."); + let count = queries::remove_block_data(&mut conn).await?; + warn!("{task_id} successfully removed {count} blocks."); + } + + Ok(()) +}