Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: remove dist_key_indices in state table and storage table #8601

Merged
merged 3 commits into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
// FIXME: revisit constructions and usages.
pk_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the all columns of the table, instead of the output ones.
// FIXME: revisit constructions and usages.
dist_key_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the primary key columns by `pk_indices`.
dist_key_in_pk_indices: Vec<usize>,
Expand Down Expand Up @@ -266,7 +261,6 @@ impl<S: StateStore> StorageTableInner<S, EitherSerde> {
mapping: Arc::new(mapping),
row_serde: Arc::new(row_serde),
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
vnodes,
table_option,
Expand Down Expand Up @@ -592,23 +586,21 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
} else {
trace!(
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}",
"iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
self.table_id,
pk_prefix,
self.dist_key_indices,
pk_prefix_indices
);
None
};

trace!(
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} dist_key_indices {:?} pk_prefix_indices {:?}" ,
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}" ,
self.table_id,
prefix_hint,
start_key,
end_key,
pk_prefix,
self.dist_key_indices,
pk_prefix_indices
);

Expand Down
23 changes: 20 additions & 3 deletions src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod batch_table;

use std::sync::{Arc, LazyLock};

use itertools::Itertools;
use risingwave_common::array::DataChunk;
use risingwave_common::buffer::{Bitmap, BitmapBuilder};
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -53,6 +54,17 @@ impl Distribution {
}
}

pub fn fallback_vnodes() -> Arc<Bitmap> {
/// A bitmap that only the default vnode is set.
static FALLBACK_VNODES: LazyLock<Arc<Bitmap>> = LazyLock::new(|| {
let mut vnodes = BitmapBuilder::zeroed(VirtualNode::COUNT);
vnodes.set(DEFAULT_VNODE.to_index(), true);
vnodes.finish().into()
});

FALLBACK_VNODES.clone()
}

/// Distribution that accesses all vnodes, mainly used for tests.
pub fn all_vnodes(dist_key_indices: Vec<usize>) -> Self {
/// A bitmap that all vnodes are set.
Expand Down Expand Up @@ -124,14 +136,19 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
/// Get vnode values with `indices` on the given `chunk`.
pub fn compute_chunk_vnode(
chunk: &DataChunk,
indices: &[usize],
dist_key_in_pk_indices: &[usize],
pk_indices: &[usize],
vnodes: &Bitmap,
) -> Vec<VirtualNode> {
if indices.is_empty() {
if dist_key_in_pk_indices.is_empty() {
vec![DEFAULT_VNODE; chunk.capacity()]
} else {
let dist_key_indices = dist_key_in_pk_indices
.iter()
.map(|idx| pk_indices[*idx])
.collect_vec();
chunk
.get_hash_values(indices, Crc32FastBuilder)
.get_hash_values(&dist_key_indices, Crc32FastBuilder)
.into_iter()
.zip_eq_fast(chunk.vis().iter())
.map(|(h, vis)| {
Expand Down
43 changes: 23 additions & 20 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct StateTableInner<
/// Indices of distribution key for computing vnode.
/// Note that the index is based on the all columns of the table, instead of the output ones.
// FIXME: revisit constructions and usages.
dist_key_indices: Vec<usize>,
// dist_key_indices: Vec<usize>,

/// Indices of distribution key for computing vnode.
/// Note that the index is based on the primary key columns by `pk_indices`.
Expand Down Expand Up @@ -198,15 +198,10 @@ where
.collect();
let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

let Distribution {
dist_key_indices,
vnodes,
} = match vnodes {
Some(vnodes) => Distribution {
dist_key_indices,
vnodes,
},
None => Distribution::fallback(),
let vnodes = match vnodes {
Some(vnodes) => vnodes,

None => Distribution::fallback_vnodes(),
};
let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
let vnode_col_idx = *idx as usize;
Expand Down Expand Up @@ -251,7 +246,6 @@ where
pk_serde,
row_serde,
pk_indices: pk_indices.to_vec(),
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len,
vnodes,
Expand Down Expand Up @@ -452,7 +446,6 @@ where
pk_serde,
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
pk_indices,
dist_key_indices,
dist_key_in_pk_indices,
prefix_hint_len,
vnodes,
Expand All @@ -475,7 +468,7 @@ where
if self.vnode_col_idx_in_pk.is_some() {
false
} else {
self.dist_key_indices.is_empty()
self.dist_key_in_pk_indices.is_empty()
}
}

Expand Down Expand Up @@ -503,8 +496,13 @@ where
}

/// Get the vnode value of the given row
pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
compute_vnode(row, &self.dist_key_indices, &self.vnodes)
// pub fn compute_vnode(&self, row: impl Row) -> VirtualNode {
// compute_vnode(row, &self.dist_key_indices, &self.vnodes)
// }

/// Get the vnode value of the given row
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
compute_vnode(pk, &self.dist_key_in_pk_indices, &self.vnodes)
}

// TODO: remove, should not be exposed to user
Expand All @@ -516,9 +514,9 @@ where
&self.pk_serde
}

pub fn dist_key_indices(&self) -> &[usize] {
&self.dist_key_indices
}
// pub fn dist_key_indices(&self) -> &[usize] {
// &self.dist_key_indices
// }

pub fn vnodes(&self) -> &Arc<Bitmap> {
&self.vnodes
Expand Down Expand Up @@ -724,7 +722,12 @@ where
pub fn write_chunk(&mut self, chunk: StreamChunk) {
let (chunk, op) = chunk.into_parts();

let vnodes = compute_chunk_vnode(&chunk, &self.dist_key_indices, &self.vnodes);
let vnodes = compute_chunk_vnode(
&chunk,
&self.dist_key_in_pk_indices,
&self.pk_indices,
&self.vnodes,
);

let value_chunk = if let Some(ref value_indices) = self.value_indices {
chunk.clone().reorder_columns(value_indices)
Expand Down Expand Up @@ -984,7 +987,7 @@ where
trace!(
table_id = %self.table_id(),
?prefix_hint, ?encoded_key_range_with_vnode, ?pk_prefix,
dist_key_indices = ?self.dist_key_indices, ?pk_prefix_indices,
?pk_prefix_indices,
"storage_iter_with_prefix"
);

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ impl<S: StateStore> SortExecutor<S> {
let no_longer_owned_vnodes =
Bitmap::bit_saturate_subtract(prev_vnode_bitmap, curr_vnode_bitmap);
self.buffer.retain(|(_, pk), _| {
let vnode = self.state_table.compute_vnode(pk);
let vnode = self.state_table.compute_vnode_by_pk(pk);
!no_longer_owned_vnodes.is_set(vnode.to_index())
});
}
Expand Down