From e2bb077d439d6590c68f02d9c4ca217cf535f614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Wo=C5=9B?= Date: Wed, 27 Sep 2023 13:50:08 +0100 Subject: [PATCH] check stored blocks serialization, comments, updates --- .../fuel-indexer-database/postgres/src/lib.rs | 26 ++++++++- packages/fuel-indexer-database/src/queries.rs | 14 +++++ ...sts__commands__default_indexer_config.snap | 1 + packages/fuel-indexer/src/commands/run.rs | 16 +++--- packages/fuel-indexer/src/service.rs | 54 ++++++++++++++++++- 5 files changed, 101 insertions(+), 10 deletions(-) diff --git a/packages/fuel-indexer-database/postgres/src/lib.rs b/packages/fuel-indexer-database/postgres/src/lib.rs index 110af2e69..b02d1da8a 100644 --- a/packages/fuel-indexer-database/postgres/src/lib.rs +++ b/packages/fuel-indexer-database/postgres/src/lib.rs @@ -498,6 +498,7 @@ pub async fn register_indexer( }) } +/// Save `BlockData` to the database. pub async fn save_block_data( conn: &mut PoolConnection, blockdata: &[BlockData], @@ -520,12 +521,30 @@ pub async fn save_block_data( Ok(()) } +/// Load `BlockData` from the database. pub async fn load_block_data( conn: &mut PoolConnection, start_block: u32, end_block: Option, limit: usize, ) -> sqlx::Result> { + let raw = load_raw_block_data(conn, start_block, end_block, limit).await?; + + let mut blocks = Vec::new(); + for bytes in raw { + let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); + blocks.push(blockdata); + } + Ok(blocks) +} + +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut PoolConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result>> { let end_condition = end_block .map(|x| format!("AND block_height <= {x}")) .unwrap_or("".to_string()); @@ -536,8 +555,7 @@ pub async fn load_block_data( let mut blocks = Vec::new(); for row in rows { let bytes = row.get::, usize>(0); - let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap(); - blocks.push(blockdata); + blocks.push(bytes); } Ok(blocks) } @@ -982,6 +1000,8 @@ pub async fn put_many_to_many_record( Ok(()) } +/// Create a database trigger on indexmetadataentity table in the indexers +/// schema which ensures that indexed blocks must be consecutive. pub async fn create_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, @@ -1027,6 +1047,8 @@ pub async fn create_ensure_block_height_consecutive_trigger( Ok(()) } +/// Remove the database trogger which ensures that the indexed blocks must be +/// consecutive. pub async fn remove_ensure_block_height_consecutive_trigger( conn: &mut PoolConnection, namespace: &str, diff --git a/packages/fuel-indexer-database/src/queries.rs b/packages/fuel-indexer-database/src/queries.rs index 30efafc05..efed1a3a9 100644 --- a/packages/fuel-indexer-database/src/queries.rs +++ b/packages/fuel-indexer-database/src/queries.rs @@ -246,6 +246,20 @@ pub async fn load_block_data( } } +/// Load raw `BlockData` bytes from the database. +pub async fn load_raw_block_data( + conn: &mut IndexerConnection, + start_block: u32, + end_block: Option, + limit: usize, +) -> sqlx::Result>> { + match conn { + IndexerConnection::Postgres(ref mut c) => { + postgres::load_raw_block_data(c, start_block, end_block, limit).await + } + } +} + /// Remove all stored `BlockData` from the database. pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result { match conn { diff --git a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap index 2a5063c1b..cc1e42023 100644 --- a/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap +++ b/packages/fuel-indexer-tests/tests/snapshots/integration_tests__commands__default_indexer_config.snap @@ -42,3 +42,4 @@ block_page_size: 20 enable_block_store: false remove_stored_blocks: false allow_non_sequential_blocks: false + diff --git a/packages/fuel-indexer/src/commands/run.rs b/packages/fuel-indexer/src/commands/run.rs index 2f78bf5c0..a27f1a0be 100644 --- a/packages/fuel-indexer/src/commands/run.rs +++ b/packages/fuel-indexer/src/commands/run.rs @@ -110,13 +110,6 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { info!("Succesfully removed {count} blocks."); } - if config.enable_block_store { - subsystems.spawn(crate::service::create_block_sync_task( - config.clone(), - pool.clone(), - )); - }; - #[allow(unused)] let (tx, rx) = channel::(defaults::SERVICE_REQUEST_CHANNEL_SIZE); @@ -125,6 +118,15 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> { queries::run_migration(&mut c).await?; } + // Block Sync must be started after migrations to ensure that the database + // table has been created. + if config.enable_block_store { + subsystems.spawn(crate::service::create_block_sync_task( + config.clone(), + pool.clone(), + )); + }; + let mut service = IndexerService::new(config.clone(), pool.clone(), rx).await?; match manifest.map(|p| { diff --git a/packages/fuel-indexer/src/service.rs b/packages/fuel-indexer/src/service.rs index acdd05f4b..886d50997 100644 --- a/packages/fuel-indexer/src/service.rs +++ b/packages/fuel-indexer/src/service.rs @@ -4,6 +4,7 @@ use crate::{ }; use async_std::sync::{Arc, Mutex}; use async_std::{fs::File, io::ReadExt}; +use fuel_core_client::client::FuelClient; use fuel_indexer_database::{ queries, types::IndexerAssetType, IndexerConnection, IndexerConnectionPool, }; @@ -422,6 +423,10 @@ pub(crate) async fn create_block_sync_task( fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string()) .unwrap_or_else(|e| panic!("Client node connection failed: {e}.")); + check_stored_block_data(pool.clone(), task_id, &config, &client) + .await + .unwrap(); + loop { // Get the next page of blocks, and the starting cursor for the subsequent page let (block_info, next_cursor, _has_next_page) = @@ -438,7 +443,7 @@ pub(crate) async fn create_block_sync_task( if !block_info.is_empty() { let first = block_info[0].height; let last = block_info.last().unwrap().height; - info!("{task_id}: retrieved blocks: {}-{}.", first, last); + info!("{task_id}: retrieved blocks {}-{}.", first, last); } (block_info, next_cursor, _has_next_page) } @@ -470,3 +475,50 @@ pub(crate) async fn create_block_sync_task( } } } + +async fn check_stored_block_data( + pool: IndexerConnectionPool, + task_id: &str, + config: &IndexerConfig, + client: &FuelClient, +) -> IndexerResult<()> { + let (block_data_client, _, _) = crate::executor::retrieve_blocks_from_node( + client, + config.block_page_size, + &Some("0".to_string()), + None, + task_id, + ) + .await?; + + let block_data_client: Vec> = block_data_client + .iter() + .map(|bd| fuel_indexer_lib::utils::serialize(bd)) + .collect(); + + let mut conn = pool.acquire().await?; + + let block_data_database = queries::load_raw_block_data( + &mut conn, + 1, + Some(config.block_page_size as u32), + config.block_page_size, + ) + .await?; + + if block_data_database.is_empty() { + return Ok(()); + } + + if block_data_client.len() != block_data_database.len() { + return Err(IndexerError::Unknown("TODO".to_string())); + } + + if block_data_client != block_data_database { + warn!("{task_id} detected serialization format change. Removing stored blocks. {task_id} will re-sync blocks from client."); + let count = queries::remove_block_data(&mut conn).await?; + warn!("{task_id} succesfully removed {count} blocks."); + } + + Ok(()) +}