Skip to content

Commit

Permalink
Add tests for run_vms_with_signal, fix debug pause
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Jan 17, 2024
1 parent 1f6d363 commit 6f64322
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 9 deletions.
4 changes: 3 additions & 1 deletion script/src/syscalls/pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ impl<Mac: SupportMachine> Syscalls<Mac> for Pause {
if self.skip.load(Ordering::SeqCst) {
return Ok(true);
}
Err(VMError::CyclesExceeded)
// FIXME(yukang): currently we change use VMInternalError::CyclesExceeded | VMInternalError::Pause
// as a flag to Suspend, only for compatibility with old tests, maybe we need cleanup this later.
Err(VMError::Pause)
}
}
2 changes: 1 addition & 1 deletion script/src/syscalls/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ where
Ok(true)
}
Err(err) => {
if matches!(err, VMError::CyclesExceeded | VMError::Pause) {
if matches!(err, VMError::Pause | VMError::CyclesExceeded) {
let mut context = self.context.lock().map_err(|e| {
VMError::Unexpected(format!("Failed to acquire lock: {}", e))
})?;
Expand Down
15 changes: 11 additions & 4 deletions script/src/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ fn run_vms(
}
}
Err(error) => match error {
VMInternalError::CyclesExceeded => {
VMInternalError::CyclesExceeded | VMInternalError::Pause => {
let mut new_suspended_machines: Vec<_> = {
let mut context = context.lock().map_err(|e| {
ScriptError::Other(format!("Failed to acquire lock: {}", e))
Expand Down Expand Up @@ -1396,24 +1396,30 @@ async fn run_vms_with_signal(
));
}

let pause = machines[0].pause();
let mut pause = machines[0].pause();
let (finished_send, mut finished_recv) =
mpsc::unbounded_channel::<(Result<i8, ckb_vm::Error>, u64)>();

// send initial `Resume` command to child
// it's maybe useful to set initial command to `signal.borrow().to_owned()`
// so that we can control the initial state of child, which is useful for testing purpose
let (child_sender, child_recv) = watch::channel(ChunkCommand::Resume);
let jh =
tokio::spawn(
async move { run_vms_child(machines, child_recv, finished_send, context).await },
);

loop {
//eprintln!("parent wait to receive: {:?}", signal.borrow().to_owned());
tokio::select! {
_ = signal.changed() => {
match signal.borrow().to_owned() {
ChunkCommand::Suspend => {
pause.interrupt();
}
ChunkCommand::Resume => {
child_sender.send(ChunkCommand::Resume).unwrap();
pause.free();
let _res = child_sender.send(ChunkCommand::Resume);
}
}
}
Expand Down Expand Up @@ -1451,6 +1457,8 @@ async fn run_vms_child(
) {
let (mut exit_code, mut cycles, mut spawn_data) = (0, 0, None);
child_recv.mark_changed();
// mark changed to make sure we can receive initial command
// and start to run immediately
loop {
select! {
_ = child_recv.changed() => {
Expand Down Expand Up @@ -1501,7 +1509,6 @@ async fn run_vms_child(
new_suspended_machines.reverse();
machines.push(machine);
machines.append(&mut new_suspended_machines);
eprintln!("suspend here: {:?}", machines.len());
// break to wait for Resume command to begin next loop iteration
break;
}
Expand Down
40 changes: 38 additions & 2 deletions script/src/verify/tests/ckb_latest/features_since_v2021.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,42 @@ fn check_exec_wrong_callee_format() {
assert!(result.is_err());
}

#[tokio::test]
async fn async_check_exec_wrong_callee_format() {
let script_version = SCRIPT_VERSION;

let (exec_caller_cell, exec_caller_data_hash) =
load_cell_from_path("testdata/exec_caller_from_cell_data");
let (exec_callee_cell, _exec_caller_data_hash) =
load_cell_from_slice(&[0x00, 0x01, 0x02, 0x03]);

let exec_caller_script = Script::new_builder()
.hash_type(script_version.data_hash_type().into())
.code_hash(exec_caller_data_hash)
.build();
let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(100).pack())
.lock(exec_caller_script)
.build();
let input = CellInput::new(OutPoint::null(), 0);

let transaction = TransactionBuilder::default().input(input).build();
let dummy_cell = create_dummy_cell(output);

let rtx = ResolvedTransaction {
transaction,
resolved_cell_deps: vec![exec_caller_cell, exec_callee_cell],
resolved_inputs: vec![dummy_cell],
resolved_dep_groups: vec![],
};

let verifier = TransactionScriptsVerifierWithEnv::new();
let result = verifier
.verify_without_limit_async(script_version, &rtx)
.await;
assert!(result.is_err());
}

#[test]
fn check_exec_big_offset_length() {
let script_version = SCRIPT_VERSION;
Expand Down Expand Up @@ -518,7 +554,7 @@ fn _check_type_id_one_in_one_out_resume(step_cycles: Cycle) -> Result<(), TestCa
}
}
Err(error) => match error {
VMInternalError::CyclesExceeded => {
VMInternalError::CyclesExceeded | VMInternalError::Pause => {
tmp = Some(vm);
limit += step_cycles;
continue;
Expand Down Expand Up @@ -718,7 +754,7 @@ fn _check_typical_secp256k1_blake160_2_in_2_out_tx_with_chunk(step_cycles: Cycle
}
}
Err(error) => match error {
VMInternalError::CyclesExceeded => {
VMInternalError::CyclesExceeded | VMInternalError::Pause => {
tmp = Some(vm);
limit += step_cycles;
continue;
Expand Down
82 changes: 82 additions & 0 deletions script/src/verify/tests/ckb_latest/features_since_v2023.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,88 @@ fn check_spawn_snapshot() {
assert!(chunks_count > 1);
}

#[tokio::test]
async fn check_spawn_async() {
let script_version = SCRIPT_VERSION;
if script_version <= ScriptVersion::V1 {
return;
}

let (spawn_caller_cell, spawn_caller_data_hash) =
load_cell_from_path("testdata/spawn_caller_exec");
let (snapshot_cell, _) = load_cell_from_path("testdata/current_cycles_with_snapshot");

let spawn_caller_script = Script::new_builder()
.hash_type(script_version.data_hash_type().into())
.code_hash(spawn_caller_data_hash)
.build();
let output = CellOutputBuilder::default()
.capacity(capacity_bytes!(100).pack())
.lock(spawn_caller_script)
.build();
let input = CellInput::new(OutPoint::null(), 0);

let transaction = TransactionBuilder::default().input(input).build();
let dummy_cell = create_dummy_cell(output);

let rtx = ResolvedTransaction {
transaction,
resolved_cell_deps: vec![spawn_caller_cell, snapshot_cell],
resolved_inputs: vec![dummy_cell],
resolved_dep_groups: vec![],
};
let verifier = TransactionScriptsVerifierWithEnv::new();
let result = verifier.verify_without_pause(script_version, &rtx, Cycle::MAX);
let cycles_once = result.unwrap();

// we use debug pause to test context resume
// `current_cycles_with_snapshot` will try to pause verifier
// here we use `channel` to send Resume to verifier until it completes
let (command_tx, mut command_rx) = watch::channel(ChunkCommand::Resume);
let _jt = tokio::spawn(async move {
loop {
let res = command_tx.send(ChunkCommand::Resume);
if res.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
});
let cycles = verifier
.verify_complete_async(script_version, &rtx, &mut command_rx, false)
.await
.unwrap();
assert_eq!(cycles, cycles_once);

// we send Resume/Suspend to command_rx in a loop, make sure cycles is still the same
let (command_tx, mut command_rx) = watch::channel(ChunkCommand::Resume);
let _jt = tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
let res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
if res.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
let res = command_tx.send(ChunkCommand::Resume);
if res.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;

let _res = command_tx.send(ChunkCommand::Suspend);
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
});

let cycles = verifier
.verify_complete_async(script_version, &rtx, &mut command_rx, true)
.await
.unwrap();
assert_eq!(cycles, cycles_once);
}

#[test]
fn check_spawn_state() {
let script_version = SCRIPT_VERSION;
Expand Down
70 changes: 70 additions & 0 deletions script/src/verify/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,76 @@ impl TransactionScriptsVerifierWithEnv {
self.verify(version, rtx, u64::MAX)
}

pub(crate) async fn verify_without_limit_async(
&self,
version: ScriptVersion,
rtx: &ResolvedTransaction,
) -> Result<Cycle, Error> {
let data_loader = self.store.as_data_loader();
let epoch = match version {
ScriptVersion::V0 => EpochNumberWithFraction::new(0, 0, 1),
ScriptVersion::V1 => EpochNumberWithFraction::new(self.version_1_enabled_at, 0, 1),
ScriptVersion::V2 => EpochNumberWithFraction::new(self.version_2_enabled_at, 0, 1),
};
let header = HeaderView::new_advanced_builder()
.epoch(epoch.pack())
.build();
let tx_env = Arc::new(TxVerifyEnv::new_commit(&header));
let verifier = TransactionScriptsVerifier::new(
Arc::new(rtx.clone()),
data_loader,
Arc::clone(&self.consensus),
tx_env,
);

let (_command_tx, mut command_rx) = tokio::sync::watch::channel(ChunkCommand::Resume);
let res = verifier
.resumable_verify_with_signal(u64::MAX, &mut command_rx)
.await;
match res {
Ok(VerifyResult::Completed(cycle)) => Ok(cycle),
Ok(VerifyResult::Suspended(_)) => unreachable!(),
Err(err) => Err(err),
}
}

pub(crate) async fn verify_complete_async(
&self,
version: ScriptVersion,
rtx: &ResolvedTransaction,
command_rx: &mut tokio::sync::watch::Receiver<ChunkCommand>,
skip_debug_pause: bool,
) -> Result<Cycle, Error> {
let data_loader = self.store.as_data_loader();
let epoch = match version {
ScriptVersion::V0 => EpochNumberWithFraction::new(0, 0, 1),
ScriptVersion::V1 => EpochNumberWithFraction::new(self.version_1_enabled_at, 0, 1),
ScriptVersion::V2 => EpochNumberWithFraction::new(self.version_2_enabled_at, 0, 1),
};
let header = HeaderView::new_advanced_builder()
.epoch(epoch.pack())
.build();
let tx_env = Arc::new(TxVerifyEnv::new_commit(&header));
let verifier = TransactionScriptsVerifier::new(
Arc::new(rtx.clone()),
data_loader,
Arc::clone(&self.consensus),
tx_env,
);

if skip_debug_pause {
verifier.set_skip_pause(true);
}
let res = verifier
.resumable_verify_with_signal(Cycle::MAX, command_rx)
.await;
match res {
Ok(VerifyResult::Completed(cycle)) => Ok(cycle),
Ok(VerifyResult::Suspended(_)) => unreachable!(),
Err(err) => Err(err),
}
}

// If the max cycles is meaningless, please use `verify_without_limit`,
// so reviewers or developers can understand the intentions easier.
pub(crate) fn verify(
Expand Down
1 change: 0 additions & 1 deletion tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,6 @@ impl TxPoolService {
}

if let Some((ret, snapshot)) = self._resumeble_process_tx(tx.clone(), remote).await {
eprintln!("resumeble_process_tx: ret = {:?}", ret);
match ret {
Ok(processed) => {
if let ProcessResult::Completed(completed) = processed {
Expand Down

0 comments on commit 6f64322

Please sign in to comment.