Skip to content

Commit

Permalink
Prestore block to db before orphan pool, use fill unverified thread t…
Browse files Browse the repository at this point in the history
…o pre load unverified block

Signed-off-by: Eval EXEC <execvy@gmail.com>
  • Loading branch information
eval-exec committed Mar 26, 2024
1 parent 6aac1b1 commit 8ef1d95
Show file tree
Hide file tree
Showing 13 changed files with 681 additions and 309 deletions.
40 changes: 16 additions & 24 deletions chain/src/chain_service.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! CKB chain service.
#![allow(missing_docs)]

use crate::{LonelyBlock, ProcessBlockRequest};
use ckb_channel::{select, Receiver, Sender};
use crate::orphan_broker::OrphanBroker;
use crate::{LonelyBlock, LonelyBlockHash, ProcessBlockRequest};
use ckb_channel::{select, Receiver};
use ckb_error::{Error, InternalErrorKind};
use ckb_logger::{self, debug, error, info, warn};
use ckb_shared::block_status::BlockStatus;
Expand All @@ -13,26 +14,25 @@ use ckb_verification::{BlockVerifier, NonContextualBlockTxsVerifier};
use ckb_verification_traits::Verifier;

/// Chain background service to receive LonelyBlock and only do `non_contextual_verify`
#[derive(Clone)]
pub(crate) struct ChainService {
shared: Shared,

process_block_rx: Receiver<ProcessBlockRequest>,

lonely_block_tx: Sender<LonelyBlock>,
orphan_broker: OrphanBroker,
}
impl ChainService {
/// Create a new ChainService instance with shared.
pub(crate) fn new(
shared: Shared,
process_block_rx: Receiver<ProcessBlockRequest>,

lonely_block_tx: Sender<LonelyBlock>,
consume_orphan: OrphanBroker,
) -> ChainService {
ChainService {
shared,
process_block_rx,
lonely_block_tx,
orphan_broker: consume_orphan,
}
}

Expand Down Expand Up @@ -127,25 +127,17 @@ impl ChainService {
}
}

if let Some(metrics) = ckb_metrics::handle() {
metrics
.ckb_chain_lonely_block_ch_len
.set(self.lonely_block_tx.len() as i64)
let db_txn = self.shared.store().begin_transaction();
if let Err(err) = db_txn.insert_block(lonely_block.block()) {
error!("insert block failed: {:?}", err);
return;
}

match self.lonely_block_tx.send(lonely_block) {
Ok(_) => {
debug!(
"processing block: {}-{}, (tip:unverified_tip):({}:{})",
block_number,
block_hash,
self.shared.snapshot().tip_number(),
self.shared.get_unverified_tip().number(),
);
}
Err(_) => {
error!("Failed to notify new block to orphan pool, It seems that the orphan pool has exited.");
}
if let Err(err) = db_txn.commit() {
error!("commit block failed: {:?}", err);
return;
}

let lonely_block_hash: LonelyBlockHash = lonely_block.into();
self.orphan_broker.process_lonely_block(lonely_block_hash);
}
}
69 changes: 31 additions & 38 deletions chain/src/consume_unverified.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::LonelyBlockHash;
use crate::UnverifiedBlock;
use crate::{utils::forkchanges::ForkChanges, GlobalIndex, TruncateRequest, VerifyResult};
use ckb_channel::{select, Receiver};
use ckb_error::{Error, InternalErrorKind};
Expand All @@ -23,19 +23,21 @@ use ckb_verification::cache::Completed;
use ckb_verification::InvalidParentError;
use ckb_verification_contextual::{ContextualBlockVerifier, VerifyContext};
use ckb_verification_traits::Switch;
use dashmap::DashSet;
use std::cmp;
use std::collections::HashSet;
use std::sync::Arc;

pub(crate) struct ConsumeUnverifiedBlockProcessor {
pub(crate) shared: Shared,
pub(crate) is_pending_verify: Arc<DashSet<Byte32>>,
pub(crate) proposal_table: ProposalTable,
}

pub(crate) struct ConsumeUnverifiedBlocks {
tx_pool_controller: TxPoolController,

unverified_block_rx: Receiver<LonelyBlockHash>,
unverified_block_rx: Receiver<UnverifiedBlock>,
truncate_block_rx: Receiver<TruncateRequest>,

stop_rx: Receiver<()>,
Expand All @@ -45,9 +47,10 @@ pub(crate) struct ConsumeUnverifiedBlocks {
impl ConsumeUnverifiedBlocks {
pub(crate) fn new(
shared: Shared,
unverified_blocks_rx: Receiver<LonelyBlockHash>,
unverified_blocks_rx: Receiver<UnverifiedBlock>,
truncate_block_rx: Receiver<TruncateRequest>,
proposal_table: ProposalTable,
is_pending_verify: Arc<DashSet<Byte32>>,
stop_rx: Receiver<()>,
) -> Self {
ConsumeUnverifiedBlocks {
Expand All @@ -57,6 +60,7 @@ impl ConsumeUnverifiedBlocks {
stop_rx,
processor: ConsumeUnverifiedBlockProcessor {
shared,
is_pending_verify,
proposal_table,
},
}
Expand Down Expand Up @@ -94,7 +98,7 @@ impl ConsumeUnverifiedBlocks {
let _ = self.tx_pool_controller.continue_chunk_process();
},
Err(err) => {
error!("truncate_block_tx has been closed,err: {}", err);
info!("truncate_block_tx has been closed,err: {}", err);
return;
},
},
Expand All @@ -109,52 +113,31 @@ impl ConsumeUnverifiedBlocks {
}

impl ConsumeUnverifiedBlockProcessor {
fn load_unverified_block_and_parent_header(
&self,
block_hash: &Byte32,
) -> (BlockView, HeaderView) {
let block_view = self
.shared
.store()
.get_block(block_hash)
.expect("block stored");
let parent_header_view = self
.shared
.store()
.get_block_header(&block_view.data().header().raw().parent_hash())
.expect("parent header stored");

(block_view, parent_header_view)
}

pub(crate) fn consume_unverified_blocks(&mut self, lonely_block_hash: LonelyBlockHash) {
let LonelyBlockHash {
block_number_and_hash,
pub(crate) fn consume_unverified_blocks(&mut self, unverified_block: UnverifiedBlock) {
let UnverifiedBlock {
block,
switch,
verify_callback,
} = lonely_block_hash;
let (unverified_block, parent_header) =
self.load_unverified_block_and_parent_header(&block_number_and_hash.hash);
parent_header,
} = unverified_block;
let block_hash = block.hash();
// process this unverified block
let verify_result = self.verify_block(&unverified_block, &parent_header, switch);
let verify_result = self.verify_block(&block, &parent_header, switch);
match &verify_result {
Ok(_) => {
let log_now = std::time::Instant::now();
self.shared.remove_block_status(&block_number_and_hash.hash);
self.shared.remove_block_status(&block_hash);
let log_elapsed_remove_block_status = log_now.elapsed();
self.shared.remove_header_view(&block_number_and_hash.hash);
self.shared.remove_header_view(&block_hash);
debug!(
"block {} remove_block_status cost: {:?}, and header_view cost: {:?}",
block_number_and_hash.hash,
block_hash,
log_elapsed_remove_block_status,
log_now.elapsed()
);
}
Err(err) => {
error!(
"verify block {} failed: {}",
block_number_and_hash.hash, err
);
error!("verify block {} failed: {}", block_hash, err);

let tip = self
.shared
Expand All @@ -174,17 +157,19 @@ impl ConsumeUnverifiedBlockProcessor {
));

self.shared
.insert_block_status(block_number_and_hash.hash(), BlockStatus::BLOCK_INVALID);
.insert_block_status(block_hash.clone(), BlockStatus::BLOCK_INVALID);
error!(
"set_unverified tip to {}-{}, because verify {} failed: {}",
tip.number(),
tip.hash(),
block_number_and_hash.hash,
block_hash,
err
);
}
}

self.is_pending_verify.remove(&block_hash);

if let Some(callback) = verify_callback {
callback(verify_result);
}
Expand Down Expand Up @@ -280,6 +265,14 @@ impl ConsumeUnverifiedBlockProcessor {
let txn_snapshot = db_txn.get_snapshot();
let _snapshot_tip_hash = db_txn.get_update_for_tip_hash(&txn_snapshot);

db_txn.insert_block_epoch_index(
&block.header().hash(),
&epoch.last_block_hash_in_previous_epoch(),
)?;
if new_epoch {
db_txn.insert_epoch_ext(&epoch.last_block_hash_in_previous_epoch(), &epoch)?;
}

if new_best_block {
info!(
"[verify block] new best block found: {} => {:#x}, difficulty diff = {:#x}, unverified_tip: {}",
Expand Down
69 changes: 40 additions & 29 deletions chain/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,41 +1,53 @@
#![allow(missing_docs)]

//! Bootstrap ChainService, ConsumeOrphan and ConsumeUnverified threads.
//! Bootstrap InitLoadUnverified, PreloadUnverifiedBlock, ChainService and ConsumeUnverified threads.
use crate::chain_service::ChainService;
use crate::consume_unverified::ConsumeUnverifiedBlocks;
use crate::init_load_unverified::InitLoadUnverified;
use crate::orphan_broker::OrphanBroker;
use crate::preload_unverified_blocks_channel::PreloadUnverifiedBlocksChannel;
use crate::utils::orphan_block_pool::OrphanBlockPool;
use crate::{ChainController, LonelyBlock, LonelyBlockHash};
use crate::{chain_controller::ChainController, LonelyBlockHash, UnverifiedBlock};
use ckb_channel::{self as channel, SendError};
use ckb_constant::sync::BLOCK_DOWNLOAD_WINDOW;
use ckb_logger::warn;
use ckb_shared::ChainServicesBuilder;
use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
use ckb_types::packed::Byte32;
use dashmap::DashSet;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread;

const ORPHAN_BLOCK_SIZE: usize = (BLOCK_DOWNLOAD_WINDOW * 2) as usize;
const ORPHAN_BLOCK_SIZE: usize = BLOCK_DOWNLOAD_WINDOW as usize;

pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
let orphan_blocks_broker = Arc::new(OrphanBlockPool::with_capacity(ORPHAN_BLOCK_SIZE));

let (truncate_block_tx, truncate_block_rx) = channel::bounded(1);

let (preload_unverified_stop_tx, preload_unverified_stop_rx) = ckb_channel::bounded::<()>(1);

let (preload_unverified_tx, preload_unverified_rx) =
channel::bounded::<LonelyBlockHash>(BLOCK_DOWNLOAD_WINDOW as usize * 10);

let (unverified_queue_stop_tx, unverified_queue_stop_rx) = ckb_channel::bounded::<()>(1);
let (unverified_tx, unverified_rx) =
channel::bounded::<LonelyBlockHash>(BLOCK_DOWNLOAD_WINDOW as usize * 3);
let (unverified_block_tx, unverified_block_rx) = channel::bounded::<UnverifiedBlock>(128usize);

let is_pending_verify: Arc<DashSet<Byte32>> = Arc::new(DashSet::new());

let consumer_unverified_thread = thread::Builder::new()
.name("consume_unverified_blocks".into())
.spawn({
let shared = builder.shared.clone();
let is_pending_verify = Arc::clone(&is_pending_verify);
move || {
let consume_unverified = ConsumeUnverifiedBlocks::new(
shared,
unverified_rx,
unverified_block_rx,
truncate_block_rx,
builder.proposal_table,
is_pending_verify,
unverified_queue_stop_rx,
);

Expand All @@ -44,38 +56,30 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
})
.expect("start unverified_queue consumer thread should ok");

let (lonely_block_tx, lonely_block_rx) =
channel::bounded::<LonelyBlock>(BLOCK_DOWNLOAD_WINDOW as usize);

let (search_orphan_pool_stop_tx, search_orphan_pool_stop_rx) = ckb_channel::bounded::<()>(1);

let search_orphan_pool_thread = thread::Builder::new()
.name("consume_orphan_blocks".into())
let preload_unverified_block_thread = thread::Builder::new()
.name("preload_unverified_block".into())
.spawn({
let orphan_blocks_broker = Arc::clone(&orphan_blocks_broker);
let shared = builder.shared.clone();
use crate::consume_orphan::ConsumeOrphan;
move || {
let consume_orphan = ConsumeOrphan::new(
let preload_unverified_block = PreloadUnverifiedBlocksChannel::new(
shared,
orphan_blocks_broker,
unverified_tx,
lonely_block_rx,
search_orphan_pool_stop_rx,
preload_unverified_rx,
unverified_block_tx,
preload_unverified_stop_rx,
);
consume_orphan.start();
preload_unverified_block.start()
}
})
.expect("start search_orphan_pool thread should ok");
.expect("start preload_unverified_block should ok");

let (process_block_tx, process_block_rx) = channel::bounded(BLOCK_DOWNLOAD_WINDOW as usize);
let (process_block_tx, process_block_rx) = channel::bounded(0);

let is_verifying_unverified_blocks_on_startup = Arc::new(AtomicBool::new(true));

let chain_controller = ChainController::new(
process_block_tx,
truncate_block_tx,
orphan_blocks_broker,
Arc::clone(&orphan_blocks_broker),
Arc::clone(&is_verifying_unverified_blocks_on_startup),
);

Expand All @@ -90,16 +94,23 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {
let init_load_unverified: InitLoadUnverified = InitLoadUnverified::new(
shared,
chain_controller,
signal_receiver,
is_verifying_unverified_blocks_on_startup,
signal_receiver,
);
init_load_unverified.start();
}
})
.expect("start unverified_queue consumer thread should ok");

let consume_orphan = OrphanBroker::new(
builder.shared.clone(),
orphan_blocks_broker,
preload_unverified_tx,
is_pending_verify,
);

let chain_service: ChainService =
ChainService::new(builder.shared, process_block_rx, lonely_block_tx);
ChainService::new(builder.shared, process_block_rx, consume_orphan);
let chain_service_thread = thread::Builder::new()
.name("ChainService".into())
.spawn({
Expand All @@ -108,10 +119,10 @@ pub fn start_chain_services(builder: ChainServicesBuilder) -> ChainController {

let _ = init_load_unverified_thread.join();

if let Err(SendError(_)) = search_orphan_pool_stop_tx.send(()) {
warn!("trying to notify search_orphan_pool thread to stop, but search_orphan_pool_stop_tx already closed")
if preload_unverified_stop_tx.send(()).is_err(){
warn!("trying to notify preload unverified thread to stop, but preload_unverified_stop_tx already closed");
}
let _ = search_orphan_pool_thread.join();
let _ = preload_unverified_block_thread.join();

if let Err(SendError(_)) = unverified_queue_stop_tx.send(()) {
warn!("trying to notify consume unverified thread to stop, but unverified_queue_stop_tx already closed");
Expand Down
Loading

0 comments on commit 8ef1d95

Please sign in to comment.