diff --git a/src/stream/src/executor/mview/materialize.rs b/src/stream/src/executor/mview/materialize.rs index a3ec5c36a5eb..30dd733ea032 100644 --- a/src/stream/src/executor/mview/materialize.rs +++ b/src/stream/src/executor/mview/materialize.rs @@ -597,10 +597,15 @@ impl MaterializeCache { #[cfg(test)] mod tests { + use std::iter; use std::sync::atomic::AtomicU64; use futures::stream::StreamExt; - use risingwave_common::array::stream_chunk::StreamChunkTestExt; + use rand::rngs::SmallRng; + use rand::{Rng, RngCore, SeedableRng}; + use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt}; + use risingwave_common::array::stream_chunk_builder::StreamChunkBuilder; + use risingwave_common::array::Op; use risingwave_common::catalog::{ColumnDesc, ConflictBehavior, Field, Schema, TableId}; use risingwave_common::row::OwnedRow; use risingwave_common::types::DataType; @@ -609,6 +614,7 @@ mod tests { use risingwave_storage::memory::MemoryStateStore; use risingwave_storage::table::batch_table::storage_table::StorageTable; + use crate::executor::test_utils::prelude::StateTable; use crate::executor::test_utils::*; use crate::executor::*; @@ -1443,4 +1449,109 @@ mod tests { _ => unreachable!(), } } + + fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec { + const KN: u32 = 4; + const SEED: u64 = 998244353; + let mut ret = vec![]; + let mut builder = + StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]); + let mut rng = SmallRng::seed_from_u64(SEED); + + let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk { + let len = c.data_chunk().capacity(); + let mut c = StreamChunkMut::from(c); + for i in 0..len { + c.set_vis(i, rng.gen_bool(0.5)); + } + c.into() + }; + for _ in 0..row_number { + let k = (rng.next_u32() % KN) as i32; + let v = rng.next_u32() as i32; + let op = if rng.gen_bool(0.5) { + Op::Insert + } else { + Op::Delete + }; + if let Some(c) = + builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())])) + { + ret.push(random_vis(c, &mut rng)); + } + } + if let Some(c) = builder.take() { + ret.push(random_vis(c, &mut rng)); + } + ret + } + + async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) { + const N: usize = 100000; + + // Prepare storage and memtable. + let memory_state_store = MemoryStateStore::new(); + let table_id = TableId::new(1); + // Two columns of int32 type, the first column is PK. + let schema = Schema::new(vec![ + Field::unnamed(DataType::Int32), + Field::unnamed(DataType::Int32), + ]); + let column_ids = vec![0.into(), 1.into()]; + + let chunks = gen_fuzz_data(N, 128); + let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(1))) + .chain(chunks.into_iter().map(Message::Chunk)) + .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(2)))) + .collect(); + // Prepare stream executors. + let source = MockSource::with_messages(schema.clone(), PkIndices::new(), messages); + + let mut materialize_executor = Box::new( + MaterializeExecutor::for_test( + Box::new(source), + memory_state_store.clone(), + table_id, + vec![ColumnOrder::new(0, OrderType::ascending())], + column_ids, + 1, + Arc::new(AtomicU64::new(0)), + conflict_behavior, + ) + .await, + ) + .execute(); + materialize_executor.expect_barrier().await; + + let order_types = vec![OrderType::ascending()]; + let column_descs = vec![ + ColumnDesc::unnamed(0.into(), DataType::Int32), + ColumnDesc::unnamed(1.into(), DataType::Int32), + ]; + let pk_indices = vec![0]; + + let mut table = StateTable::new_without_distribution( + memory_state_store.clone(), + TableId::from(1002), + column_descs.clone(), + order_types, + pk_indices, + ) + .await; + + while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() { + // check with state table's memtable + table.write_chunk(c); + } + } + + #[tokio::test] + async fn fuzz_test_stream_consistent_upsert() { + fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await + } + + #[tokio::test] + async fn fuzz_test_stream_consistent_ignore() { + fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await + } }