Skip to content

Commit

Permalink
[Bug](runtime-filter) fix some rf error problems (#37273)
Browse files Browse the repository at this point in the history
## Proposed changes
1. Ignore the runtime filter (rf) when the rf-mgr has already been released
2. Move acquiring the rf controller to after acquiring query_ctx in
send_filter_size
3. Enlarge the timeout limit on sync_filter_size/apply_filterv2
4. Log the rf's debug string when an rpc meets an error
  • Loading branch information
BiteTheDDDDt authored Jul 4, 2024
1 parent ac21d76 commit 4f24b7a
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 11 deletions.
17 changes: 13 additions & 4 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,25 +1033,34 @@ Status IRuntimeFilter::publish(bool publish_local) {
// RPC closure used when sending a PSendFilterSizeRequest.
// On either failure path it calls sub() on the held dependency (cast to
// pipeline::CountedFinishDependency) and logs the owning filter's debug string
// (except for END_OF_FILE, where the filter is simply marked as ignored).
// NOTE(review): this listing is a rendered diff, so some pre-change lines appear
// next to their replacements; they are flagged individually below.
class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
DummyBrpcCallback<PSendFilterSizeResponse>> {
// Dependency whose counter is decremented via sub() when the RPC fails.
std::shared_ptr<pipeline::Dependency> _dependency;
// Non-owning pointer to the filter that issued the request.
// NOTE(review): presumably the filter must outlive this closure — confirm.
IRuntimeFilter* _filter;
using Base =
AutoReleaseClosure<PSendFilterSizeRequest, DummyBrpcCallback<PSendFilterSizeResponse>>;
ENABLE_FACTORY_CREATOR(SyncSizeClosure);

// RPC-level failure: decrement the dependency, log the filter, then defer to Base.
void _process_if_rpc_failed() override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
LOG(WARNING) << "sync filter size meet rpc error, filter=" << _filter->debug_string();
Base::_process_if_rpc_failed();
}

// The RPC completed but carried an error status: decrement the dependency, then
// either ignore the filter (END_OF_FILE) or log and defer to Base.
void _process_if_meet_error_status(const Status& status) override {
((pipeline::CountedFinishDependency*)_dependency.get())->sub();
// NOTE(review): the unconditional Base call on the next line looks like the
// pre-change line of this diff, superseded by the if/else below — confirm.
Base::_process_if_meet_error_status(status);
if (status.is<ErrorCode::END_OF_FILE>()) {
// The rf merger backend may have finished before this rf's send_filter_size
// arrived; in that case we just ignore the filter.
_filter->set_ignored();
} else {
LOG(WARNING) << "sync filter size meet error status, filter="
<< _filter->debug_string();
Base::_process_if_meet_error_status(status);
}
}

public:
// Stores the dependency and the issuing filter; req and callback are forwarded to Base.
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
// NOTE(review): the next two lines look like the pre-change constructor tail
// (without the filter parameter), replaced by the two lines after them — confirm.
std::shared_ptr<pipeline::Dependency> dependency)
: Base(req, callback), _dependency(std::move(dependency)) {}
std::shared_ptr<pipeline::Dependency> dependency, IRuntimeFilter* filter)
: Base(req, callback), _dependency(std::move(dependency)), _filter(filter) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1094,7 +1103,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt

auto request = std::make_shared<PSendFilterSizeRequest>();
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency);
auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, this);
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand Down
9 changes: 9 additions & 0 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class VRuntimeFilterSlots {
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto* filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
continue;
}
Expand All @@ -83,6 +86,9 @@ class VRuntimeFilterSlots {

// process ignore filter when it has IN_FILTER on same expr, and init bloom filter size
for (auto* filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
!has_in_filter.contains(filter->expr_order())) {
continue;
Expand All @@ -95,6 +101,9 @@ class VRuntimeFilterSlots {
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
if (filter->get_ignored()) {
continue;
}
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,6 @@ class PipelineFragmentContext : public TaskExecutionContext {

bool _need_local_merge = false;

// It is used to manage the lifecycle of RuntimeFilterMergeController
std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> _merge_controller_handlers;

// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
#ifdef __clang__
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1085,8 +1085,6 @@ Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,

Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
UniqueId queryid = request->query_id();
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));

std::shared_ptr<QueryContext> query_ctx;
{
Expand All @@ -1097,10 +1095,13 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
if (auto q_ctx = _get_or_erase_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
return Status::InvalidArgument("Query context (query-id: {}) not found",
queryid.to_string());
return Status::EndOfFile("Query context (query-id: {}) not found, maybe finished",
queryid.to_string());
}
}

std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
auto merge_status = filter_controller->send_filter_size(request);
return merge_status;
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);

closure->request_->set_filter_id(filter_id);
closure->request_->set_filter_size(cnt_val->global_size);
Expand Down Expand Up @@ -453,6 +454,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
if (has_attachment) {
closure->cntl_->request_attachment().append(request_attachment);
}
closure->cntl_->set_timeout_ms(std::min(3600, _state->execution_timeout) * 1000);
// set fragment-id
for (auto& target_fragment_instance_id : target.target_fragment_instance_ids) {
PUniqueId* cur_id = closure->request_->add_fragment_instance_ids();
Expand Down

0 comments on commit 4f24b7a

Please sign in to comment.