From ed2b59731f7943abbc01b3a4a4b3386f76db4c36 Mon Sep 17 00:00:00 2001 From: driftluo Date: Fri, 9 Aug 2024 15:49:12 +0800 Subject: [PATCH] fix: add limit to get cells --- .../src/conversion/blockchain/mod.rs | 6 +- util/gen-types/src/lib.rs | 2 + util/indexer/src/service.rs | 86 +++++++++++++------ util/jsonrpc-types/src/blockchain.rs | 2 +- util/jsonrpc-types/src/bytes.rs | 4 +- .../async_indexer_handle/get_cells.rs | 2 +- .../async_indexer_handle/get_transactions.rs | 2 +- .../async_indexer_handle/mod.rs | 2 + util/rich-indexer/src/tests/query.rs | 85 ++++++++++++------ 9 files changed, 128 insertions(+), 63 deletions(-) diff --git a/util/gen-types/src/conversion/blockchain/mod.rs b/util/gen-types/src/conversion/blockchain/mod.rs index b80804fc09..0c6415844a 100644 --- a/util/gen-types/src/conversion/blockchain/mod.rs +++ b/util/gen-types/src/conversion/blockchain/mod.rs @@ -1,7 +1,7 @@ #[cfg(feature = "std")] mod std_env; -use crate::{borrow::ToOwned, bytes::Bytes, generated::packed, prelude::*, vec::Vec}; +use crate::{bytes::Bytes, generated::packed, prelude::*, vec::Vec}; impl Pack for [u8; 32] { fn pack(&self) -> packed::Byte32 { @@ -46,13 +46,13 @@ impl Pack for Bytes { impl<'r> Unpack for packed::BytesReader<'r> { fn unpack(&self) -> Bytes { - Bytes::from(self.raw_data().to_owned()) + Bytes::from(self.raw_data().to_vec()) } } impl Unpack for packed::Bytes { fn unpack(&self) -> Bytes { - self.raw_data() + Bytes::from(self.raw_data().to_vec()) } } diff --git a/util/gen-types/src/lib.rs b/util/gen-types/src/lib.rs index c2aa5c5183..bceae47fda 100644 --- a/util/gen-types/src/lib.rs +++ b/util/gen-types/src/lib.rs @@ -19,8 +19,10 @@ pub use molecule::bytes; cfg_if::cfg_if! { if #[cfg(feature = "std")] { + #[allow(unused_imports)] use std::{vec, borrow}; } else { + #[allow(unused_imports)] use alloc::{vec, borrow}; } } diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index 774276e392..06f4407824 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -23,6 +23,10 @@ use std::sync::{Arc, RwLock}; pub(crate) const SUBSCRIBER_NAME: &str = "Indexer"; const DEFAULT_LOG_KEEP_NUM: usize = 1; const DEFAULT_MAX_BACKGROUND_JOBS: usize = 6; +// We set the memory usage limit at 2 GB, meaning only 200 MB of data can be requested at a time. +// The maximum cell data size is 500 KB, and 400 cells can be requested at one time. +// related: https://github.com/serde-rs/json/issues/635 +const DEFAULT_REQUEST_LIMIT: usize = 400; /// Indexer service #[derive(Clone)] @@ -164,7 +168,7 @@ impl IndexerHandle { )); } - let limit = limit.value() as usize; + let limit = std::cmp::min(limit.value() as usize, DEFAULT_REQUEST_LIMIT); if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } @@ -330,7 +334,7 @@ impl IndexerHandle { limit: Uint32, after_cursor: Option, ) -> Result, Error> { - let limit = limit.value() as usize; + let limit = std::cmp::min(limit.value() as usize, DEFAULT_REQUEST_LIMIT); if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } @@ -1688,35 +1692,63 @@ mod tests { ); // test get_transactions rpc with exact search mode - let txs = rpc - .get_transactions( - IndexerSearchKey { - script: lock_script1.clone().into(), - script_search_mode: Some(IndexerSearchMode::Exact), - ..Default::default() - }, - IndexerOrder::Asc, - 1000.into(), - None, - ) - .unwrap(); + let txs = { + let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new())); + let mut last_key = None; + loop { + let txs_1 = rpc + .get_transactions( + IndexerSearchKey { + script: lock_script1.clone().into(), + script_search_mode: Some(IndexerSearchMode::Exact), + ..Default::default() + }, + IndexerOrder::Asc, + 1000.into(), + last_key.clone(), + ) + .unwrap(); + + if txs_1.objects.is_empty() { + break; + } else { + txs.objects.extend(txs_1.objects); + last_key = Some(txs_1.last_cursor); + } + } + txs + }; assert_eq!(total_blocks as usize * 3 - 1, txs.objects.len(), "total size should be cellbase tx count + total_block * 2 - 1 (genesis block only has one tx)"); // test get_transactions rpc group by tx hash with exact search mode - let txs = rpc - .get_transactions( - IndexerSearchKey { - script: lock_script1.clone().into(), - script_search_mode: Some(IndexerSearchMode::Exact), - group_by_transaction: Some(true), - ..Default::default() - }, - IndexerOrder::Asc, - 1000.into(), - None, - ) - .unwrap(); + let txs = { + let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new())); + let mut last_key = None; + + loop { + let txs_1 = rpc + .get_transactions( + IndexerSearchKey { + script: lock_script1.clone().into(), + script_search_mode: Some(IndexerSearchMode::Exact), + group_by_transaction: Some(true), + ..Default::default() + }, + IndexerOrder::Asc, + 1000.into(), + last_key.clone(), + ) + .unwrap(); + if txs_1.objects.is_empty() { + break; + } else { + txs.objects.extend(txs_1.objects); + last_key = Some(txs_1.last_cursor); + } + } + txs + }; assert_eq!( total_blocks as usize * 2, diff --git a/util/jsonrpc-types/src/blockchain.rs b/util/jsonrpc-types/src/blockchain.rs index 47fbfff606..247b394c1c 100644 --- a/util/jsonrpc-types/src/blockchain.rs +++ b/util/jsonrpc-types/src/blockchain.rs @@ -116,7 +116,7 @@ impl From for Script { fn from(input: packed::Script) -> Script { Script { code_hash: input.code_hash().unpack(), - args: JsonBytes::from_bytes(input.args().unpack()), + args: JsonBytes::from_vec(input.args().unpack()), hash_type: core::ScriptHashType::try_from(input.hash_type()) .expect("checked data") .into(), diff --git a/util/jsonrpc-types/src/bytes.rs b/util/jsonrpc-types/src/bytes.rs index 1e270f8d54..b5b7a6bcf4 100644 --- a/util/jsonrpc-types/src/bytes.rs +++ b/util/jsonrpc-types/src/bytes.rs @@ -60,13 +60,13 @@ impl JsonBytes { impl From for JsonBytes { fn from(input: packed::Bytes) -> Self { - JsonBytes::from_bytes(input.raw_data()) + JsonBytes::from_vec(input.raw_data().to_vec()) } } impl<'a> From<&'a packed::Bytes> for JsonBytes { fn from(input: &'a packed::Bytes) -> Self { - JsonBytes::from_bytes(input.raw_data()) + JsonBytes::from_vec(input.raw_data().to_vec()) } } diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs index 63a89d4800..d758b3cd3e 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_cells.rs @@ -22,7 +22,7 @@ impl AsyncRichIndexerHandle { limit: Uint32, after: Option, ) -> Result, Error> { - let limit = limit.value(); + let limit = std::cmp::min(limit.value(), DEFAULT_REQUEST_LIMIT); if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs index d8d58e0d68..95b475dcbe 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/get_transactions.rs @@ -20,7 +20,7 @@ impl AsyncRichIndexerHandle { limit: Uint32, after: Option, ) -> Result, Error> { - let limit = limit.value(); + let limit = std::cmp::min(limit.value(), DEFAULT_REQUEST_LIMIT); if limit == 0 { return Err(Error::invalid_params("limit should be greater than 0")); } diff --git a/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs b/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs index 7f6032ef25..84b3068475 100644 --- a/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs +++ b/util/rich-indexer/src/indexer_handle/async_indexer_handle/mod.rs @@ -18,6 +18,8 @@ use sqlx::Row; use std::sync::{Arc, RwLock}; +const DEFAULT_REQUEST_LIMIT: u32 = 400; + /// Async handle to the rich-indexer. #[derive(Clone)] pub struct AsyncRichIndexerHandle { diff --git a/util/rich-indexer/src/tests/query.rs b/util/rich-indexer/src/tests/query.rs index 80731de569..ddc8c496e8 100644 --- a/util/rich-indexer/src/tests/query.rs +++ b/util/rich-indexer/src/tests/query.rs @@ -1,7 +1,7 @@ use super::*; use ckb_indexer_sync::{CustomFilters, Pool}; -use ckb_jsonrpc_types::{IndexerRange, IndexerSearchKeyFilter, IndexerTx}; +use ckb_jsonrpc_types::{IndexerPagination, IndexerRange, IndexerSearchKeyFilter, IndexerTx}; use ckb_types::{ bytes::Bytes, core::{ @@ -1339,37 +1339,66 @@ async fn script_search_mode_rpc() { ); // test get_transactions rpc with exact search mode - let txs = rpc - .get_transactions( - IndexerSearchKey { - script: lock_script1.clone().into(), - script_search_mode: Some(IndexerSearchMode::Exact), - ..Default::default() - }, - IndexerOrder::Asc, - 1000.into(), - None, - ) - .await - .unwrap(); + let txs = { + let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new())); + let mut last_key = None; + + loop { + let txs_1 = rpc + .get_transactions( + IndexerSearchKey { + script: lock_script1.clone().into(), + script_search_mode: Some(IndexerSearchMode::Exact), + ..Default::default() + }, + IndexerOrder::Asc, + 1000.into(), + last_key.clone(), + ) + .await + .unwrap(); + + if txs_1.objects.is_empty() { + break; + } else { + txs.objects.extend(txs_1.objects); + last_key = Some(txs_1.last_cursor); + } + } + txs + }; assert_eq!(total_blocks as usize * 3 - 1, txs.objects.len(), "total size should be cellbase tx count + total_block * 2 - 1 (genesis block only has one tx)"); // test get_transactions rpc group by tx hash with exact search mode - let txs = rpc - .get_transactions( - IndexerSearchKey { - script: lock_script1.clone().into(), - script_search_mode: Some(IndexerSearchMode::Exact), - group_by_transaction: Some(true), - ..Default::default() - }, - IndexerOrder::Asc, - 1000.into(), - None, - ) - .await - .unwrap(); + let txs = { + let mut txs = IndexerPagination::new(Vec::new(), JsonBytes::from_bytes(Bytes::new())); + let mut last_key = None; + + loop { + let txs_1 = rpc + .get_transactions( + IndexerSearchKey { + script: lock_script1.clone().into(), + script_search_mode: Some(IndexerSearchMode::Exact), + group_by_transaction: Some(true), + ..Default::default() + }, + IndexerOrder::Asc, + 1000.into(), + last_key.clone(), + ) + .await + .unwrap(); + if txs_1.objects.is_empty() { + break; + } else { + txs.objects.extend(txs_1.objects); + last_key = Some(txs_1.last_cursor); + } + } + txs + }; assert_eq!( total_blocks as usize * 2,