Skip to content

Commit

Permalink
[refine](pipeline) refine some some operator close function (apache#3…
Browse files Browse the repository at this point in the history
…9397)

## Proposed changes

Issue Number: close #xxx

<!--Describe your changes.-->
  • Loading branch information
Mryange authored Aug 22, 2024
1 parent e1a27f2 commit 478727a
Show file tree
Hide file tree
Showing 22 changed files with 24 additions and 139 deletions.
1 change: 0 additions & 1 deletion be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ struct AggSharedState : public BasicSharedState {
vectorized::VExprContextSPtrs probe_expr_ctxs;
size_t input_num_rows = 0;
std::vector<vectorized::AggregateDataPtr> values;
std::unique_ptr<vectorized::Arena> agg_profile_arena;
/// The total size of the row from the aggregate functions.
size_t total_size_of_aggregate_states = 0;
size_t align_aggregate_states = 1;
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace doris::pipeline {
/// is in a random order. This means that we assume that the reduction factor will
/// increase over time.
AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {}
: Base(parent, state), _agg_profile_arena(std::make_unique<vectorized::Arena>()) {}

Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
Expand Down Expand Up @@ -97,11 +97,10 @@ Status AggSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(
p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i]));
}
Base::_shared_state->agg_profile_arena = std::make_unique<vectorized::Arena>();

if (Base::_shared_state->probe_expr_ctxs.empty()) {
_agg_data->without_key = reinterpret_cast<vectorized::AggregateDataPtr>(
Base::_shared_state->agg_profile_arena->alloc(p._total_size_of_aggregate_states));
_agg_profile_arena->alloc(p._total_size_of_aggregate_states));

if (p._is_merge) {
_executor = std::make_unique<Executor<true, true>>();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {

AggregatedDataVariants* _agg_data = nullptr;
vectorized::Arena* _agg_arena_pool = nullptr;
std::unique_ptr<vectorized::Arena> _agg_profile_arena;

std::unique_ptr<ExecutorBase> _executor = nullptr;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
_get_results_timer = ADD_TIMER(profile(), "GetResultsTime");
_serialize_result_timer = ADD_TIMER(profile(), "SerializeResultTime");
_hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* paren
_rows_end_offset(0),
_fn_place_ptr(nullptr),
_agg_functions_size(0),
_agg_functions_created(false) {}
_agg_functions_created(false),
_agg_arena_pool(std::make_unique<vectorized::Arena>()) {}

//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly
BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx, BlockRowPos start,
Expand Down Expand Up @@ -168,7 +169,6 @@ Status AnalyticLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXLocalState<AnalyticSharedState>::open(state));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_agg_arena_pool = std::make_unique<vectorized::Arena>();

auto& p = _parent->cast<AnalyticSourceOperatorX>();
_agg_functions_size = p._agg_functions.size();
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,

Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<FileScanOperatorX>();
_output_tuple_id = p._output_tuple_id;
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
Status HashJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;

Expand Down
6 changes: 3 additions & 3 deletions be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
namespace doris::pipeline {

HashJoinProbeLocalState::HashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent)
: JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>(state, parent) {}
: JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>(state, parent),
_process_hashtable_ctx_variants(std::make_unique<HashTableCtxVariants>()) {}

Status HashJoinProbeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(JoinProbeLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<HashJoinProbeOperatorX>();
_shared_state->probe_ignore_null = p._probe_ignore_null;
_probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
Expand Down Expand Up @@ -71,7 +72,6 @@ Status HashJoinProbeLocalState::open(RuntimeState* state) {
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinProbeLocalState::open(state));

_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
auto& p = _parent->cast<HashJoinProbeOperatorX>();
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
Expand Down
15 changes: 1 addition & 14 deletions be/src/pipeline/exec/hive_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,10 @@ namespace doris::pipeline {
Status HiveTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}

Status HiveTableSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (Base::_closed) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
if (_closed) {
return _close_status;
}
_close_status = Base::close(state, exec_status);
return _close_status;
}

} // namespace doris::pipeline
4 changes: 0 additions & 4 deletions be/src/pipeline/exec/hive_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@ class HiveTableSinkLocalState final
return Base::open(state);
}

Status close(RuntimeState* state, Status exec_status) override;
friend class HiveTableSinkOperatorX;

private:
Status _close_status = Status::OK();
};

class HiveTableSinkOperatorX final : public DataSinkOperatorX<HiveTableSinkLocalState> {
Expand Down
15 changes: 1 addition & 14 deletions be/src/pipeline/exec/iceberg_table_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,10 @@ namespace doris::pipeline {
Status IcebergTableSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<Parent>();
RETURN_IF_ERROR(_writer->init_properties(p._pool));
return Status::OK();
}

Status IcebergTableSinkLocalState::close(RuntimeState* state, Status exec_status) {
if (Base::_closed) {
return Status::OK();
}
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
if (_closed) {
return _close_status;
}
_close_status = Base::close(state, exec_status);
return _close_status;
}

} // namespace doris::pipeline
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/iceberg_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,7 @@ class IcebergTableSinkLocalState final
SCOPED_TIMER(_open_timer);
return Base::open(state);
}

Status close(RuntimeState* state, Status exec_status) override;
friend class IcebergTableSinkOperatorX;

private:
Status _close_status = Status::OK();
};

class IcebergTableSinkOperatorX final : public DataSinkOperatorX<IcebergTableSinkLocalState> {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ NestedLoopJoinBuildSinkLocalState::NestedLoopJoinBuildSinkLocalState(DataSinkOpe
Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<NestedLoopJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
_runtime_filters.resize(p._runtime_filter_descs.size());
Expand Down
37 changes: 0 additions & 37 deletions be/src/pipeline/exec/olap_table_sink_operator.cpp

This file was deleted.

4 changes: 0 additions & 4 deletions be/src/pipeline/exec/olap_table_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ class OlapTableSinkLocalState final
ENABLE_FACTORY_CREATOR(OlapTableSinkLocalState);
OlapTableSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {};
Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkOperatorX;

private:
Status _close_status = Status::OK();
};
class OlapTableSinkOperatorX final : public DataSinkOperatorX<OlapTableSinkLocalState> {
public:
Expand Down
37 changes: 0 additions & 37 deletions be/src/pipeline/exec/olap_table_sink_v2_operator.cpp

This file was deleted.

4 changes: 0 additions & 4 deletions be/src/pipeline/exec/olap_table_sink_v2_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,7 @@ class OlapTableSinkV2LocalState final
ENABLE_FACTORY_CREATOR(OlapTableSinkV2LocalState);
OlapTableSinkV2LocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state) {};
Status close(RuntimeState* state, Status exec_status) override;
friend class OlapTableSinkV2OperatorX;

private:
Status _close_status = Status::OK();
};

class OlapTableSinkV2OperatorX final : public DataSinkOperatorX<OlapTableSinkV2LocalState> {
Expand Down
4 changes: 1 addition & 3 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ Status PartitionBlocks::do_partition_topn_sort() {
Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<PartitionSortNodeSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<PartitionSortSinkOperatorX>();
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
_partition_expr_ctxs.resize(p._partition_expr_ctxs.size());
Expand All @@ -108,8 +108,6 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state, _partition_expr_ctxs[i]));
}
_partition_exprs_num = p._partition_exprs_num;
_partitioned_data = std::make_unique<PartitionedHashMapVariants>();
_agg_arena_pool = std::make_unique<vectorized::Arena>();
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize", TUnit::UNIT);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ class PartitionSortSinkLocalState : public PipelineXSinkLocalState<PartitionSort

public:
PartitionSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent, state) {}
: PipelineXSinkLocalState<PartitionSortNodeSharedState>(parent, state),
_partitioned_data(std::make_unique<PartitionedHashMapVariants>()),
_agg_arena_pool(std::make_unique<vectorized::Arena>()) {}

Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ namespace pipeline {
Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(PipelineXLocalState<PartitionSortNodeSharedState>::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_sorted_partition_output_rows_counter =
ADD_COUNTER(profile(), "SortedPartitionOutputRows", TUnit::UNIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,20 @@ PartitionedAggLocalState::PartitionedAggLocalState(RuntimeState* state, Operator
Status PartitionedAggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
_init_counters();
return Status::OK();
}

Status PartitionedAggLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
SCOPED_TIMER(_open_timer);
if (_opened) {
return Status::OK();
}
_opened = true;
RETURN_IF_ERROR(setup_in_memory_agg_op(state));
return Base::open(state);
return Status::OK();
}

void PartitionedAggLocalState::_init_counters() {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/result_file_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Status ResultFileSinkOperatorX::open(RuntimeState* state) {
Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
SCOPED_TIMER(_init_timer);
_sender_id = info.sender_id;

_brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
Expand Down

0 comments on commit 478727a

Please sign in to comment.