From 02e38a24e3c25975c08c42b1b61c06f73b04affc Mon Sep 17 00:00:00 2001
From: Mryange <2319153948@qq.com>
Date: Mon, 8 Jul 2024 15:32:20 +0800
Subject: [PATCH] upd

---
Review notes (this section is ignored by `git am`):
  * pull() copies the block header under _mutex and performs the deep column
    copy outside of it; the last consumer keeps the shared columns and waits
    for the other consumers to finish copying before dropping the queue entry.
  * pull(): memory accounting is released once per block (when the last
    consumer takes it), matching the single increment done by push().
  * _copy_block(): the pending-copy counter is decremented and the waiter is
    notified while holding _mutex, so the wakeup cannot be lost and the
    counter is never read after its owning block may have been popped.
  * push(): _cumulative_mem_size is updated under _mutex and the `eos`
    argument is used instead of reading _eos without the lock.
  * Header: includes <atomic>/<condition_variable>; the constructor parameter
    name now matches the definition.
  * The post-image blob ids on the `index` lines predate these review edits.

 .../exec/multi_cast_data_streamer.cpp         | 129 +++++++++++-------
 .../pipeline/exec/multi_cast_data_streamer.h  |  22 +--
 2 files changed, 95 insertions(+), 56 deletions(-)

diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index deebf7d11bb2e2b..e7d503805fefd69 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -23,63 +23,105 @@
 
 namespace doris::pipeline {
 
-MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
-        : _used_count(used_count), _mem_size(mem_size) {
+MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy,
+                               size_t mem_size)
+        : _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) {
     _block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
     block->clear();
 }
 
 Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
-    std::lock_guard l(_mutex);
-    auto& pos_to_pull = _sender_pos_to_read[sender_idx];
-    if (pos_to_pull != _multi_cast_blocks.end()) {
-        if (pos_to_pull->_used_count == 1) {
-            DCHECK(pos_to_pull == _multi_cast_blocks.begin());
-            pos_to_pull->_block->swap(*block);
-
-            _cumulative_mem_size -= pos_to_pull->_mem_size;
-            pos_to_pull++;
-            _multi_cast_blocks.pop_front();
-        } else {
-            pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
-            RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
-            pos_to_pull->_used_count--;
-            pos_to_pull++;
+    std::atomic_int* un_finish_copy = nullptr;
+    int use_count = 0;
+    {
+        std::lock_guard l(_mutex);
+        auto& pos_to_pull = _sender_pos_to_read[sender_idx];
+        const auto end = _multi_cast_blocks.end();
+        DCHECK(pos_to_pull != end);
+
+        // Shallow copy: the columns are still shared with the queued block.
+        *block = *pos_to_pull->_block;
+
+        pos_to_pull->_used_count--;
+        use_count = pos_to_pull->_used_count;
+        un_finish_copy = &pos_to_pull->_un_finish_copy;
+
+        // push() accounts for the block once, so release it once: only when
+        // the last consumer takes it.
+        if (use_count == 0) {
+            _cumulative_mem_size -= pos_to_pull->_mem_size;
+        }
+
+        pos_to_pull++;
+
+        if (pos_to_pull == end) {
+            _block_reading(sender_idx);
         }
+
+        *eos = _eos and pos_to_pull == end;
     }
-    *eos = _eos and pos_to_pull == _multi_cast_blocks.end();
-    if (pos_to_pull == _multi_cast_blocks.end()) {
-        _block_reading(sender_idx);
+
+    if (use_count == 0) {
+        // will clear _multi_cast_blocks
+        _wait_copy_block(block, *un_finish_copy);
+    } else {
+        _copy_block(block, *un_finish_copy);
     }
+
     return Status::OK();
 }
 
+void MultiCastDataStreamer::_copy_block(vectorized::Block* block, std::atomic_int& un_finish_copy) {
+    const auto rows = block->rows();
+    for (size_t i = 0; i < block->columns(); ++i) {
+        block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
+    }
+    // Decrement and notify under _mutex: the waiter evaluates its predicate
+    // with _mutex held, so the wakeup cannot be lost, and the counter (which
+    // lives in the queued block) is not touched after the waiter may pop it.
+    std::lock_guard l(_mutex);
+    if (--un_finish_copy == 0) {
+        _cv.notify_one();
+    }
+}
+
+void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block,
+                                             std::atomic_int& un_finish_copy) {
+    std::unique_lock l(_mutex);
+    _cv.wait(l, [&]() { return un_finish_copy == 0; });
+    _multi_cast_blocks.pop_front();
+}
+
 Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
     auto rows = block->rows();
     COUNTER_UPDATE(_process_rows, rows);
 
-    auto block_mem_size = block->allocated_bytes();
-    std::lock_guard l(_mutex);
-    int need_process_count = _cast_sender_count - _closed_sender_count;
-    if (need_process_count == 0) {
-        return Status::EndOfFile("All data streamer is EOF");
-    }
-    // TODO: if the [queue back block rows + block->rows()] < batch_size, better
-    // do merge block. but need check the need_process_count and used_count whether
-    // equal
-    _multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
-    _cumulative_mem_size += block_mem_size;
-    COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));
-
-    auto end = _multi_cast_blocks.end();
-    end--;
-    for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
-        if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
-            _sender_pos_to_read[i] = end;
-            _set_ready_for_read(i);
+    const auto block_mem_size = block->allocated_bytes();
+    {
+        std::lock_guard l(_mutex);
+        // Memory accounting is shared with pull(), so update it under _mutex.
+        _cumulative_mem_size += block_mem_size;
+        COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));
+
+        _multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1,
+                                        block_mem_size);
+        // last elem
+        auto end = std::prev(_multi_cast_blocks.end());
+        for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+            if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                _sender_pos_to_read[i] = end;
+                _set_ready_for_read(i);
+            }
+        }
+        _eos = eos;
+    }
+
+    // Use the argument: _eos itself must only be accessed under _mutex.
+    if (eos) {
+        for (auto* read_dep : _dependencies) {
+            read_dep->set_always_ready();
         }
     }
-    _eos = eos;
     return Status::OK();
 }
 
@@ -92,13 +134,6 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
     dep->set_ready();
 }
 
-void MultiCastDataStreamer::_set_ready_for_read() {
-    for (auto* dep : _dependencies) {
-        DCHECK(dep);
-        dep->set_ready();
-    }
-}
-
 void MultiCastDataStreamer::_block_reading(int sender_idx) {
     if (_dependencies.empty()) {
         return;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 2112ebaaf205b12..1adb43d3fbc8935 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -17,16 +17,21 @@
 
 #pragma once
 
+#include <atomic>
+#include <condition_variable>
+
 #include "vec/sink/vdata_stream_sender.h"
 
 namespace doris::pipeline {
 
 class Dependency;
 struct MultiCastBlock {
-    MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size);
+    MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy, size_t mem_size);
 
     std::unique_ptr<vectorized::Block> _block;
     int _used_count;
+    // Number of consumers that still have to finish their deep copy of _block.
+    std::atomic_int _un_finish_copy;
     size_t _mem_size;
 };
 
@@ -58,12 +63,6 @@ class MultiCastDataStreamer {
 
     RuntimeProfile* profile() { return _profile; }
 
-    void set_eos() {
-        std::lock_guard l(_mutex);
-        _eos = true;
-        _set_ready_for_read();
-    }
-
     void set_dep_by_sender_idx(int sender_idx, Dependency* dep) {
         _dependencies[sender_idx] = dep;
         _block_reading(sender_idx);
@@ -71,17 +70,22 @@ class MultiCastDataStreamer {
 
 private:
     void _set_ready_for_read(int sender_idx);
-    void _set_ready_for_read();
     void _block_reading(int sender_idx);
 
+    // Deep-copies the columns of `block`, then marks one pending copy as finished.
+    void _copy_block(vectorized::Block* block, std::atomic_int& un_finish_copy);
+
+    // Waits until all pending copies are finished, then drops the front queued block.
+    void _wait_copy_block(vectorized::Block* block, std::atomic_int& un_finish_copy);
+
     const RowDescriptor& _row_desc;
     RuntimeProfile* _profile = nullptr;
     std::list<MultiCastBlock> _multi_cast_blocks;
     std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
+    std::condition_variable _cv;
     std::mutex _mutex;
     bool _eos = false;
     int _cast_sender_count = 0;
-    int _closed_sender_count = 0;
     int64_t _cumulative_mem_size = 0;
 
     RuntimeProfile::Counter* _process_rows = nullptr;