check stored blocks serialization, comments, updates
lostman committed Sep 27, 2023
1 parent a49f365 commit e2bb077
Showing 5 changed files with 101 additions and 10 deletions.
26 changes: 24 additions & 2 deletions packages/fuel-indexer-database/postgres/src/lib.rs
@@ -498,6 +498,7 @@ pub async fn register_indexer(
})
}

/// Save `BlockData` to the database.
pub async fn save_block_data(
conn: &mut PoolConnection<Postgres>,
blockdata: &[BlockData],
@@ -520,12 +521,30 @@ pub async fn save_block_data(
Ok(())
}

/// Load `BlockData` from the database.
pub async fn load_block_data(
conn: &mut PoolConnection<Postgres>,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<BlockData>> {
let raw = load_raw_block_data(conn, start_block, end_block, limit).await?;

let mut blocks = Vec::new();
for bytes in raw {
let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
blocks.push(blockdata);
}
Ok(blocks)
}

/// Load raw `BlockData` bytes from the database.
pub async fn load_raw_block_data(
conn: &mut PoolConnection<Postgres>,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<Vec<u8>>> {
let end_condition = end_block
.map(|x| format!("AND block_height <= {x}"))
.unwrap_or("".to_string());
@@ -536,8 +555,7 @@ pub async fn load_block_data(
let mut blocks = Vec::new();
for row in rows {
let bytes = row.get::<Vec<u8>, usize>(0);
let blockdata: BlockData = fuel_indexer_lib::utils::deserialize(&bytes).unwrap();
blocks.push(blockdata);
blocks.push(bytes);
}
Ok(blocks)
}
@@ -982,6 +1000,8 @@ pub async fn put_many_to_many_record(
Ok(())
}

/// Create a database trigger on the indexmetadataentity table in the indexer's
/// schema which ensures that indexed blocks are consecutive.
pub async fn create_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
Expand Down Expand Up @@ -1027,6 +1047,8 @@ pub async fn create_ensure_block_height_consecutive_trigger(
Ok(())
}

/// Remove the database trigger which ensures that indexed blocks are
/// consecutive.
pub async fn remove_ensure_block_height_consecutive_trigger(
conn: &mut PoolConnection<Postgres>,
namespace: &str,
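Note: the SQL built by create_ensure_block_height_consecutive_trigger is collapsed in this diff. For orientation only, here is a rough sketch of what a consecutive-block-height trigger can look like, written against sqlx; the pool-based signature, the dollar-quoted function body, and the indexmetadataentity/block_height naming are assumptions, not the indexer's actual DDL.

use sqlx::PgPool;

// Illustration only: assumes the metadata table is `{schema}.indexmetadataentity`
// and has a `block_height` column.
async fn create_consecutive_height_trigger_sketch(
    pool: &PgPool,
    schema: &str,
) -> sqlx::Result<()> {
    // A trigger function that rejects an insert whose block_height is not
    // exactly one greater than the current maximum (unless the table is empty).
    let create_fn = format!(
        r#"
        CREATE OR REPLACE FUNCTION {schema}.ensure_block_height_consecutive()
        RETURNS TRIGGER AS $body$
        BEGIN
            IF EXISTS (SELECT 1 FROM {schema}.indexmetadataentity)
               AND NEW.block_height <>
                   (SELECT MAX(block_height) FROM {schema}.indexmetadataentity) + 1
            THEN
                RAISE EXCEPTION 'block_height % is not consecutive', NEW.block_height;
            END IF;
            RETURN NEW;
        END;
        $body$ LANGUAGE plpgsql;
        "#
    );

    // Attach the function to the table as a BEFORE INSERT trigger.
    let create_trigger = format!(
        r#"
        CREATE TRIGGER ensure_block_height_consecutive
        BEFORE INSERT ON {schema}.indexmetadataentity
        FOR EACH ROW EXECUTE FUNCTION {schema}.ensure_block_height_consecutive();
        "#
    );

    sqlx::query(&create_fn).execute(pool).await?;
    sqlx::query(&create_trigger).execute(pool).await?;
    Ok(())
}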
14 changes: 14 additions & 0 deletions packages/fuel-indexer-database/src/queries.rs
@@ -246,6 +246,20 @@ pub async fn load_block_data(
}
}

/// Load raw `BlockData` bytes from the database.
pub async fn load_raw_block_data(
conn: &mut IndexerConnection,
start_block: u32,
end_block: Option<u32>,
limit: usize,
) -> sqlx::Result<Vec<Vec<u8>>> {
match conn {
IndexerConnection::Postgres(ref mut c) => {
postgres::load_raw_block_data(c, start_block, end_block, limit).await
}
}
}

/// Remove all stored `BlockData` from the database.
pub async fn remove_block_data(conn: &mut IndexerConnection) -> sqlx::Result<usize> {
match conn {
@@ -42,3 +42,4 @@ block_page_size: 20
enable_block_store: false
remove_stored_blocks: false
allow_non_sequential_blocks: false

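The keys above are the block-store settings touched by this commit. As a quick illustration of how they map onto configuration fields, here is a small sketch that deserializes just these keys with serde_yaml; the BlockStoreConfig name and the use of serde_yaml are assumptions, and the real IndexerConfig carries many more options.

use serde::Deserialize;

// Illustration only: field names mirror the YAML keys shown above.
#[derive(Debug, Deserialize)]
struct BlockStoreConfig {
    block_page_size: usize,
    enable_block_store: bool,
    remove_stored_blocks: bool,
    allow_non_sequential_blocks: bool,
}

fn main() -> Result<(), serde_yaml::Error> {
    let yaml = r#"
block_page_size: 20
enable_block_store: false
remove_stored_blocks: false
allow_non_sequential_blocks: false
"#;
    let cfg: BlockStoreConfig = serde_yaml::from_str(yaml)?;
    println!("{cfg:?}");
    Ok(())
}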
16 changes: 9 additions & 7 deletions packages/fuel-indexer/src/commands/run.rs
@@ -110,13 +110,6 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> {
info!("Successfully removed {count} blocks.");
}

if config.enable_block_store {
subsystems.spawn(crate::service::create_block_sync_task(
config.clone(),
pool.clone(),
));
};

#[allow(unused)]
let (tx, rx) = channel::<ServiceRequest>(defaults::SERVICE_REQUEST_CHANNEL_SIZE);

@@ -125,6 +118,15 @@ pub async fn exec(args: IndexerArgs) -> anyhow::Result<()> {
queries::run_migration(&mut c).await?;
}

// Block Sync must be started after migrations to ensure that the database
// table has been created.
if config.enable_block_store {
subsystems.spawn(crate::service::create_block_sync_task(
config.clone(),
pool.clone(),
));
};

let mut service = IndexerService::new(config.clone(), pool.clone(), rx).await?;

match manifest.map(|p| {
54 changes: 53 additions & 1 deletion packages/fuel-indexer/src/service.rs
@@ -4,6 +4,7 @@ use crate::{
};
use async_std::sync::{Arc, Mutex};
use async_std::{fs::File, io::ReadExt};
use fuel_core_client::client::FuelClient;
use fuel_indexer_database::{
queries, types::IndexerAssetType, IndexerConnection, IndexerConnectionPool,
};
@@ -422,6 +423,10 @@ pub(crate) async fn create_block_sync_task(
fuel_core_client::client::FuelClient::new(config.fuel_node.uri().to_string())
.unwrap_or_else(|e| panic!("Client node connection failed: {e}."));

check_stored_block_data(pool.clone(), task_id, &config, &client)
.await
.unwrap();

loop {
// Get the next page of blocks, and the starting cursor for the subsequent page
let (block_info, next_cursor, _has_next_page) =
@@ -438,7 +443,7 @@
if !block_info.is_empty() {
let first = block_info[0].height;
let last = block_info.last().unwrap().height;
info!("{task_id}: retrieved blocks: {}-{}.", first, last);
info!("{task_id}: retrieved blocks {}-{}.", first, last);
}
(block_info, next_cursor, _has_next_page)
}
@@ -470,3 +475,50 @@
}
}
}

/// Check that the `BlockData` stored in the database matches the serialization
/// format currently produced for blocks fetched from the client; if the formats
/// differ, remove the stored blocks so they are re-synced from the client.
async fn check_stored_block_data(
pool: IndexerConnectionPool,
task_id: &str,
config: &IndexerConfig,
client: &FuelClient,
) -> IndexerResult<()> {
let (block_data_client, _, _) = crate::executor::retrieve_blocks_from_node(
client,
config.block_page_size,
&Some("0".to_string()),
None,
task_id,
)
.await?;

let block_data_client: Vec<Vec<u8>> = block_data_client
.iter()
.map(|bd| fuel_indexer_lib::utils::serialize(bd))
.collect();

let mut conn = pool.acquire().await?;

let block_data_database = queries::load_raw_block_data(
&mut conn,
1,
Some(config.block_page_size as u32),
config.block_page_size,
)
.await?;

if block_data_database.is_empty() {
return Ok(());
}

if block_data_client.len() != block_data_database.len() {
return Err(IndexerError::Unknown("TODO".to_string()));
}

if block_data_client != block_data_database {
warn!("{task_id} detected a serialization format change. Removing stored blocks; {task_id} will re-sync them from the client.");
let count = queries::remove_block_data(&mut conn).await?;
warn!("{task_id} successfully removed {count} blocks.");
}

Ok(())
}
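In short, check_stored_block_data re-serializes a freshly fetched page of blocks and compares it byte-for-byte against the stored bytes; any mismatch is treated as a serialization format change. A minimal, self-contained sketch of that idea follows; the Block struct and the use of serde/bincode are illustrative assumptions, since the indexer has its own BlockData type and fuel_indexer_lib::utils::serialize helper.

use serde::Serialize;

#[derive(Serialize)]
struct Block {
    height: u32,
    // Imagine this field was added in a newer release: its presence alone
    // changes the serialized byte layout of every stored block.
    producer: Option<String>,
}

fn main() {
    // Bytes as the current code would serialize the first page of blocks.
    let fresh = vec![Block { height: 1, producer: None }];
    let fresh_bytes: Vec<Vec<u8>> = fresh
        .iter()
        .map(|b| bincode::serialize(b).expect("serialization failed"))
        .collect();

    // Bytes written by an older release that only stored `height`.
    let stored_bytes: Vec<Vec<u8>> =
        vec![bincode::serialize(&1u32).expect("serialization failed")];

    // The comparison made by check_stored_block_data: if re-serializing the
    // freshly fetched blocks no longer matches the stored bytes exactly, the
    // stored blocks are stale and are removed so they can be re-synced.
    if fresh_bytes != stored_bytes {
        println!("serialization format changed; stored blocks must be re-synced");
    }
}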
