Skip to content

Commit

Permalink
Revert "feat(streaming): call may_exist when insert cache miss in joi…
Browse files Browse the repository at this point in the history
…n executor (#7957)"

This reverts commit 6d3dba6.
  • Loading branch information
lmatz committed Mar 20, 2023
1 parent ae99e55 commit 2027568
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 92 deletions.
6 changes: 1 addition & 5 deletions grafana/risingwave-dashboard.dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,11 +1378,7 @@ def section_streaming_actors(outer_panels):
),
panels.target(
f"rate({metric('stream_join_insert_cache_miss_count')}[$__rate_interval])",
"cache miss when insert {{actor_id}} {{side}}",
),
panels.target(
f"rate({metric('stream_join_may_exist_true_count')}[$__rate_interval])",
"may_exist true when insert {{actor_id}} {{side}}",
"cache miss when insert{{actor_id}} {{side}}",
),
],
),
Expand Down
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

30 changes: 1 addition & 29 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,31 +315,6 @@ where
Distribution::fallback(),
None,
false,
0,
)
.await
}

/// Create a state table without distribution, with given `prefix_hint_len`, used for unit
/// tests.
pub async fn new_without_distribution_with_prefix_hint_len(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
prefix_hint_len: usize,
) -> Self {
Self::new_with_distribution_inner(
store,
table_id,
columns,
order_types,
pk_indices,
Distribution::fallback(),
None,
true,
prefix_hint_len,
)
.await
}
Expand All @@ -364,7 +339,6 @@ where
distribution,
value_indices,
true,
0,
)
.await
}
Expand All @@ -387,7 +361,6 @@ where
distribution,
value_indices,
false,
0,
)
.await
}
Expand All @@ -405,7 +378,6 @@ where
}: Distribution,
value_indices: Option<Vec<usize>>,
is_consistent_op: bool,
prefix_hint_len: usize,
) -> Self {
let local_state_store = store
.new_local(NewLocalOptions {
Expand Down Expand Up @@ -446,7 +418,7 @@ where
row_serde: SD::new(&column_ids, Arc::from(data_types.into_boxed_slice())),
pk_indices,
dist_key_in_pk_indices,
prefix_hint_len,
prefix_hint_len: 0,
vnodes,
table_option: Default::default(),
vnode_col_idx_in_pk: None,
Expand Down
18 changes: 5 additions & 13 deletions src/stream/src/executor/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,20 +1047,18 @@ mod tests {
order_types: &[OrderType],
pk_indices: &[usize],
table_id: u32,
prefix_hint_len: usize,
) -> (StateTable<MemoryStateStore>, StateTable<MemoryStateStore>) {
let column_descs = data_types
.iter()
.enumerate()
.map(|(id, data_type)| ColumnDesc::unnamed(ColumnId::new(id as i32), data_type.clone()))
.collect_vec();
let state_table = StateTable::new_without_distribution_with_prefix_hint_len(
let state_table = StateTable::new_without_distribution(
mem_state.clone(),
TableId::new(table_id),
column_descs,
order_types.to_vec(),
pk_indices.to_vec(),
prefix_hint_len,
)
.await;

Expand Down Expand Up @@ -1111,9 +1109,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![1]);
let (tx_r, source_r) = MockSource::channel(schema, vec![1]);
let join_key_indices = vec![0];
let params_l = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![1]);
let params_l = JoinParams::new(vec![0], vec![1]);
let params_r = JoinParams::new(vec![0], vec![1]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1124,7 +1121,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1134,7 +1130,6 @@ mod tests {
&[OrderType::ascending(), OrderType::ascending()],
&[0, 1],
2,
join_key_indices.len(),
)
.await;

Expand Down Expand Up @@ -1180,9 +1175,8 @@ mod tests {
};
let (tx_l, source_l) = MockSource::channel(schema.clone(), vec![0]);
let (tx_r, source_r) = MockSource::channel(schema, vec![0]);
let join_key_indices = vec![0, 1];
let params_l = JoinParams::new(join_key_indices.clone(), vec![]);
let params_r = JoinParams::new(join_key_indices.clone(), vec![]);
let params_l = JoinParams::new(vec![0, 1], vec![]);
let params_r = JoinParams::new(vec![0, 1], vec![]);
let cond = with_condition.then(create_cond);

let mem_state = MemoryStateStore::new();
Expand All @@ -1197,7 +1191,6 @@ mod tests {
],
&[0, 1, 0],
0,
join_key_indices.len(),
)
.await;

Expand All @@ -1211,7 +1204,6 @@ mod tests {
],
&[0, 1, 1],
0,
join_key_indices.len(),
)
.await;
let schema_len = match T {
Expand Down
34 changes: 3 additions & 31 deletions src/stream/src/executor/managed_state/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ pub struct JoinHashMapMetrics {
total_lookup_count: usize,
/// How many times have we miss the cache when insert row
insert_cache_miss_count: usize,
may_exist_true_count: usize,
}

impl JoinHashMapMetrics {
Expand All @@ -173,7 +172,6 @@ impl JoinHashMapMetrics {
lookup_miss_count: 0,
total_lookup_count: 0,
insert_cache_miss_count: 0,
may_exist_true_count: 0,
}
}

Expand All @@ -190,14 +188,9 @@ impl JoinHashMapMetrics {
.join_insert_cache_miss_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.insert_cache_miss_count as u64);
self.metrics
.join_may_exist_true_count
.with_label_values(&[&self.actor_id, self.side])
.inc_by(self.may_exist_true_count as u64);
self.total_lookup_count = 0;
self.lookup_miss_count = 0;
self.insert_cache_miss_count = 0;
self.may_exist_true_count = 0;
}
}

Expand Down Expand Up @@ -435,22 +428,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, value.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, value.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand All @@ -462,6 +444,7 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {

/// Insert a row.
/// Used when the side does not need to update degree.
#[allow(clippy::unused_async)]
pub async fn insert_row(&mut self, key: &K, value: impl Row) -> StreamExecutorResult<()> {
let join_row = JoinRow::new(&value, 0);
let pk = (&value)
Expand All @@ -471,22 +454,11 @@ impl<K: HashKey, S: StateStore> JoinHashMap<K, S> {
// Update cache
entry.insert(pk, join_row.encode());
} else if self.pk_contained_in_jk {
// Refill cache when the join key contains primary key.
// Refill cache when the join key exist in neither cache or storage.
self.metrics.insert_cache_miss_count += 1;
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
let prefix = key.deserialize(&self.join_key_data_types)?;
self.metrics.insert_cache_miss_count += 1;
// Refill cache when the join key exists in neither cache or storage.
if !self.state.table.may_exist(&prefix).await? {
let mut state = JoinEntryState::default();
state.insert(pk, join_row.encode());
self.update_state(key, state.into());
} else {
self.metrics.may_exist_true_count += 1;
}
}

// Update the flush buffer.
Expand Down
16 changes: 3 additions & 13 deletions src/stream/src/executor/monitor/streaming_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub struct StreamingMetrics {
pub join_lookup_miss_count: GenericCounterVec<AtomicU64>,
pub join_total_lookup_count: GenericCounterVec<AtomicU64>,
pub join_insert_cache_miss_count: GenericCounterVec<AtomicU64>,
pub join_may_exist_true_count: GenericCounterVec<AtomicU64>,
pub join_actor_input_waiting_duration_ns: GenericCounterVec<AtomicU64>,
pub join_match_duration_ns: GenericCounterVec<AtomicU64>,
pub join_barrier_align_duration: HistogramVec,
Expand Down Expand Up @@ -275,31 +274,23 @@ impl StreamingMetrics {

let join_lookup_miss_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_miss_count",
"Join executor lookup miss count",
"Join executor lookup miss duration",
&["actor_id", "side"],
registry
)
.unwrap();

let join_total_lookup_count = register_int_counter_vec_with_registry!(
"stream_join_lookup_total_count",
"Join executor lookup total count",
"Join executor lookup total operation",
&["actor_id", "side"],
registry
)
.unwrap();

let join_insert_cache_miss_count = register_int_counter_vec_with_registry!(
"stream_join_insert_cache_miss_count",
"Count of cache miss when insert rows in join executor",
&["actor_id", "side"],
registry
)
.unwrap();

let join_may_exist_true_count = register_int_counter_vec_with_registry!(
"stream_join_may_exist_true_count",
"Count of may_exist's true returns of when insert rows in join executor",
"Join executor cache miss when insert operation",
&["actor_id", "side"],
registry
)
Expand Down Expand Up @@ -486,7 +477,6 @@ impl StreamingMetrics {
join_lookup_miss_count,
join_total_lookup_count,
join_insert_cache_miss_count,
join_may_exist_true_count,
join_actor_input_waiting_duration_ns,
join_match_duration_ns,
join_barrier_align_duration,
Expand Down

0 comments on commit 2027568

Please sign in to comment.