Skip to content

Commit

Permalink
enhancement: add end_block field in manifest (#824)
Browse files Browse the repository at this point in the history
* add logic and test

* add additional test

* fix type in yaml

* update docs and examples

* docs typo

* show warn message elsewhere
  • Loading branch information
0xmovses authored May 5, 2023
1 parent 42e1f11 commit 6e63015
Show file tree
Hide file tree
Showing 8 changed files with 184 additions and 1 deletion.
9 changes: 9 additions & 0 deletions docs/src/reference-guide/components/assets/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ abi: path/to/my/contract-abi.json
contract_id: "0x39150017c9e38e5e280432d546fae345d6ce6d8fe4710162c2e3a95a6faff051"
graphql_schema: path/to/my/schema.graphql
start_block: 1564
end_block: 310000
module:
wasm: path/to/my/wasm_module.wasm
report_metrics: true
Expand Down Expand Up @@ -61,6 +62,14 @@ _Optional._

The `start_block` field indicates the block height after which you'd like your indexer to start indexing events.

## `end_block`

_Optional._

The `end_block` field indicates the block height after which the indexer should stop indexing blocks.

> Important: If no `end_block` is added the indexer will keep listening to new blocks indefinitely.

## `module`

_Required._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ module:
metrics: ~
contract_id: ~
start_block: ~
end_block: ~
resumable: ~
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ fuel_client: ~
report_metrics: true
abi: examples/hello-world/contracts/greeting/out/debug/greeting-abi.json
start_block: 1
end_block: ~
graphql_schema: examples/hello-world-native/hello-indexer-native/schema/hello_indexer_native.schema.graphql
module: native
resumable: ~
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ module:
metrics: ~
contract_id: fuel18hchrf7f4hnpkl84sqf8k0sk8gcauzeemzwgweea8dgr7eachv4s86r9t9
start_block: 1
end_block: ~
resumable: ~
1 change: 1 addition & 0 deletions packages/fuel-indexer-lib/src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct Manifest {
)]
pub contract_id: ContractIds,
pub start_block: Option<u64>,
pub end_block: Option<u64>,
#[serde(default)]
pub resumable: Option<bool>,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ fuel_client: ~
graphql_schema: packages/fuel-indexer-tests/components/indices/fuel-indexer-test/schema/fuel_indexer_test.graphql
abi: packages/fuel-indexer-tests/contracts/fuel-indexer-test/out/debug/fuel-indexer-test-abi.json
start_block: 1
end_block: ~
contract_id: fuel1lun289ufekmnx55m540tj79a9e2mavaagxp3e6cekzfmjkpc8xxsqy42wh
identifier: index1
module:
Expand Down
158 changes: 158 additions & 0 deletions packages/fuel-indexer-tests/tests/e2e/indexing_postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,164 @@ async fn test_index_respects_start_block_postgres() {
assert!(row.get::<i64, usize>(2) > 0);
}

#[actix_web::test]
#[cfg(all(feature = "e2e", feature = "postgres"))]
async fn test_index_respects_end_block_postgres() {
let (node_handle, test_db, mut srvc) = setup_test_components().await;

let contract = connect_to_deployed_contract().await.unwrap();
let app = test::init_service(app(contract)).await;
let req = test::TestRequest::get().uri("/block_height").to_request();
let res = test::call_and_read_body(&app, req).await;
let block_height = String::from_utf8(res.to_vec())
.unwrap()
.parse::<u64>()
.unwrap();

let mut manifest = Manifest::try_from(assets::FUEL_INDEXER_TEST_MANIFEST).unwrap();
update_test_manifest_asset_paths(&mut manifest);
manifest.end_block = Some(block_height + 1);

srvc.register_index_from_manifest(manifest).await.unwrap();

let req = test::TestRequest::post().uri("/ping").to_request();
let _ = app.call(req).await;

sleep(Duration::from_secs(defaults::INDEXED_EVENT_WAIT)).await;

let mut conn = test_db.pool.acquire().await.unwrap();
let first_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height,
))
.fetch_optional(&mut conn)
.await
.unwrap();

let row = first_check.unwrap();
let indexed_height = row.get::<BigDecimal, usize>(1).to_u64().unwrap();

assert_eq!(indexed_height, (block_height));
assert!(row.get::<i64, usize>(2) > 0);

let req = test::TestRequest::post().uri("/ping").to_request();
let _ = app.call(req).await;

sleep(Duration::from_secs(defaults::INDEXED_EVENT_WAIT)).await;
node_handle.abort();

let second_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height + 1,
))
.fetch_optional(&mut conn)
.await
.unwrap();

let row = second_check.unwrap();
let indexed_height = row.get::<BigDecimal, usize>(1).to_u64().unwrap();

assert_eq!(indexed_height, (block_height + 1));
assert!(row.get::<i64, usize>(2) > 0);

let req = test::TestRequest::post().uri("/ping").to_request();
let _ = app.call(req).await;

sleep(Duration::from_secs(defaults::INDEXED_EVENT_WAIT)).await;
node_handle.abort();

let final_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height + 2,
))
.fetch_optional(&mut conn)
.await
.unwrap();

// Should not have indexed this block
assert!(final_check.is_none());
}

#[actix_web::test]
#[cfg(all(feature = "e2e", feature = "postgres"))]
async fn test_index_respects_end_block_and_start_block_postgres() {
let (node_handle, test_db, mut srvc) = setup_test_components().await;

let contract = connect_to_deployed_contract().await.unwrap();
let app = test::init_service(app(contract)).await;
let req = test::TestRequest::get().uri("/block_height").to_request();
let res = test::call_and_read_body(&app, req).await;
let block_height = String::from_utf8(res.to_vec())
.unwrap()
.parse::<u64>()
.unwrap();

let mut manifest = Manifest::try_from(assets::FUEL_INDEXER_TEST_MANIFEST).unwrap();
update_test_manifest_asset_paths(&mut manifest);
manifest.start_block = Some(block_height + 1);
manifest.end_block = Some(block_height + 2);

srvc.register_index_from_manifest(manifest).await.unwrap();

let mut conn = test_db.pool.acquire().await.unwrap();

// Check block before start block is not indexed
let pre_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height,
))
.fetch_optional(&mut conn)
.await
.unwrap();

assert!(pre_check.is_none());

// Index and check start block
let req = test::TestRequest::post().uri("/ping").to_request();
let _ = app.call(req).await;

sleep(Duration::from_secs(defaults::INDEXED_EVENT_WAIT)).await;

let start_block_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height + 1,
))
.fetch_optional(&mut conn)
.await
.unwrap();

assert!(start_block_check.is_some());

// Index and check end block
let req = test::TestRequest::post().uri("/ping").to_request();
let _ = app.call(req).await;

sleep(Duration::from_secs(defaults::INDEXED_EVENT_WAIT)).await;

let end_block_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height + 2,
))
.fetch_optional(&mut conn)
.await
.unwrap();

assert!(end_block_check.is_some());

// Check block after end block is not indexed
let post_check = sqlx::query(&format!(
"SELECT * FROM fuel_indexer_test_index1.block where height = {}",
block_height + 3,
))
.fetch_optional(&mut conn)
.await
.unwrap();

assert!(post_check.is_none());

node_handle.abort();
}

#[actix_web::test]
#[cfg(all(feature = "e2e", feature = "postgres"))]
async fn test_can_trigger_and_index_tuple_events_postgres() {
Expand Down
13 changes: 12 additions & 1 deletion packages/fuel-indexer/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use tokio::{
task::{spawn_blocking, JoinHandle},
time::{sleep, Duration},
};
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
use wasmer::{
imports, Instance, LazyInit, Memory, Module, NativeFunc, RuntimeError, Store,
WasmerEnv,
Expand Down Expand Up @@ -72,6 +72,10 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(
kill_switch: Arc<AtomicBool>,
) -> impl Future<Output = ()> {
let start_block = manifest.start_block.expect("Failed to detect start_block.");
let end_block = manifest.end_block;
if end_block.is_none() {
warn!("No end_block specified in manifest. Indexer will run forever.");
}
let stop_idle_indexers = config.stop_idle_indexers;

let fuel_node_addr = if config.indexer_net_config {
Expand Down Expand Up @@ -131,6 +135,13 @@ pub fn run_executor<T: 'static + Executor + Send + Sync>(

let mut block_info = Vec::new();
for block in results.into_iter() {
if let Some(end_block) = end_block {
if block.header.height.0 > end_block {
info!("Stopping indexer at the specified end_block: {end_block}");
break;
}
}

let producer = block.block_producer().map(|pk| pk.hash());

let mut transactions = Vec::new();
Expand Down

0 comments on commit 6e63015

Please sign in to comment.