Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(streaming): add fuzz test for materialize handle conflict #14166

Merged
merged 3 commits into from
Jan 3, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 112 additions & 1 deletion src/stream/src/executor/mview/materialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,15 @@ impl<SD: ValueRowSerde> MaterializeCache<SD> {
#[cfg(test)]
mod tests {

use std::iter;
use std::sync::atomic::AtomicU64;

use futures::stream::StreamExt;
use risingwave_common::array::stream_chunk::StreamChunkTestExt;
use rand::rngs::SmallRng;
use rand::{Rng, RngCore, SeedableRng};
use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder;
use risingwave_common::array::Op;
use risingwave_common::catalog::{ColumnDesc, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
Expand All @@ -609,6 +614,7 @@ mod tests {
use risingwave_storage::memory::MemoryStateStore;
use risingwave_storage::table::batch_table::storage_table::StorageTable;

use crate::executor::test_utils::prelude::StateTable;
use crate::executor::test_utils::*;
use crate::executor::*;

Expand Down Expand Up @@ -1443,4 +1449,109 @@ mod tests {
_ => unreachable!(),
}
}

fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
const KN: u32 = 4;
const SEED: u64 = 998244353;
let mut ret = vec![];
let mut builder =
StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
let mut rng = SmallRng::seed_from_u64(SEED);

let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
let len = c.data_chunk().capacity();
let mut c = StreamChunkMut::from(c);
for i in 0..len {
c.set_vis(i, rng.gen_bool(0.5));
}
c.into()
};
for _ in 0..row_number {
let k = (rng.next_u32() % KN) as i32;
let v = rng.next_u32() as i32;
let op = if rng.gen_bool(0.5) {
Op::Insert
} else {
Op::Delete
};
if let Some(c) =
builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
{
ret.push(random_vis(c, &mut rng));
}
}
if let Some(c) = builder.take() {
ret.push(random_vis(c, &mut rng));
}
ret
}

async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
const N: usize = 100000;

// Prepare storage and memtable.
let memory_state_store = MemoryStateStore::new();
let table_id = TableId::new(1);
// Two columns of int32 type, the first column is PK.
let schema = Schema::new(vec![
Field::unnamed(DataType::Int32),
Field::unnamed(DataType::Int32),
]);
let column_ids = vec![0.into(), 1.into()];

let chunks = gen_fuzz_data(N, 128);
let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(1)))
.chain(chunks.into_iter().map(Message::Chunk))
.chain(iter::once(Message::Barrier(Barrier::new_test_barrier(2))))
.collect();
// Prepare stream executors.
let source = MockSource::with_messages(schema.clone(), PkIndices::new(), messages);

let mut materialize_executor = Box::new(
MaterializeExecutor::for_test(
Box::new(source),
memory_state_store.clone(),
table_id,
vec![ColumnOrder::new(0, OrderType::ascending())],
column_ids,
1,
Arc::new(AtomicU64::new(0)),
conflict_behavior,
)
.await,
)
.execute();
materialize_executor.expect_barrier().await;

let order_types = vec![OrderType::ascending()];
let column_descs = vec![
ColumnDesc::unnamed(0.into(), DataType::Int32),
ColumnDesc::unnamed(1.into(), DataType::Int32),
];
let pk_indices = vec![0];

let mut table = StateTable::new_without_distribution(
memory_state_store.clone(),
TableId::from(1002),
column_descs.clone(),
order_types,
pk_indices,
)
.await;

while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
// check with state table's memtable
table.write_chunk(c);
}
}

#[tokio::test]
async fn fuzz_test_stream_consistent_upsert() {
fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
}

#[tokio::test]
async fn fuzz_test_stream_consistent_ignore() {
fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
}
}
Loading