Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <i@bugenzhao.com>
  • Loading branch information
BugenZhao committed Oct 28, 2022
1 parent 7ad842d commit 1040276
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
1 change: 1 addition & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ set -e
cargo nextest run \
-p risingwave_simulation_scale \
--tests \
"$@"
"""

Expand Down
37 changes: 32 additions & 5 deletions src/tests/simulation_scale/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct Cluster {
}

impl Cluster {
pub async fn start(conf: Configuration) -> Result<Self> {
async fn start_inner(conf: Configuration) -> Result<Self> {
let handle = madsim::runtime::Handle::current();
println!("seed = {}", handle.seed());
println!("{:?}", conf);
Expand Down Expand Up @@ -185,7 +185,11 @@ impl Cluster {
})
}

pub async fn run(&mut self, sql: impl Into<String>) -> Result<String> {
pub fn start(conf: Configuration) -> BoxFuture<'static, Result<Self>> {
self.start_inner(conf)
}

async fn run_inner(&mut self, sql: impl Into<String>) -> Result<String> {
let frontend = self
.frontends
.choose(&mut thread_rng())
Expand All @@ -207,7 +211,11 @@ impl Cluster {
Ok(result)
}

pub async fn wait_until(
pub fn run(&mut self, sql: impl Into<String>) -> BoxFuture<'_, Result<String>> {
self.run_inner(sql)
}

async fn wait_until_inner(
&mut self,
sql: impl Into<String> + Clone,
mut p: impl FnMut(&str) -> bool,
Expand All @@ -231,13 +239,32 @@ impl Cluster {
}
}

pub async fn wait_until_non_empty(
pub fn wait_until(
&mut self,
sql: impl Into<String> + Clone,
mut p: impl FnMut(&str) -> bool,
interval: Duration,
timeout: Duration,
) -> BoxFuture<'static, Result<String>> {
self.wait_until_inner(sql, p, interval, timeout)
}

async fn wait_until_non_empty_inner(
&mut self,
sql: impl Into<String> + Clone,
interval: Duration,
timeout: Duration,
) -> Result<String> {
self.wait_until(sql, |r| !r.trim().is_empty(), interval, timeout)
self.wait_until_inner(sql, |r| !r.trim().is_empty(), interval, timeout)
.await
}

pub fn wait_until_non_empty(
&mut self,
sql: impl Into<String> + Clone,
interval: Duration,
timeout: Duration,
) -> BoxFuture<'_, Result<String>> {
self.wait_until_non_empty_inner(sql, interval, timeout)
}
}
1 change: 1 addition & 0 deletions src/tests/simulation_scale/tests/nexmark_chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::time::Duration;

use anyhow::Result;
use futures::future::BoxFuture;
use itertools::Itertools;
use madsim::time::sleep;
use risingwave_simulation_scale::cluster::Configuration;
Expand Down

0 comments on commit 1040276

Please sign in to comment.