diff --git a/script/Cargo.toml b/script/Cargo.toml index db7f913e02c..3c492a727fb 100644 --- a/script/Cargo.toml +++ b/script/Cargo.toml @@ -28,7 +28,7 @@ ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre", optional = tr serde = { version = "1.0", features = ["derive"] } ckb-error = { path = "../error", version = "= 0.114.0-pre" } ckb-chain-spec = { path = "../spec", version = "= 0.114.0-pre" } -tokio = { version = "1.35.0", features = ["sync", "rt-multi-thread"] } +tokio = { version = "1.35.0", features = ["rt-multi-thread"] } [dev-dependencies] proptest = "1.0" diff --git a/script/src/syscalls/spawn.rs b/script/src/syscalls/spawn.rs index 15301bc1997..9b7fee0298f 100644 --- a/script/src/syscalls/spawn.rs +++ b/script/src/syscalls/spawn.rs @@ -235,6 +235,9 @@ where Ok(true) } Err(err) => { + // `CyclesExceeded` for old version snapshot + // `Pause` for new version suspend with pause signal + // Maybe we need to cleanup in future if matches!(err, VMError::Pause | VMError::CyclesExceeded) { let mut context = self.context.lock().map_err(|e| { VMError::Unexpected(format!("Failed to acquire lock: {}", e)) diff --git a/script/src/verify.rs b/script/src/verify.rs index b4e3cb558c1..cc8dee5bdef 100644 --- a/script/src/verify.rs +++ b/script/src/verify.rs @@ -693,8 +693,9 @@ impl { - // FIXME: we need to cleanup this later, state will not contain snapshot - panic!("unexpect suspend in resumable_verify_with_signal"); + // FIXME(yukang): we need to cleanup this later, state will not contain snapshot + unreachable!("unexpect suspend in resumable_verify_with_signal"); } Err(e) => { #[cfg(feature = "logging")] @@ -1051,10 +1052,7 @@ impl Ok(ChunkState::Completed(cycles)), - Err(e) => Err(e), - } + verifier.verify().map(ChunkState::Completed) } else { self.chunk_run_with_signal(group, max_cycles, command_rx) .await @@ -1200,7 +1198,7 @@ impl, context: Arc>, signal: &mut Receiver, @@ -1435,11 +1433,7 @@ async fn run_vms_with_signal( exit_code ))}, (Err(err), _) => { - let map_vm_internal_error = |error: VMInternalError| match error { - VMInternalError::CyclesExceeded => ScriptError::ExceededMaximumCycles(max_cycles), - _ => ScriptError::VMInternalError(error), - }; - return Err(map_vm_internal_error(err)); + return Err(ScriptError::VMInternalError(err)); } } @@ -1456,13 +1450,13 @@ async fn run_vms_child( context: Arc>, ) { let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None); - child_recv.mark_changed(); // mark changed to make sure we can receive initial command // and start to run immediately + child_recv.mark_changed(); loop { select! { _ = child_recv.changed() => { - match child_recv.borrow().to_owned() { + match *child_recv.borrow() { ChunkCommand::Stop => { let exit = (Err(ckb_vm::Error::Unexpected("stopped".to_string())), cycles); let _ = finished_send.send(exit); diff --git a/tx-pool/src/process.rs b/tx-pool/src/process.rs index 2a4064fb1d5..83dbd6c5f9d 100644 --- a/tx-pool/src/process.rs +++ b/tx-pool/src/process.rs @@ -189,16 +189,16 @@ impl TxPoolService { } } + pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool { + let queue = self.verify_queue.read().await; + queue.contains_key(&tx.proposal_short_id()) + } + pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool { let orphan = self.orphan.read().await; orphan.contains_key(&tx.proposal_short_id()) } - pub(crate) async fn chunk_contains(&self, tx: &TransactionView) -> bool { - let chunk = self.verify_queue.read().await; - chunk.contains_key(&tx.proposal_short_id()) - } - pub(crate) async fn with_tx_pool_read_lock) -> U>( &self, mut f: F, @@ -302,7 +302,7 @@ impl TxPoolService { return Err(Reject::Duplicated(tx.hash())); } - if self.chunk_contains(&tx).await { + if self.verify_queue_contains(&tx).await { return Err(Reject::Duplicated(tx.hash())); } @@ -334,7 +334,7 @@ impl TxPoolService { // non contextual verify first self.non_contextual_verify(&tx, remote)?; - if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await { + if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await { return Err(Reject::Duplicated(tx.hash())); } @@ -365,8 +365,8 @@ impl TxPoolService { pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool { let id = ProposalShortId::from_tx_hash(&tx_hash); { - let mut chunk = self.verify_queue.write().await; - if chunk.remove_tx(&id).is_some() { + let mut queue = self.verify_queue.write().await; + if queue.remove_tx(&id).is_some() { return true; } } @@ -527,7 +527,7 @@ impl TxPoolService { for orphan in orphans.into_iter() { if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles { debug!( - "process_orphan {} added to chunk; find previous from {}", + "process_orphan {} added to verify queue; find previous from {}", orphan.tx.hash(), tx.hash(), ); @@ -828,8 +828,8 @@ impl TxPoolService { tx: TransactionView, remote: Option<(Cycle, PeerIndex)>, ) -> Result { - let mut chunk = self.verify_queue.write().await; - chunk.add_tx(tx, remote) + let mut queue = self.verify_queue.write().await; + queue.add_tx(tx, remote) } pub(crate) async fn _process_tx( @@ -1015,8 +1015,8 @@ impl TxPoolService { self.remove_orphan_txs_by_attach(&attached).await; { - let mut chunk = self.verify_queue.write().await; - chunk.remove_txs(attached.iter().map(|tx| tx.proposal_short_id())); + let mut queue = self.verify_queue.write().await; + queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id())); } }