From 79b499ce1895e1eaedb40c3b34e0ea0deef7bb2b Mon Sep 17 00:00:00 2001 From: stonepage <40830455+st1page@users.noreply.github.com> Date: Fri, 10 Mar 2023 15:02:19 +0800 Subject: [PATCH] refactor(optimizer): move some methods into core struct && refactor the join's predicate push down (#8455) --- .../optimizer/plan_node/generic/hop_window.rs | 141 ++++++++++++++++- .../src/optimizer/plan_node/generic/join.rs | 129 +++++++++++++++ .../src/optimizer/plan_node/logical_apply.rs | 26 +-- .../optimizer/plan_node/logical_hop_window.rs | 148 +++--------------- .../src/optimizer/plan_node/logical_join.rs | 63 +------- .../optimizer/plan_node/stream_hop_window.rs | 4 +- 6 files changed, 302 insertions(+), 209 deletions(-) diff --git a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs index 5d6abbe2bad4..d93ef973a300 100644 --- a/src/frontend/src/optimizer/plan_node/generic/hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/generic/hop_window.rs @@ -13,14 +13,18 @@ // limitations under the License. use std::fmt; +use std::num::NonZeroUsize; use itertools::Itertools; use risingwave_common::catalog::{Field, Schema}; +use risingwave_common::error::Result; use risingwave_common::types::{DataType, IntervalUnit, IntervalUnitDisplay}; +use risingwave_common::util::column_index_mapping::ColIndexMapping; +use risingwave_expr::ExprError; use super::super::utils::IndicesDisplay; use super::{GenericPlanNode, GenericPlanRef}; -use crate::expr::{InputRef, InputRefDisplay}; +use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, InputRefDisplay, Literal}; use crate::optimizer::optimizer_context::OptimizerContextRef; /// [`HopWindow`] implements Hop Table Function. @@ -104,6 +108,141 @@ impl HopWindow { ) } + pub fn internal_window_start_col_idx(&self) -> usize { + self.input.schema().len() + } + + pub fn internal_window_end_col_idx(&self) -> usize { + self.internal_window_start_col_idx() + 1 + } + + pub fn o2i_col_mapping(&self) -> ColIndexMapping { + self.output2internal_col_mapping() + .composite(&self.internal2input_col_mapping()) + } + + pub fn i2o_col_mapping(&self) -> ColIndexMapping { + self.input2internal_col_mapping() + .composite(&self.internal2output_col_mapping()) + } + + pub fn internal_column_num(&self) -> usize { + self.internal_window_start_col_idx() + 2 + } + + pub fn output2internal_col_mapping(&self) -> ColIndexMapping { + self.internal2output_col_mapping().inverse() + } + + pub fn internal2output_col_mapping(&self) -> ColIndexMapping { + ColIndexMapping::with_remaining_columns(&self.output_indices, self.internal_column_num()) + } + + pub fn input2internal_col_mapping(&self) -> ColIndexMapping { + ColIndexMapping::identity_or_none( + self.internal_window_start_col_idx(), + self.internal_column_num(), + ) + } + + pub fn internal2input_col_mapping(&self) -> ColIndexMapping { + ColIndexMapping::identity_or_none( + self.internal_column_num(), + self.internal_window_start_col_idx(), + ) + } + + pub fn derive_window_start_and_end_exprs(&self) -> Result<(Vec, Vec)> { + let Self { + window_size, + window_slide, + time_col, + .. + } = &self; + let units = window_size + .exact_div(window_slide) + .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_size {} cannot be divided by window_slide {}", + window_size, window_slide + ), + })? + .get(); + let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); + let window_slide_expr: ExprImpl = + Literal::new(Some((*window_slide).into()), DataType::Interval).into(); + let window_size_sub_slide = FunctionCall::new( + ExprType::Subtract, + vec![window_size_expr, window_slide_expr.clone()], + )? + .into(); + + let time_col_shifted = FunctionCall::new( + ExprType::Subtract, + vec![ + ExprImpl::InputRef(Box::new(time_col.clone())), + window_size_sub_slide, + ], + )? + .into(); + + let hop_start: ExprImpl = FunctionCall::new( + ExprType::TumbleStart, + vec![time_col_shifted, window_slide_expr], + )? + .into(); + + let mut window_start_exprs = Vec::with_capacity(units); + let mut window_end_exprs = Vec::with_capacity(units); + for i in 0..units { + { + let window_start_offset = + window_slide + .checked_mul_int(i) + .ok_or_else(|| ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, i + ), + })?; + let window_start_offset_expr = + Literal::new(Some(window_start_offset.into()), DataType::Interval).into(); + let window_start_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_start_offset_expr], + )? + .into(); + window_start_exprs.push(window_start_expr); + } + { + let window_end_offset = + window_slide.checked_mul_int(i + units).ok_or_else(|| { + ExprError::InvalidParam { + name: "window", + reason: format!( + "window_slide {} cannot be multiplied by {}", + window_slide, + i + units + ), + } + })?; + let window_end_offset_expr = + Literal::new(Some(window_end_offset.into()), DataType::Interval).into(); + let window_end_expr = FunctionCall::new( + ExprType::Add, + vec![hop_start.clone(), window_end_offset_expr], + )? + .into(); + window_end_exprs.push(window_end_expr); + } + } + assert_eq!(window_start_exprs.len(), window_end_exprs.len()); + Ok((window_start_exprs, window_end_exprs)) + } + pub fn fmt_fields_with_builder(&self, builder: &mut fmt::DebugStruct<'_, '_>) { let output_type = DataType::window_of(&self.time_col.data_type).unwrap(); builder.field( diff --git a/src/frontend/src/optimizer/plan_node/generic/join.rs b/src/frontend/src/optimizer/plan_node/generic/join.rs index d1fc64caee01..b8cf9e132fed 100644 --- a/src/frontend/src/optimizer/plan_node/generic/join.rs +++ b/src/frontend/src/optimizer/plan_node/generic/join.rs @@ -252,3 +252,132 @@ impl Join { } } } + +/// Try to split and pushdown `predicate` into a into a join condition and into the inputs of the +/// join. Returns the pushed predicates. The pushed part will be removed from the original +/// predicate. +/// +/// `InputRef`s in the right pushed condition are indexed by the right child's output schema. + +pub fn push_down_into_join( + predicate: &mut Condition, + left_col_num: usize, + right_col_num: usize, + ty: JoinType, +) -> (Condition, Condition, Condition) { + let (left, right) = push_down_to_inputs( + predicate, + left_col_num, + right_col_num, + can_push_left_from_filter(ty), + can_push_right_from_filter(ty), + ); + + let on = if can_push_on_from_filter(ty) { + let mut conjunctions = std::mem::take(&mut predicate.conjunctions); + + // Do not push now on to the on, it will be pulled up into a filter instead. + let on = Condition { + conjunctions: conjunctions + .drain_filter(|expr| expr.count_nows() == 0) + .collect(), + }; + predicate.conjunctions = conjunctions; + on + } else { + Condition::true_cond() + }; + (left, right, on) +} + +/// Try to pushes parts of the join condition to its inputs. Returns the pushed predicates. The +/// pushed part will be removed from the original join predicate. +/// +/// `InputRef`s in the right pushed condition are indexed by the right child's output schema. + +pub fn push_down_join_condition( + on_condition: &mut Condition, + left_col_num: usize, + right_col_num: usize, + ty: JoinType, +) -> (Condition, Condition) { + push_down_to_inputs( + on_condition, + left_col_num, + right_col_num, + can_push_left_from_on(ty), + can_push_right_from_on(ty), + ) +} + +/// Try to split and pushdown `predicate` into a join's left/right child. +/// Returns the pushed predicates. The pushed part will be removed from the original predicate. +/// +/// `InputRef`s in the right `Condition` are shifted by `-left_col_num`. +fn push_down_to_inputs( + predicate: &mut Condition, + left_col_num: usize, + right_col_num: usize, + push_left: bool, + push_right: bool, +) -> (Condition, Condition) { + let conjunctions = std::mem::take(&mut predicate.conjunctions); + + let (mut left, right, mut others) = + Condition { conjunctions }.split(left_col_num, right_col_num); + + if !push_left { + others.conjunctions.extend(left); + left = Condition::true_cond(); + }; + + let right = if push_right { + let mut mapping = ColIndexMapping::with_shift_offset( + left_col_num + right_col_num, + -(left_col_num as isize), + ); + right.rewrite_expr(&mut mapping) + } else { + others.conjunctions.extend(right); + Condition::true_cond() + }; + + predicate.conjunctions = others.conjunctions; + + (left, right) +} + +pub fn can_push_left_from_filter(ty: JoinType) -> bool { + matches!( + ty, + JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti + ) +} + +pub fn can_push_right_from_filter(ty: JoinType) -> bool { + matches!( + ty, + JoinType::Inner | JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti + ) +} + +pub fn can_push_on_from_filter(ty: JoinType) -> bool { + matches!( + ty, + JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi + ) +} + +pub fn can_push_left_from_on(ty: JoinType) -> bool { + matches!( + ty, + JoinType::Inner | JoinType::RightOuter | JoinType::LeftSemi + ) +} + +pub fn can_push_right_from_on(ty: JoinType) -> bool { + matches!( + ty, + JoinType::Inner | JoinType::LeftOuter | JoinType::RightSemi + ) +} diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index ed31491f3403..7eabf47055a6 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -18,7 +18,7 @@ use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::plan_common::JoinType; -use super::generic::{self, GenericPlanNode}; +use super::generic::{self, push_down_into_join, push_down_join_condition, GenericPlanNode}; use super::{ ColPrunable, LogicalJoin, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary, PredicatePushdown, ToBatch, ToStream, @@ -318,28 +318,12 @@ impl PredicatePushdown for LogicalApply { let right_col_num = self.right().schema().len(); let join_type = self.join_type(); - let (left_from_filter, right_from_filter, on) = LogicalJoin::push_down( - &mut predicate, - left_col_num, - right_col_num, - LogicalJoin::can_push_left_from_filter(join_type), - LogicalJoin::can_push_right_from_filter(join_type), - LogicalJoin::can_push_on_from_filter(join_type), - ); + let (left_from_filter, right_from_filter, on) = + push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type); let mut new_on = self.on.clone().and(on); - let (left_from_on, right_from_on, on) = LogicalJoin::push_down( - &mut new_on, - left_col_num, - right_col_num, - LogicalJoin::can_push_left_from_on(join_type), - LogicalJoin::can_push_right_from_on(join_type), - false, - ); - assert!( - on.always_true(), - "On-clause should not be pushed to on-clause." - ); + let (left_from_on, right_from_on) = + push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type); let left_predicate = left_from_filter.and(left_from_on); let right_predicate = right_from_filter.and(right_from_on); diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index 93888267a86d..85efe484429f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -13,21 +13,19 @@ // limitations under the License. use std::fmt; -use std::num::NonZeroUsize; use fixedbitset::FixedBitSet; use itertools::Itertools; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::types::{DataType, IntervalUnit}; -use risingwave_expr::ExprError; use super::generic::GenericPlanNode; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PlanTreeNodeUnary, PredicatePushdown, StreamHopWindow, ToBatch, ToStream, }; -use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef, Literal}; +use crate::expr::{ExprType, FunctionCall, InputRef}; use crate::optimizer::plan_node::{ ColumnPruningContext, PredicatePushdownContext, RewriteStreamContext, ToStreamContext, }; @@ -138,45 +136,28 @@ impl LogicalHopWindow { Self::new(input, time_col, window_slide, window_size, None).into() } - pub fn window_start_col_idx(&self) -> usize { - self.input().schema().len() + pub fn internal_window_start_col_idx(&self) -> usize { + self.core.internal_window_start_col_idx() } - pub fn window_end_col_idx(&self) -> usize { - self.window_start_col_idx() + 1 + pub fn internal_window_end_col_idx(&self) -> usize { + self.core.internal_window_end_col_idx() } pub fn o2i_col_mapping(&self) -> ColIndexMapping { - self.output2internal_col_mapping() - .composite(&self.internal2input_col_mapping()) + self.core.o2i_col_mapping() } pub fn i2o_col_mapping(&self) -> ColIndexMapping { - self.input2internal_col_mapping() - .composite(&self.internal2output_col_mapping()) + self.core.i2o_col_mapping() } pub fn internal_column_num(&self) -> usize { - self.window_start_col_idx() + 2 + self.core.internal_column_num() } fn output2internal_col_mapping(&self) -> ColIndexMapping { - self.internal2output_col_mapping().inverse() - } - - fn internal2output_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::with_remaining_columns( - &self.core.output_indices, - self.internal_column_num(), - ) - } - - fn input2internal_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::identity_or_none(self.window_start_col_idx(), self.internal_column_num()) - } - - fn internal2input_col_mapping(&self) -> ColIndexMapping { - ColIndexMapping::identity_or_none(self.internal_column_num(), self.window_start_col_idx()) + self.core.output2internal_col_mapping() } fn clone_with_output_indices(&self, output_indices: Vec) -> Self { @@ -207,97 +188,6 @@ impl LogicalHopWindow { pub fn output_indices(&self) -> &Vec { &self.core.output_indices } - - fn derive_window_start_and_end_exprs(&self) -> Result<(Vec, Vec)> { - let generic::HopWindow:: { - window_size, - window_slide, - time_col, - .. - } = &self.core; - let units = window_size - .exact_div(window_slide) - .and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?)) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_size {} cannot be divided by window_slide {}", - window_size, window_slide - ), - })? - .get(); - let window_size_expr = Literal::new(Some((*window_size).into()), DataType::Interval).into(); - let window_slide_expr: ExprImpl = - Literal::new(Some((*window_slide).into()), DataType::Interval).into(); - let window_size_sub_slide = FunctionCall::new( - ExprType::Subtract, - vec![window_size_expr, window_slide_expr.clone()], - )? - .into(); - - let time_col_shifted = FunctionCall::new( - ExprType::Subtract, - vec![ - ExprImpl::InputRef(Box::new(time_col.clone())), - window_size_sub_slide, - ], - )? - .into(); - - let hop_start: ExprImpl = FunctionCall::new( - ExprType::TumbleStart, - vec![time_col_shifted, window_slide_expr], - )? - .into(); - - let mut window_start_exprs = Vec::with_capacity(units); - let mut window_end_exprs = Vec::with_capacity(units); - for i in 0..units { - { - let window_start_offset = - window_slide - .checked_mul_int(i) - .ok_or_else(|| ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, i - ), - })?; - let window_start_offset_expr = - Literal::new(Some(window_start_offset.into()), DataType::Interval).into(); - let window_start_expr = FunctionCall::new( - ExprType::Add, - vec![hop_start.clone(), window_start_offset_expr], - )? - .into(); - window_start_exprs.push(window_start_expr); - } - { - let window_end_offset = - window_slide.checked_mul_int(i + units).ok_or_else(|| { - ExprError::InvalidParam { - name: "window", - reason: format!( - "window_slide {} cannot be multiplied by {}", - window_slide, - i + units - ), - } - })?; - let window_end_offset_expr = - Literal::new(Some(window_end_offset.into()), DataType::Interval).into(); - let window_end_expr = FunctionCall::new( - ExprType::Add, - vec![hop_start.clone(), window_end_offset_expr], - )? - .into(); - window_end_exprs.push(window_end_expr); - } - } - assert_eq!(window_start_exprs.len(), window_end_exprs.len()); - Ok((window_start_exprs, window_end_exprs)) - } } impl PlanTreeNodeUnary for LogicalHopWindow { @@ -335,10 +225,10 @@ impl PlanTreeNodeUnary for LogicalHopWindow { Some(new_idx) } None => { - if idx == self.window_start_col_idx() { + if idx == self.internal_window_start_col_idx() { columns_to_be_kept.push(i); Some(input.schema().len()) - } else if idx == self.window_end_col_idx() { + } else if idx == self.internal_window_end_col_idx() { columns_to_be_kept.push(i); Some(input.schema().len() + 1) } else { @@ -407,9 +297,9 @@ impl ColPrunable for LogicalHopWindow { if let Some(idx) = o2i.try_map(idx) { Some(IndexType::Input(idx)) } else if let Some(idx) = output2internal.try_map(idx) { - if idx == self.window_start_col_idx() { + if idx == self.internal_window_start_col_idx() { Some(IndexType::WindowStart) - } else if idx == self.window_end_col_idx() { + } else if idx == self.internal_window_end_col_idx() { Some(IndexType::WindowEnd) } else { None @@ -424,8 +314,8 @@ impl ColPrunable for LogicalHopWindow { .iter() .filter_map(|&idx| match idx { IndexType::Input(x) => input_change.try_map(x), - IndexType::WindowStart => Some(new_hop.window_start_col_idx()), - IndexType::WindowEnd => Some(new_hop.window_end_col_idx()), + IndexType::WindowStart => Some(new_hop.internal_window_start_col_idx()), + IndexType::WindowEnd => Some(new_hop.internal_window_end_col_idx()), }) .collect_vec() }; @@ -445,8 +335,8 @@ impl PredicatePushdown for LogicalHopWindow { ) -> PlanRef { let mut window_columns = FixedBitSet::with_capacity(self.schema().len()); - let window_start_idx = self.window_start_col_idx(); - let window_end_idx = self.window_end_col_idx(); + let window_start_idx = self.internal_window_start_col_idx(); + let window_end_idx = self.internal_window_end_col_idx(); for (i, v) in self.output_indices().iter().enumerate() { if *v == window_start_idx || *v == window_end_idx { window_columns.insert(i); @@ -464,7 +354,7 @@ impl ToBatch for LogicalHopWindow { let new_input = self.input().to_batch()?; let new_logical = self.clone_with_input(new_input); let (window_start_exprs, window_end_exprs) = - new_logical.derive_window_start_and_end_exprs()?; + new_logical.core.derive_window_start_and_end_exprs()?; Ok(BatchHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } } @@ -474,7 +364,7 @@ impl ToStream for LogicalHopWindow { let new_input = self.input().to_stream(ctx)?; let new_logical = self.clone_with_input(new_input); let (window_start_exprs, window_end_exprs) = - new_logical.derive_window_start_and_end_exprs()?; + new_logical.core.derive_window_start_and_end_exprs()?; Ok(StreamHopWindow::new(new_logical, window_start_exprs, window_end_exprs).into()) } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index debe4a0cd7d0..6c16232f146a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -28,7 +28,9 @@ use super::{ PlanTreeNodeBinary, PredicatePushdown, StreamHashJoin, StreamProject, ToBatch, ToStream, }; use crate::expr::{Expr, ExprImpl, ExprRewriter, ExprType, InputRef}; -use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::generic::{ + push_down_into_join, push_down_join_condition, GenericPlanRef, +}; use crate::optimizer::plan_node::utils::IndicesDisplay; use crate::optimizer::plan_node::{ BatchHashJoin, BatchLookupJoin, BatchNestedLoopJoin, ColumnPruningContext, EqJoinPredicate, @@ -332,41 +334,6 @@ impl LogicalJoin { (left, right, on) } - pub fn can_push_left_from_filter(ty: JoinType) -> bool { - matches!( - ty, - JoinType::Inner | JoinType::LeftOuter | JoinType::LeftSemi | JoinType::LeftAnti - ) - } - - pub fn can_push_right_from_filter(ty: JoinType) -> bool { - matches!( - ty, - JoinType::Inner | JoinType::RightOuter | JoinType::RightSemi | JoinType::RightAnti - ) - } - - pub fn can_push_on_from_filter(ty: JoinType) -> bool { - matches!( - ty, - JoinType::Inner | JoinType::LeftSemi | JoinType::RightSemi - ) - } - - pub fn can_push_left_from_on(ty: JoinType) -> bool { - matches!( - ty, - JoinType::Inner | JoinType::RightOuter | JoinType::LeftSemi - ) - } - - pub fn can_push_right_from_on(ty: JoinType) -> bool { - matches!( - ty, - JoinType::Inner | JoinType::LeftOuter | JoinType::RightSemi - ) - } - /// Try to simplify the outer join with the predicate on the top of the join /// /// now it is just a naive implementation for comparison expression, we can give a more general @@ -921,28 +888,12 @@ impl PredicatePushdown for LogicalJoin { predicate = predicate.rewrite_expr(&mut mapping); - let (left_from_filter, right_from_filter, on) = LogicalJoin::push_down( - &mut predicate, - left_col_num, - right_col_num, - LogicalJoin::can_push_left_from_filter(join_type), - LogicalJoin::can_push_right_from_filter(join_type), - LogicalJoin::can_push_on_from_filter(join_type), - ); + let (left_from_filter, right_from_filter, on) = + push_down_into_join(&mut predicate, left_col_num, right_col_num, join_type); let mut new_on = self.on().clone().and(on); - let (left_from_on, right_from_on, on) = LogicalJoin::push_down( - &mut new_on, - left_col_num, - right_col_num, - LogicalJoin::can_push_left_from_on(join_type), - LogicalJoin::can_push_right_from_on(join_type), - false, - ); - assert!( - on.always_true(), - "On-clause should not be pushed to on-clause." - ); + let (left_from_on, right_from_on) = + push_down_join_condition(&mut new_on, left_col_num, right_col_num, join_type); let left_predicate = left_from_filter.and(left_from_on); let right_predicate = right_from_filter.and(right_from_on); diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index 133732ff2b7e..73b455a54b75 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -53,8 +53,8 @@ impl StreamHopWindow { if watermark_columns.contains(logical.core.time_col.index) { // Watermark on `time_col` indicates watermark on both `window_start` and `window_end`. - watermark_columns.insert(logical.window_start_col_idx()); - watermark_columns.insert(logical.window_end_col_idx()); + watermark_columns.insert(logical.internal_window_start_col_idx()); + watermark_columns.insert(logical.internal_window_end_col_idx()); } let watermark_columns = ColIndexMapping::with_remaining_columns( logical.output_indices(),