Skip to content

Commit

Permalink
[fix](local exchange) Fix EOS processing in local exchanger (#39031)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 authored and dataroaring committed Aug 11, 2024
1 parent f9b2976 commit 541a6f9
Show file tree
Hide file tree
Showing 8 changed files with 312 additions and 245 deletions.
6 changes: 5 additions & 1 deletion be/src/pipeline/dependency.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,12 @@ void LocalExchangeSharedState::sub_running_sink_operators() {
}
}

void LocalExchangeSharedState::sub_running_source_operators() {
// Called when one local-exchange source operator stops running.
//
// While holding `le_lock`, decrements the exchanger's running-source counter.
// The caller that observes the previous value 1 (i.e. takes the counter to 0)
// calls `_set_always_ready()` and then lets the exchanger finalize, passing
// along the local state of the source that is closing.
void LocalExchangeSharedState::sub_running_source_operators(
        LocalExchangeSourceLocalState& local_state) {
    std::unique_lock<std::mutex> lc(le_lock);
    const auto running_before = exchanger->_running_source_operators.fetch_sub(1);
    if (running_before != 1) {
        // This was not the last running source operator; nothing else to do.
        return;
    }
    _set_always_ready();
    exchanger->finalize(local_state);
}

Expand Down Expand Up @@ -397,4 +399,6 @@ Status AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}

// Out-of-line defaulted destructor; it is declared (with `override`) in dependency.h.
// NOTE(review): presumably defined in this .cpp so that the `std::unique_ptr<ExchangerBase>`
// member is destroyed where ExchangerBase is a complete type — confirm.
LocalExchangeSharedState::~LocalExchangeSharedState() = default;

} // namespace doris::pipeline
85 changes: 70 additions & 15 deletions be/src/pipeline/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class Dependency;
class PipelineTask;
struct BasicSharedState;
using DependencySPtr = std::shared_ptr<Dependency>;
using DependencyMap = std::map<int, std::vector<DependencySPtr>>;
class LocalExchangeSourceLocalState;

static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
Expand Down Expand Up @@ -811,20 +811,21 @@ struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
~LocalExchangeSharedState() override;
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id) {
// Fills every slot of `source_deps` with a freshly created Dependency named
// "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY" (built from `operator_id` and `node_id`)
// and binds each one to this shared state.
// Declared virtual so a derived shared state can create a different set of
// dependencies.
virtual void create_dependencies(int operator_id, int node_id) {
for (auto& source_dep : source_deps) {
source_dep = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_dep->set_shared_state(this);
}
};
}
void sub_running_sink_operators();
void sub_running_source_operators();
void sub_running_source_operators(LocalExchangeSourceLocalState& local_state);
void _set_always_ready() {
for (auto& dep : source_deps) {
DCHECK(dep);
Expand All @@ -836,7 +837,10 @@ struct LocalExchangeSharedState : public BasicSharedState {
}
}

Dependency* get_dep_by_channel_id(int channel_id) { return source_deps[channel_id].get(); }
virtual std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
return {source_deps[channel_id]};
}
// Sink-side dependency lookup for `channel_id`. This base implementation has
// no per-channel sink dependency and always returns nullptr; callers must
// handle the null case.
virtual Dependency* get_sink_dep_by_channel_id(int channel_id) { return nullptr; }

void set_ready_to_read(int channel_id) {
auto& dep = source_deps[channel_id];
Expand All @@ -847,28 +851,79 @@ struct LocalExchangeSharedState : public BasicSharedState {
// Charges `delta` bytes to the MemTracker of channel `channel_id`.
// When `update_total_mem_usage` is true (the default) the same `delta` is also
// forwarded to add_total_mem_usage().
void add_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
mem_trackers[channel_id]->consume(delta);
if (update_total_mem_usage) {
add_total_mem_usage(delta);
add_total_mem_usage(delta, channel_id);
}
}

void sub_mem_usage(int channel_id, size_t delta, bool update_total_mem_usage = true) {
mem_trackers[channel_id]->release(delta);
if (update_total_mem_usage) {
sub_total_mem_usage(delta);
}
}
// Releases `delta` bytes from the MemTracker of channel `channel_id`.
// Only the per-channel tracker is touched here; the shared total is not updated.
void sub_mem_usage(int channel_id, size_t delta) {
    auto* channel_tracker = mem_trackers[channel_id];
    channel_tracker->release(delta);
}

void add_total_mem_usage(size_t delta) {
// Adds `delta` to the shared `mem_usage` counter. If the total after the
// addition exceeds config::local_exchange_buffer_mem_limit, the first sink
// dependency is blocked.
// `channel_id` is not used by this base implementation.
virtual void add_total_mem_usage(size_t delta, int channel_id = 0) {
    const auto usage_after_add = mem_usage.fetch_add(delta) + delta;
    const bool over_limit = usage_after_add > config::local_exchange_buffer_mem_limit;
    if (over_limit) {
        sink_deps.front()->block();
    }
}

void sub_total_mem_usage(size_t delta) {
if (mem_usage.fetch_sub(delta) - delta <= config::local_exchange_buffer_mem_limit) {
// Subtracts `delta` from the shared `mem_usage` counter and, when the
// remaining total is within config::local_exchange_buffer_mem_limit, marks the
// first sink dependency ready again.
// `channel_id` is only used in the debug-check message of this base
// implementation.
virtual void sub_total_mem_usage(size_t delta, int channel_id = 0) {
    auto prev_usage = mem_usage.fetch_sub(delta);
    // Compute the remainder in signed arithmetic. `prev_usage - delta` with an
    // unsigned `delta` is evaluated as unsigned on typical 64-bit targets, so
    // it could never compare below zero: the check would be vacuous and an
    // underflow would look like a huge usage that keeps the sink blocked.
    const int64_t cur_usage = prev_usage - static_cast<int64_t>(delta);
    DCHECK_GE(cur_usage, 0) << "prev_usage: " << prev_usage << " delta: " << delta
                            << " channel_id: " << channel_id;
    if (cur_usage <= config::local_exchange_buffer_mem_limit) {
        sink_deps.front()->set_ready();
    }
}
};

// Shared state used by the local merge exchange.
//
// In addition to the base-class state it keeps one memory counter and one sink
// dependency per channel (queue). Each queue is throttled against an equal
// share of config::local_exchange_buffer_mem_limit instead of a single global
// counter.
struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
    // `num_instances` sizes the per-channel counters and splits the global
    // buffer limit into equal per-queue limits.
    // NOTE(review): `num_instances` is used as a divisor, so it is assumed to
    // be > 0 — confirm at call sites.
    LocalMergeExchangeSharedState(int num_instances)
            : LocalExchangeSharedState(num_instances),
              _queues_mem_usage(num_instances),
              _each_queue_limit(config::local_exchange_buffer_mem_limit / num_instances) {
        // Explicitly zero every counter; iterating the vector itself avoids
        // comparing a size_t index against the signed `num_instances`.
        for (auto& queue_usage : _queues_mem_usage) {
            queue_usage = 0;
        }
    }

    // Creates, for every slot of `source_deps`, one source dependency and one
    // sink dependency, both bound to this shared state.
    // NOTE(review): the trailing `true` passed to the sink Dependency is
    // presumably its initial "ready" flag — confirm against Dependency's ctor.
    void create_dependencies(int operator_id, int node_id) override {
        sink_deps.resize(source_deps.size());
        for (size_t i = 0; i < source_deps.size(); i++) {
            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
                                                          "LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
            source_deps[i]->set_shared_state(this);
            sink_deps[i] = std::make_shared<Dependency>(
                    operator_id, node_id, "LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
            sink_deps[i]->set_shared_state(this);
        }
    }

    // Subtracts `delta` from the counter of queue `channel_id`.
    // - If the remaining usage is within the per-queue limit, the queue's sink
    //   dependency is set ready.
    // - If the counter then reads zero, the queue's source dependency is blocked.
    void sub_total_mem_usage(size_t delta, int channel_id) override {
        auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
        // Compute the remainder in signed arithmetic. `prev_usage - delta` with
        // an unsigned `delta` is evaluated as unsigned on typical 64-bit
        // targets, so it could never compare below zero: the check would be
        // vacuous and an underflow would look like a huge usage that keeps the
        // sink blocked.
        const int64_t cur_usage = prev_usage - static_cast<int64_t>(delta);
        DCHECK_GE(cur_usage, 0) << "prev_usage: " << prev_usage << " delta: " << delta
                                << " channel_id: " << channel_id;
        if (cur_usage <= _each_queue_limit) {
            sink_deps[channel_id]->set_ready();
        }
        // Deliberately re-reads the live counter rather than using `cur_usage`.
        if (_queues_mem_usage[channel_id] == 0) {
            source_deps[channel_id]->block();
        }
    }

    // Adds `delta` to the counter of queue `channel_id`. The queue's sink
    // dependency is blocked when the new usage exceeds the per-queue limit, and
    // the queue's source dependency is always set ready.
    void add_total_mem_usage(size_t delta, int channel_id) override {
        if (_queues_mem_usage[channel_id].fetch_add(delta) + delta > _each_queue_limit) {
            sink_deps[channel_id]->block();
        }
        source_deps[channel_id]->set_ready();
    }

    // Returns the sink dependency that belongs to queue `channel_id`.
    Dependency* get_sink_dep_by_channel_id(int channel_id) override {
        return sink_deps[channel_id].get();
    }

    // Returns *all* source dependencies; `channel_id` is ignored here.
    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) override {
        return source_deps;
    }

private:
    // Current memory usage per queue, indexed by channel id.
    std::vector<std::atomic_int64_t> _queues_mem_usage;
    // Per-queue share of config::local_exchange_buffer_mem_limit.
    const int64_t _each_queue_limit;
};

} // namespace doris::pipeline
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,10 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
DCHECK(info.le_state_map.find(_parent->operator_id()) != info.le_state_map.end());
_shared_state = info.le_state_map.at(_parent->operator_id()).first.get();

_dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
auto deps = _shared_state->get_dep_by_channel_id(info.task_idx);
if (deps.size() == 1) {
_dependency = deps.front().get();
}
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
} else if (info.shared_state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = default;

std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
auto deps = Base::dependencies();
auto exchanger_deps = _exchanger->local_sink_state_dependency(_channel_id);
for (auto* dep : exchanger_deps) {

auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
if (dep != nullptr) {
deps.push_back(dep);
}
return deps;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,20 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) {
_exchanger->close(*this);
}
if (_shared_state) {
_shared_state->sub_running_source_operators();
_shared_state->sub_running_source_operators(*this);
}

return Base::close(state);
}

// Collects the dependencies of this local exchange source.
// Starts from Base::dependencies(); when the shared state reports more than
// one dependency for this channel, raw pointers to all of them are appended.
// A single reported dependency is not appended here.
std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
auto deps = Base::dependencies();
auto exchanger_deps = _exchanger->local_state_dependency(_channel_id);
for (auto* dep : exchanger_deps) {
deps.push_back(dep);
auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
if (le_deps.size() > 1) {
// If this is a local merge exchange, we should use all dependencies here.
for (auto& dep : le_deps) {
deps.push_back(dep.get());
}
}
return deps;
}
Expand Down
Loading

0 comments on commit 541a6f9

Please sign in to comment.