Skip to content

Commit

Permalink
add comments and more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 18, 2024
1 parent 9c66560 commit b0f1d76
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 32 deletions.
2 changes: 1 addition & 1 deletion script/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ ckb-logger = { path = "../util/logger", version = "= 0.114.0-pre", optional = tr
serde = { version = "1.0", features = ["derive"] }
ckb-error = { path = "../error", version = "= 0.114.0-pre" }
ckb-chain-spec = { path = "../spec", version = "= 0.114.0-pre" }
tokio = { version = "1.35.0", features = ["sync", "rt-multi-thread"] }
tokio = { version = "1.35.0", features = ["rt-multi-thread"] }

[dev-dependencies]
proptest = "1.0"
Expand Down
3 changes: 3 additions & 0 deletions script/src/syscalls/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ where
Ok(true)
}
Err(err) => {
// `CyclesExceeded` for old version snapshot
// `Pause` for new version suspend with pause signal
// Maybe we need to cleanup in future
if matches!(err, VMError::Pause | VMError::CyclesExceeded) {
let mut context = self.context.lock().map_err(|e| {
VMError::Unexpected(format!("Failed to acquire lock: {}", e))
Expand Down
28 changes: 11 additions & 17 deletions script/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,9 @@ impl<DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + C
Ok(VerifyResult::Completed(cycles))
}

/// Performing a resumable verification on the transaction scripts with signal
/// If `Suspend` comes from `command_rx`, the process will be hang up until `Resume` comes.
/// Performing a resumable verification on the transaction scripts with signal channel,
/// if `Suspend` comes from `command_rx`, the process will be hang up until `Resume` comes,
/// otherwise, it will return until the verification is completed.
pub async fn resumable_verify_with_signal(
&self,
limit_cycles: Cycle,
Expand All @@ -718,8 +719,8 @@ impl<DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + C
cycles = wrapping_cycles_add(cycles, used_cycles, group)?;
}
Ok(ChunkState::Suspended(_vms, _context)) => {
// FIXME: we need to cleanup this later, state will not contain snapshot
panic!("unexpect suspend in resumable_verify_with_signal");
// FIXME(yukang): we need to cleanup this later, state will not contain snapshot
unreachable!("unexpect suspend in resumable_verify_with_signal");
}
Err(e) => {
#[cfg(feature = "logging")]
Expand Down Expand Up @@ -1051,10 +1052,7 @@ impl<DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + C
script_group: group,
max_cycles,
};
match verifier.verify() {
Ok(cycles) => Ok(ChunkState::Completed(cycles)),
Err(e) => Err(e),
}
verifier.verify().map(ChunkState::Completed)
} else {
self.chunk_run_with_signal(group, max_cycles, command_rx)
.await
Expand Down Expand Up @@ -1200,7 +1198,7 @@ impl<DL: CellDataProvider + HeaderProvider + ExtensionProvider + Send + Sync + C
vec![ResumableMachine::initial(machine)]
};

run_vms_with_signal(script_group, max_cycles, machines, context, command_rx).await
run_vms_with_signal(script_group, machines, context, command_rx).await
}

fn chunk_run(
Expand Down Expand Up @@ -1383,9 +1381,9 @@ fn run_vms(
}

// Run a series of VMs with control signal, will only return when verification finished
// Or send `Stop` command when verification is suspended
async fn run_vms_with_signal(
script_group: &ScriptGroup,
max_cycles: Cycle,
machines: Vec<ResumableMachine>,
context: Arc<Mutex<MachineContext>>,
signal: &mut Receiver<ChunkCommand>,
Expand Down Expand Up @@ -1435,11 +1433,7 @@ async fn run_vms_with_signal(
exit_code
))},
(Err(err), _) => {
let map_vm_internal_error = |error: VMInternalError| match error {
VMInternalError::CyclesExceeded => ScriptError::ExceededMaximumCycles(max_cycles),
_ => ScriptError::VMInternalError(error),
};
return Err(map_vm_internal_error(err));
return Err(ScriptError::VMInternalError(err));
}
}

Expand All @@ -1456,13 +1450,13 @@ async fn run_vms_child(
context: Arc<Mutex<MachineContext>>,
) {
let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None);
child_recv.mark_changed();
// mark changed to make sure we can receive initial command
// and start to run immediately
child_recv.mark_changed();
loop {
select! {
_ = child_recv.changed() => {
match child_recv.borrow().to_owned() {
match *child_recv.borrow() {
ChunkCommand::Stop => {
let exit = (Err(ckb_vm::Error::Unexpected("stopped".to_string())), cycles);
let _ = finished_send.send(exit);
Expand Down
28 changes: 14 additions & 14 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,16 @@ impl TxPoolService {
}
}

pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
let queue = self.verify_queue.read().await;
queue.contains_key(&tx.proposal_short_id())
}

pub(crate) async fn orphan_contains(&self, tx: &TransactionView) -> bool {
let orphan = self.orphan.read().await;
orphan.contains_key(&tx.proposal_short_id())
}

pub(crate) async fn chunk_contains(&self, tx: &TransactionView) -> bool {
let chunk = self.verify_queue.read().await;
chunk.contains_key(&tx.proposal_short_id())
}

pub(crate) async fn with_tx_pool_read_lock<U, F: FnMut(&TxPool, Arc<Snapshot>) -> U>(
&self,
mut f: F,
Expand Down Expand Up @@ -302,7 +302,7 @@ impl TxPoolService {
return Err(Reject::Duplicated(tx.hash()));
}

if self.chunk_contains(&tx).await {
if self.verify_queue_contains(&tx).await {
return Err(Reject::Duplicated(tx.hash()));
}

Expand Down Expand Up @@ -334,7 +334,7 @@ impl TxPoolService {
// non contextual verify first
self.non_contextual_verify(&tx, remote)?;

if self.chunk_contains(&tx).await || self.orphan_contains(&tx).await {
if self.verify_queue_contains(&tx).await || self.orphan_contains(&tx).await {
return Err(Reject::Duplicated(tx.hash()));
}

Expand Down Expand Up @@ -365,8 +365,8 @@ impl TxPoolService {
pub(crate) async fn remove_tx(&self, tx_hash: Byte32) -> bool {
let id = ProposalShortId::from_tx_hash(&tx_hash);
{
let mut chunk = self.verify_queue.write().await;
if chunk.remove_tx(&id).is_some() {
let mut queue = self.verify_queue.write().await;
if queue.remove_tx(&id).is_some() {
return true;
}
}
Expand Down Expand Up @@ -527,7 +527,7 @@ impl TxPoolService {
for orphan in orphans.into_iter() {
if orphan.cycle > self.tx_pool_config.max_tx_verify_cycles {
debug!(
"process_orphan {} added to chunk; find previous from {}",
"process_orphan {} added to verify queue; find previous from {}",
orphan.tx.hash(),
tx.hash(),
);
Expand Down Expand Up @@ -828,8 +828,8 @@ impl TxPoolService {
tx: TransactionView,
remote: Option<(Cycle, PeerIndex)>,
) -> Result<bool, Reject> {
let mut chunk = self.verify_queue.write().await;
chunk.add_tx(tx, remote)
let mut queue = self.verify_queue.write().await;
queue.add_tx(tx, remote)
}

pub(crate) async fn _process_tx(
Expand Down Expand Up @@ -1015,8 +1015,8 @@ impl TxPoolService {

self.remove_orphan_txs_by_attach(&attached).await;
{
let mut chunk = self.verify_queue.write().await;
chunk.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
let mut queue = self.verify_queue.write().await;
queue.remove_txs(attached.iter().map(|tx| tx.proposal_short_id()));
}
}

Expand Down

0 comments on commit b0f1d76

Please sign in to comment.