diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp index 373a896852eaaf..75dfe2935009e8 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp +++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp @@ -21,31 +21,41 @@ namespace doris::pipeline { -MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size) - : _used_count(used_count), _mem_size(mem_size) { +MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy, + size_t mem_size) + : _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) { _block = vectorized::Block::create_unique(block->get_columns_with_type_and_name()); block->clear(); } Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) { - std::lock_guard l(_mutex); - auto& pos_to_pull = _sender_pos_to_read[sender_idx]; - if (pos_to_pull != _multi_cast_blocks.end()) { - if (pos_to_pull->_used_count == 1) { - DCHECK(pos_to_pull == _multi_cast_blocks.begin()); - pos_to_pull->_block->swap(*block); + int* un_finish_copy = nullptr; + int use_count = 0; + { + std::lock_guard l(_mutex); + auto& pos_to_pull = _sender_pos_to_read[sender_idx]; + const auto end = _multi_cast_blocks.end(); + if (pos_to_pull != end) { + *block = *pos_to_pull->_block; _cumulative_mem_size -= pos_to_pull->_mem_size; - pos_to_pull++; - _multi_cast_blocks.pop_front(); - } else { - pos_to_pull->_block->create_same_struct_block(0)->swap(*block); - RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block)); pos_to_pull->_used_count--; + use_count = pos_to_pull->_used_count; + un_finish_copy = &pos_to_pull->_un_finish_copy; + pos_to_pull++; } + *eos = _eos and pos_to_pull == end; + } + + if (un_finish_copy) { + if (use_count == 0) { + // will clear _multi_cast_blocks + _wait_copy_block(block, *un_finish_copy); + } else { + _copy_block(block, *un_finish_copy); + } } - *eos = _eos and pos_to_pull == _multi_cast_blocks.end(); return Status::OK(); } @@ -60,12 +70,33 @@ void MultiCastDataStreamer::close_sender(int sender_idx) { _multi_cast_blocks.pop_front(); } else { pos_to_pull->_used_count--; + pos_to_pull->_un_finish_copy--; pos_to_pull++; } } _closed_sender_count++; } +void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish_copy) { + const auto rows = block->rows(); + for (int i = 0; i < block->columns(); ++i) { + block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows); + } + + std::unique_lock l(_mutex); + un_finish_copy--; + if (un_finish_copy == 0) { + l.unlock(); + _cv.notify_one(); + } +} + +void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) { + std::unique_lock l(_mutex); + _cv.wait(l, [&]() { return un_finish_copy == 0; }); + _multi_cast_blocks.pop_front(); +} + Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) { auto rows = block->rows(); COUNTER_UPDATE(_process_rows, rows); @@ -79,7 +110,8 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block // TODO: if the [queue back block rows + block->rows()] < batch_size, better // do merge block. but need check the need_process_count and used_count whether // equal - _multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size); + _multi_cast_blocks.emplace_back(block, need_process_count, need_process_count - 1, + block_mem_size); _cumulative_mem_size += block_mem_size; COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value())); diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h index 8159ed87809906..6c08ce282da42c 100644 --- a/be/src/pipeline/exec/multi_cast_data_streamer.h +++ b/be/src/pipeline/exec/multi_cast_data_streamer.h @@ -22,10 +22,11 @@ namespace doris::pipeline { struct MultiCastBlock { - MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size); + MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, size_t mem_size); std::unique_ptr _block; int _used_count; + int _un_finish_copy; size_t _mem_size; }; @@ -68,6 +69,12 @@ class MultiCastDataStreamer { } private: + void _copy_block(vectorized::Block* block, int& un_finish_copy); + + void _wait_copy_block(vectorized::Block* block, int& un_finish_copy); + + std::condition_variable _cv; + const RowDescriptor& _row_desc; RuntimeProfile* _profile; std::list _multi_cast_blocks;