fix(compactor): fix put key miss tombstone #14233

Merged · 3 commits · Dec 28, 2023

Changes from 2 commits
319 changes: 229 additions & 90 deletions src/storage/hummock_test/src/compactor_tests.rs
@@ -44,7 +44,7 @@ pub(crate) mod tests {
};
use risingwave_meta::hummock::{HummockManagerRef, MockHummockMetaClient};
use risingwave_pb::common::{HostAddress, WorkerType};
-use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, TableOption};
+use risingwave_pb::hummock::{CompactTask, InputLevel, KeyRange, SstableInfo, TableOption};
use risingwave_pb::meta::add_worker_node_request::Property;
use risingwave_rpc_client::HummockMetaClient;
use risingwave_storage::filter_key_extractor::{
@@ -62,9 +62,10 @@ pub(crate) mod tests {
use risingwave_storage::hummock::test_utils::gen_test_sstable_info;
use risingwave_storage::hummock::value::HummockValue;
use risingwave_storage::hummock::{
-CachePolicy, CompressionAlgorithm, HummockStorage as GlobalHummockStorage, HummockStorage,
-MemoryLimiter, SharedComapctorObjectIdManager, Sstable, SstableBuilderOptions,
-SstableIteratorReadOptions, SstableObjectIdManager,
+BlockedXor16FilterBuilder, CachePolicy, CompressionAlgorithm, FilterBuilder,
+HummockStorage as GlobalHummockStorage, HummockStorage, MemoryLimiter,
+SharedComapctorObjectIdManager, Sstable, SstableBuilder, SstableBuilderOptions,
+SstableIteratorReadOptions, SstableObjectIdManager, SstableWriterOptions,
};
use risingwave_storage::monitor::{CompactorMetrics, StoreLocalStatistic};
use risingwave_storage::opts::StorageOpts;
@@ -1357,6 +1358,82 @@ pub(crate) mod tests {
}

type KeyValue = (FullKey<Vec<u8>>, HummockValue<Vec<u8>>);
async fn check_compaction_result(
sstable_store: SstableStoreRef,
ret: Vec<SstableInfo>,
fast_ret: Vec<SstableInfo>,
capacity: u64,
) {
let mut fast_tables = Vec::with_capacity(fast_ret.len());
let mut normal_tables = Vec::with_capacity(ret.len());
let mut stats = StoreLocalStatistic::default();
for sst_info in &fast_ret {
fast_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}

for sst_info in &ret {
normal_tables.push(sstable_store.sstable(sst_info, &mut stats).await.unwrap());
}
assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5));
println!(
"fast sstables file size: {:?}",
fast_ret.iter().map(|f| f.file_size).collect_vec(),
);
assert!(can_concat(&ret));
assert!(can_concat(&fast_ret));
let read_options = Arc::new(SstableIteratorReadOptions::default());

let mut normal_iter = UserIterator::for_test(
ConcatIterator::new(ret, sstable_store.clone(), read_options.clone()),
(Bound::Unbounded, Bound::Unbounded),
);
let mut fast_iter = UserIterator::for_test(
ConcatIterator::new(fast_ret, sstable_store.clone(), read_options.clone()),
(Bound::Unbounded, Bound::Unbounded),
);

normal_iter.rewind().await.unwrap();
fast_iter.rewind().await.unwrap();
let mut count = 0;
while normal_iter.is_valid() {
assert_eq!(
normal_iter.key(),
fast_iter.key(),
"not equal in {}, len: {} {} vs {}",
count,
normal_iter.key().user_key.table_key.as_ref().len(),
u64::from_be_bytes(
normal_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
u64::from_be_bytes(
fast_iter.key().user_key.table_key.as_ref()[0..8]
.try_into()
.unwrap()
),
);
let hash = Sstable::hash_for_bloom_filter(
fast_iter.key().user_key.encode().as_slice(),
fast_iter.key().user_key.table_id.table_id,
);
assert_eq!(normal_iter.value(), fast_iter.value());
let key_ref = fast_iter.key().user_key.as_ref();
assert!(normal_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
assert!(fast_tables.iter().any(|table| {
table
.value()
.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
}));
normal_iter.next().await.unwrap();
fast_iter.next().await.unwrap();
count += 1;
}
}

async fn test_fast_compact_impl(data: Vec<Vec<KeyValue>>) {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
@@ -1399,7 +1476,6 @@ pub(crate) mod tests {
println!("generate ssts size: {}", sst.file_size);
ssts.push(sst);
}
-let read_options = Arc::new(SstableIteratorReadOptions::default());
let select_file_count = ssts.len() / 2;

let task = CompactTask {
@@ -1457,91 +1533,7 @@ pub(crate) mod tests {
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
-println!("ssts: {} vs {}", fast_ret.len(), ret.len());
-let mut fast_tables = Vec::with_capacity(fast_ret.len());
-let mut normal_tables = Vec::with_capacity(ret.len());
-let mut stats = StoreLocalStatistic::default();
-for sst_info in &fast_ret {
-fast_tables.push(
-compact_ctx
-.sstable_store
-.sstable(sst_info, &mut stats)
-.await
-.unwrap(),
-);
-}
-
-for sst_info in &ret {
-normal_tables.push(
-compact_ctx
-.sstable_store
-.sstable(sst_info, &mut stats)
-.await
-.unwrap(),
-);
-}
-assert!(fast_ret.iter().all(|f| f.file_size < capacity * 6 / 5));
-println!(
-"fast sstables file size: {:?}",
-fast_ret.iter().map(|f| f.file_size).collect_vec(),
-);
-assert!(can_concat(&ret));
-assert!(can_concat(&fast_ret));
-
-let mut normal_iter = UserIterator::for_test(
-ConcatIterator::new(ret, compact_ctx.sstable_store.clone(), read_options.clone()),
-(Bound::Unbounded, Bound::Unbounded),
-);
-let mut fast_iter = UserIterator::for_test(
-ConcatIterator::new(
-fast_ret,
-compact_ctx.sstable_store.clone(),
-read_options.clone(),
-),
-(Bound::Unbounded, Bound::Unbounded),
-);
-
-normal_iter.rewind().await.unwrap();
-fast_iter.rewind().await.unwrap();
-let mut count = 0;
-while normal_iter.is_valid() {
-assert_eq!(
-normal_iter.key(),
-fast_iter.key(),
-"not equal in {}, len: {} {} vs {}",
-count,
-normal_iter.key().user_key.table_key.as_ref().len(),
-u64::from_be_bytes(
-normal_iter.key().user_key.table_key.as_ref()[0..8]
-.try_into()
-.unwrap()
-),
-u64::from_be_bytes(
-fast_iter.key().user_key.table_key.as_ref()[0..8]
-.try_into()
-.unwrap()
-),
-);
-let hash = Sstable::hash_for_bloom_filter(
-fast_iter.key().user_key.encode().as_slice(),
-fast_iter.key().user_key.table_id.table_id,
-);
-assert_eq!(normal_iter.value(), fast_iter.value());
-let key_ref = fast_iter.key().user_key.as_ref();
-assert!(normal_tables.iter().any(|table| {
-table
-.value()
-.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
-}));
-assert!(fast_tables.iter().any(|table| {
-table
-.value()
-.may_match_hash(&(Bound::Included(key_ref), Bound::Included(key_ref)), hash)
-}));
-normal_iter.next().await.unwrap();
-fast_iter.next().await.unwrap();
-count += 1;
-}
+check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, capacity).await;
}

#[tokio::test]
@@ -1645,4 +1637,151 @@ pub(crate) mod tests {
}
test_fast_compact_impl(vec![data1, data2]).await;
}

#[tokio::test]
async fn test_tombstone_recycle() {
let (env, hummock_manager_ref, _cluster_manager_ref, worker_node) =
setup_compute_env(8080).await;
let hummock_meta_client: Arc<dyn HummockMetaClient> = Arc::new(MockHummockMetaClient::new(
hummock_manager_ref.clone(),
worker_node.id,
));
let existing_table_id: u32 = 1;
let storage = get_hummock_storage(
hummock_meta_client.clone(),
get_notification_client_for_test(env, hummock_manager_ref.clone(), worker_node.clone()),
&hummock_manager_ref,
TableId::from(existing_table_id),
)
.await;
hummock_manager_ref.get_new_sst_ids(10).await.unwrap();
let (compact_ctx, _) = prepare_compactor_and_filter(&storage, existing_table_id);

let sstable_store = compact_ctx.sstable_store.clone();
let capacity = 256 * 1024;
let opts = SstableBuilderOptions {
capacity,
block_capacity: 2048,
restart_interval: 16,
bloom_false_positive: 0.1,
compression_algorithm: CompressionAlgorithm::Lz4,
..Default::default()
};

const KEY_COUNT: usize = 20000;
let mut rng = rand::rngs::StdRng::seed_from_u64(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
);
let mut sst_infos = vec![];
let mut max_sst_file_size = 0;

for object_id in 1..3 {
let mut builder = SstableBuilder::<_, BlockedXor16FilterBuilder>::new(
object_id,
sstable_store
.clone()
.create_sst_writer(object_id, SstableWriterOptions::default()),
BlockedXor16FilterBuilder::create(opts.bloom_false_positive, opts.capacity / 16),
opts.clone(),
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor)),
None,
);
let mut last_k: u64 = 1;
let init_epoch = 100 * object_id;
let mut last_epoch = init_epoch;
for idx in 0..KEY_COUNT {
let rand_v = rng.next_u32() % 10;
let (k, epoch) = if rand_v == 0 {
(last_k + 1000 * object_id, init_epoch)
} else if rand_v < 5 {
(last_k, last_epoch - 1)
} else {
(last_k + 1, init_epoch)
};
let key = k.to_be_bytes().to_vec();
let key = FullKey::new(TableId::new(1), TableKey(key.as_slice()), epoch);
let rand_v = rng.next_u32() % 10;
let v = if (5..7).contains(&rand_v) {
HummockValue::delete()
} else {
HummockValue::put(format!("{}-{}", idx, epoch).into_bytes())
};
if rand_v < 5 && builder.current_block_size() > opts.block_capacity / 2 {
// cut the block when the key is the same as the last key.
builder.build_block().await.unwrap();
}
builder.add(key, v.as_slice()).await.unwrap();
last_k = k;
last_epoch = epoch;
}

let output = builder.finish().await.unwrap();
output.writer_output.await.unwrap().unwrap();
let sst_info = output.sst_info.sst_info;
max_sst_file_size = std::cmp::max(max_sst_file_size, sst_info.file_size);
sst_infos.push(sst_info);
}

let target_file_size = max_sst_file_size / 4;

let task = CompactTask {
input_ssts: vec![
InputLevel {
level_idx: 5,
level_type: 1,
table_infos: sst_infos.drain(..1).collect_vec(),
},
InputLevel {
level_idx: 6,
level_type: 1,
table_infos: sst_infos,
},
],
existing_table_ids: vec![1],
task_id: 1,
watermark: 1000,
splits: vec![KeyRange::inf()],
target_level: 6,
base_level: 4,
target_file_size,
compression_algorithm: 1,
gc_delete_keys: true,
..Default::default()
};
let multi_filter_key_extractor =
Arc::new(FilterKeyExtractorImpl::FullKey(FullKeyFilterKeyExtractor));
let compaction_filter = DummyCompactionFilter {};
let slow_compact_runner = CompactorRunner::new(
0,
compact_ctx.clone(),
task.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([5, 6, 7, 8, 9, 10, 11, 12, 13]),
)),
);
let fast_compact_runner = FastCompactorRunner::new(
compact_ctx.clone(),
task.clone(),
multi_filter_key_extractor.clone(),
Box::new(SharedComapctorObjectIdManager::for_test(
VecDeque::from_iter([22, 23, 24, 25, 26, 27, 28, 29]),
)),
Arc::new(TaskProgress::default()),
);
let (_, ret1, _) = slow_compact_runner
.run(
compaction_filter,
multi_filter_key_extractor,
Arc::new(TaskProgress::default()),
)
.await
.unwrap();
let ret = ret1.into_iter().map(|sst| sst.sst_info).collect_vec();
let (ssts, _) = fast_compact_runner.run().await.unwrap();
let fast_ret = ssts.into_iter().map(|sst| sst.sst_info).collect_vec();
check_compaction_result(compact_ctx.sstable_store, ret, fast_ret, target_file_size).await;
}
}
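
Note: the new test_tombstone_recycle test above deliberately generates runs of the same user key at decreasing epochs, mixes in roughly 20% deletes, and cuts blocks while a user key is still in progress, so input blocks end mid-user-key and the fast compactor's whole-block pass-through path is exercised. A minimal standalone sketch of the write pattern (the modulo thresholds mirror the test; the seed, constants, and printing are illustrative, not taken from the diff):

use rand::{rngs::StdRng, RngCore, SeedableRng};

fn main() {
    let mut rng = StdRng::seed_from_u64(42);
    let init_epoch: u64 = 100;
    let (mut last_k, mut last_epoch) = (1u64, init_epoch);
    for _ in 0..20 {
        let (k, epoch) = match rng.next_u32() % 10 {
            0 => (last_k + 1000, init_epoch),  // ~10%: jump to a far-away key
            1..=4 => (last_k, last_epoch - 1), // ~40%: same user key, older epoch
            _ => (last_k + 1, init_epoch),     // ~50%: next user key, fresh epoch
        };
        let is_delete = (5..7).contains(&(rng.next_u32() % 10)); // ~20% tombstones
        println!("key={k:06} epoch={epoch} delete={is_delete}");
        (last_k, last_epoch) = (k, epoch);
    }
}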
15 changes: 8 additions & 7 deletions src/storage/src/hummock/compactor/fast_compactor_runner.rs
@@ -466,13 +466,8 @@ impl CompactorRunner {
let smallest_key = FullKey::decode(sstable_iter.next_block_smallest()).to_vec();
let (block, filter_data, block_meta) =
sstable_iter.download_next_block().await?.unwrap();
-if self.executor.builder.need_flush() {
-let largest_key = sstable_iter.sstable.value().meta.largest_key.clone();
-let target_key = FullKey::decode(&largest_key);
-sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
-let mut iter = sstable_iter.iter.take().unwrap();
-self.executor.run(&mut iter, target_key).await?;
-} else {
+let is_new_user_key = !self.executor.last_key.user_key.eq(&smallest_key.user_key);
+if !self.executor.builder.need_flush() && is_new_user_key {
Contributor: Could you please add some comments here? It's kind of confusing.

Contributor (Author): Done. I have added a comment for it.
let largest_key = sstable_iter.current_block_largest();
let block_len = block.len() as u64;
let block_key_count = block_meta.total_key_count;
@@ -487,6 +482,12 @@
}
self.executor.may_report_process_key(block_key_count);
self.executor.clear();
+} else {
+let largest_key = sstable_iter.sstable.value().meta.largest_key.clone();
+let target_key = FullKey::decode(&largest_key);
+sstable_iter.init_block_iter(block, block_meta.uncompressed_size as usize)?;
+let mut iter = sstable_iter.iter.take().unwrap();
+self.executor.run(&mut iter, target_key).await?;
}
}
rest_data.next_sstable().await?;
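
Note: for readers of the review thread above, here is a minimal, self-contained sketch of the rule the new condition implements. All names below (Executor, can_pass_through, the fields) are hypothetical simplifications, not the crate's API. A whole block may be copied into the output verbatim only when the builder does not need to flush and the block starts a new user key; a block that continues the previous user key must instead be merged key by key, so that older versions of that key, including tombstones, pass through the executor's logic. Skipping that merge is the kind of interleaving behind the put-key-miss-tombstone bug this PR fixes.

// Hypothetical simplified model of the pass-through decision.
#[derive(PartialEq)]
struct UserKey(Vec<u8>);

struct Executor {
    last_user_key: Option<UserKey>, // user key of the last key-value handled
    builder_need_flush: bool,       // output builder has reached its flush threshold
}

impl Executor {
    // A whole input block may be appended verbatim only when it starts a new
    // user key; otherwise a newer put and an older tombstone of the same user
    // key could be separated without the executor ever seeing them together.
    fn can_pass_through(&self, block_smallest: &UserKey) -> bool {
        let is_new_user_key = self
            .last_user_key
            .as_ref()
            .map_or(true, |last| last != block_smallest);
        !self.builder_need_flush && is_new_user_key
    }
}

fn main() {
    let exec = Executor {
        last_user_key: Some(UserKey(b"k1".to_vec())),
        builder_need_flush: false,
    };
    // The next block continues user key "k1": fall back to key-by-key merging.
    assert!(!exec.can_pass_through(&UserKey(b"k1".to_vec())));
    // The next block starts user key "k2": copying the block whole is safe.
    assert!(exec.can_pass_through(&UserKey(b"k2".to_vec())));
}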