Skip to content

Commit

Permalink
chore: replace all ProstXxx with PbXxx (risingwavelabs#8621)
Browse files Browse the repository at this point in the history
Signed-off-by: Runji Wang <wangrunji0408@163.com>
  • Loading branch information
wangrunji0408 committed Mar 17, 2023
1 parent d90165a commit 6fd8821
Show file tree
Hide file tree
Showing 175 changed files with 994 additions and 1,065 deletions.
6 changes: 3 additions & 3 deletions src/batch/benches/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, Equal, InputRef, Modulus,
Expand Down Expand Up @@ -53,7 +53,7 @@ fn create_filter_executor(chunk_size: usize, chunk_num: usize) -> BoxedExecutor
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(2)).as_ref()),
})),
};
Expand All @@ -76,7 +76,7 @@ fn create_filter_executor(chunk_size: usize, chunk_num: usize) -> BoxedExecutor
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(0)).as_ref()),
})),
};
Expand Down
8 changes: 4 additions & 4 deletions src/batch/benches/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_common::{enable_jemalloc_on_linux, hash};
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, GreaterThan, InputRef, Modulus,
Expand Down Expand Up @@ -58,7 +58,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(123)).as_ref()),
})),
};
Expand Down Expand Up @@ -88,7 +88,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(456)).as_ref()),
})),
};
Expand Down Expand Up @@ -141,7 +141,7 @@ fn create_hash_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(100)).as_ref()),
})),
};
Expand Down
6 changes: 3 additions & 3 deletions src/batch/benches/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::RexNode;
use risingwave_pb::expr::expr_node::Type::{
ConstantValue as TConstValue, Equal, InputRef, Modulus,
Expand Down Expand Up @@ -68,7 +68,7 @@ fn create_nested_loop_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(2)).as_ref()),
})),
};
Expand All @@ -79,7 +79,7 @@ fn create_nested_loop_join_executor(
type_name: TypeName::Int64 as i32,
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(Some(ScalarImpl::Int64(3)).as_ref()),
})),
};
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ mod tests {
use risingwave_common::util::value_encoding::serialize_datum;
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::Datum as ProstDatum;
use risingwave_pb::data::PbDatum;
use risingwave_pb::expr::expr_node::Type::InputRef;
use risingwave_pb::expr::expr_node::{RexNode, Type};
use risingwave_pb::expr::{ExprNode, FunctionCall};
Expand Down Expand Up @@ -242,7 +242,7 @@ mod tests {
}],
..Default::default()
}),
rex_node: Some(RexNode::Constant(ProstDatum {
rex_node: Some(RexNode::Constant(PbDatum {
body: serialize_datum(
Some(ScalarImpl::List(ListValue::new(vec![Some(
2.to_scalar_value(),
Expand Down
14 changes: 7 additions & 7 deletions src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_common::util::select_all;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
use risingwave_pb::batch_plan::PbExchangeSource;
use risingwave_pb::plan_common::Field as NodeField;
use risingwave_rpc_client::ComputeClientPoolRef;

Expand All @@ -36,7 +36,7 @@ use super::BatchTaskMetricsWithTaskLabels;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};

pub struct GenericExchangeExecutor<CS, C> {
proto_sources: Vec<ProstExchangeSource>,
proto_sources: Vec<PbExchangeSource>,
/// Mock-able CreateSource.
source_creators: Vec<CS>,
context: C,
Expand All @@ -56,7 +56,7 @@ pub trait CreateSource: Send {
async fn create_source(
&self,
context: impl BatchTaskContext,
prost_source: &ProstExchangeSource,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl>;
}

Expand All @@ -76,7 +76,7 @@ impl CreateSource for DefaultCreateSource {
async fn create_source(
&self,
context: impl BatchTaskContext,
prost_source: &ProstExchangeSource,
prost_source: &PbExchangeSource,
) -> Result<ExchangeSourceImpl> {
let peer_addr = prost_source.get_host()?.into();
let task_output_id = prost_source.get_task_output_id()?;
Expand Down Expand Up @@ -127,7 +127,7 @@ impl BoxedExecutorBuilder for GenericExchangeExecutorBuilder {
)?;

ensure!(!node.get_sources().is_empty());
let proto_sources: Vec<ProstExchangeSource> = node.get_sources().to_vec();
let proto_sources: Vec<PbExchangeSource> = node.get_sources().to_vec();
let source_creators =
vec![DefaultCreateSource::new(source.context().client_pool()); proto_sources.len()];

Expand Down Expand Up @@ -189,7 +189,7 @@ impl<CS: 'static + Send + CreateSource, C: BatchTaskContext> GenericExchangeExec

#[try_stream(boxed, ok = DataChunk, error = RwError)]
async fn data_chunk_stream(
prost_source: ProstExchangeSource,
prost_source: PbExchangeSource,
source_creator: CS,
context: C,
metrics: Option<BatchTaskMetricsWithTaskLabels>,
Expand Down Expand Up @@ -264,7 +264,7 @@ mod tests {
let chunks = vec![Some(chunk); 100];
let fake_exchange_source = FakeExchangeSource::new(chunks);
let fake_create_source = FakeCreateSource::new(fake_exchange_source);
proto_sources.push(ProstExchangeSource::default());
proto_sources.push(PbExchangeSource::default());
source_creators.push(fake_create_source);
}

Expand Down
8 changes: 4 additions & 4 deletions src/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ mod tests {
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::test_prelude::DataChunkTestExt;
use risingwave_pb::data::data_type::TypeName;
use risingwave_pb::data::DataType as ProstDataType;
use risingwave_pb::data::PbDataType;
use risingwave_pb::expr::agg_call::Type;
use risingwave_pb::expr::{AggCall, InputRef};

Expand Down Expand Up @@ -309,12 +309,12 @@ mod tests {
r#type: Type::Sum as i32,
args: vec![InputRef {
index: 2,
r#type: Some(ProstDataType {
r#type: Some(PbDataType {
type_name: TypeName::Int32 as i32,
..Default::default()
}),
}],
return_type: Some(ProstDataType {
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
Expand Down Expand Up @@ -382,7 +382,7 @@ mod tests {
let agg_call = AggCall {
r#type: Type::Count as i32,
args: vec![],
return_type: Some(ProstDataType {
return_type: Some(PbDataType {
type_name: TypeName::Int64 as i32,
..Default::default()
}),
Expand Down
12 changes: 6 additions & 6 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use risingwave_pb::batch_plan::exchange_info::DistributionMode;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{
ExchangeInfo, ExchangeNode, ExchangeSource as ProstExchangeSource, LocalExecutePlan,
PlanFragment, PlanNode, RowSeqScanNode, TaskId as ProstTaskId, TaskOutputId,
ExchangeInfo, ExchangeNode, LocalExecutePlan, PbExchangeSource, PbTaskId, PlanFragment,
PlanNode, RowSeqScanNode, TaskOutputId,
};
use risingwave_pb::common::{BatchQueryEpoch, WorkerNode};
use risingwave_pb::plan_common::StorageTableDesc;
Expand Down Expand Up @@ -113,8 +113,8 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
Ok(row_seq_scan_node)
}

/// Creates the `ProstExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<ProstExchangeSource> {
/// Creates the `PbExchangeSource` using the given `id`.
fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result<PbExchangeSource> {
let worker = self.pu_to_worker_mapping.get(id).ok_or_else(|| {
internal_error("No worker node found for the given parallel unit id.")
})?;
Expand All @@ -134,9 +134,9 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
epoch: Some(self.epoch.clone()),
};

let prost_exchange_source = ProstExchangeSource {
let prost_exchange_source = PbExchangeSource {
task_output_id: Some(TaskOutputId {
task_id: Some(ProstTaskId {
task_id: Some(PbTaskId {
// FIXME: We should replace this random generated uuid to current query_id for
// better dashboard. However, due to the lack of info of
// stage_id and task_id, we can not do it now. Now just make sure it will not
Expand Down
22 changes: 11 additions & 11 deletions src/batch/src/executor/join/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common::error::Result;
use risingwave_common::row::Row;
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::iter_util::ZipEqFast;
use risingwave_pb::plan_common::JoinType as JoinTypeProst;
use risingwave_pb::plan_common::JoinType as JoinTypePb;

use crate::error::BatchError;

Expand Down Expand Up @@ -61,17 +61,17 @@ impl JoinType {
)
}

pub fn from_prost(prost: JoinTypeProst) -> Self {
pub fn from_prost(prost: JoinTypePb) -> Self {
match prost {
JoinTypeProst::Inner => JoinType::Inner,
JoinTypeProst::LeftOuter => JoinType::LeftOuter,
JoinTypeProst::LeftSemi => JoinType::LeftSemi,
JoinTypeProst::LeftAnti => JoinType::LeftAnti,
JoinTypeProst::RightOuter => JoinType::RightOuter,
JoinTypeProst::RightSemi => JoinType::RightSemi,
JoinTypeProst::RightAnti => JoinType::RightAnti,
JoinTypeProst::FullOuter => JoinType::FullOuter,
JoinTypeProst::Unspecified => unreachable!(),
JoinTypePb::Inner => JoinType::Inner,
JoinTypePb::LeftOuter => JoinType::LeftOuter,
JoinTypePb::LeftSemi => JoinType::LeftSemi,
JoinTypePb::LeftAnti => JoinType::LeftAnti,
JoinTypePb::RightOuter => JoinType::RightOuter,
JoinTypePb::RightSemi => JoinType::RightSemi,
JoinTypePb::RightAnti => JoinType::RightAnti,
JoinTypePb::FullOuter => JoinType::FullOuter,
JoinTypePb::Unspecified => unreachable!(),
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use risingwave_common::error::{Result, RwError};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::sort_util::{ColumnOrder, HeapElem};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExchangeSource as ProstExchangeSource;
use risingwave_pb::batch_plan::PbExchangeSource;

use crate::exchange_source::ExchangeSourceImpl;
use crate::executor::{
Expand All @@ -41,7 +41,7 @@ pub struct MergeSortExchangeExecutorImpl<CS, C> {
source_inputs: Vec<Option<DataChunk>>,
column_orders: Arc<Vec<ColumnOrder>>,
min_heap: BinaryHeap<HeapElem>,
proto_sources: Vec<ProstExchangeSource>,
proto_sources: Vec<PbExchangeSource>,
sources: Vec<ExchangeSourceImpl>, // impl
/// Mock-able CreateSource.
source_creators: Vec<CS>,
Expand Down Expand Up @@ -199,7 +199,7 @@ impl BoxedExecutorBuilder for MergeSortExchangeExecutorBuilder {
let column_orders = Arc::new(column_orders);

let exchange_node = sort_merge_node.get_exchange()?;
let proto_sources: Vec<ProstExchangeSource> = exchange_node.get_sources().to_vec();
let proto_sources: Vec<PbExchangeSource> = exchange_node.get_sources().to_vec();
let source_creators =
vec![DefaultCreateSource::new(source.context().client_pool()); proto_sources.len()];
ensure!(!exchange_node.get_sources().is_empty());
Expand Down Expand Up @@ -253,11 +253,11 @@ mod tests {
let fake_exchange_source = FakeExchangeSource::new(vec![Some(chunk)]);
let fake_create_source = FakeCreateSource::new(fake_exchange_source);

let mut proto_sources: Vec<ProstExchangeSource> = vec![];
let mut proto_sources: Vec<PbExchangeSource> = vec![];
let mut source_creators = vec![];
let num_sources = 2;
for _ in 0..num_sources {
proto_sources.push(ProstExchangeSource::default());
proto_sources.push(PbExchangeSource::default());
source_creators.push(fake_create_source.clone());
}
let column_orders = Arc::new(vec![ColumnOrder {
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use risingwave_common::util::select_all;
use risingwave_common::util::sort_util::{Direction, OrderType};
use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, ScanRange as ProstScanRange};
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_storage::store::PrefetchOptions;
Expand Down Expand Up @@ -75,7 +75,7 @@ impl ScanRange {

/// Create a scan range from the prost representation.
pub fn new(
scan_range: ProstScanRange,
scan_range: PbScanRange,
mut pk_types: impl Iterator<Item = DataType>,
) -> Result<Self> {
let pk_prefix = OwnedRow::new(
Expand Down
Loading

0 comments on commit 6fd8821

Please sign in to comment.