Skip to content

Commit

Permalink
[feature](profilev2) Preliminary support for profilev2. (#24881)
Browse files Browse the repository at this point in the history
You can set the level of counters on the backend using ADD_COUNTER_WITH_LEVEL/ADD_TIMER_WITH_LEVEL. The profile can then merge counters with level 1.
set profile_level = 1;
such as
sql
select count(*) from customer join item on c_customer_sk = i_item_sk

profile

Simple  profile  
  
  PLAN  FRAGMENT  0
    OUTPUT  EXPRS:
        count(*)
    PARTITION:  UNPARTITIONED

    VRESULT  SINK
          MYSQL_PROTOCAL


    7:VAGGREGATE  (merge  finalize)
    |    output:  count(partial_count(*))[#44]
    |    group  by:  
    |    cardinality=1
    |    TotalTime:  avg  725.608us,  max  725.608us,  min  725.608us
    |    RowsReturned:  1
    |    
    6:VEXCHANGE
          offset:  0
          TotalTime:  avg  52.411us,  max  52.411us,  min  52.411us
          RowsReturned:  8

PLAN  FRAGMENT  1

    PARTITION:  HASH_PARTITIONED:  c_customer_sk

    STREAM  DATA  SINK
        EXCHANGE  ID:  06
        UNPARTITIONED

        TotalTime:  avg  106.263us,  max  118.38us,  min  81.403us
        BlocksSent:  8

    5:VAGGREGATE  (update  serialize)
    |    output:  partial_count(*)[#43]
    |    group  by:  
    |    cardinality=1
    |    TotalTime:  avg  679.296us,  max  739.395us,  min  554.904us
    |    BuildTime:  avg  33.198us,  max  48.387us,  min  28.880us
    |    ExecTime:  avg  27.633us,  max  40.278us,  min  24.537us
    |    RowsReturned:  8
    |    
    4:VHASH  JOIN
    |    join  op:  INNER  JOIN(PARTITIONED)[]
    |    equal  join  conjunct:  c_customer_sk  =  i_item_sk
    |    runtime  filters:  RF000[bloom]  <-  i_item_sk(18000/16384/1048576)
    |    cardinality=17,740
    |    vec  output  tuple  id:  3
    |    vIntermediate  tuple  ids:  2  
    |    hash  output  slot  ids:  22  
    |    RowsReturned:  18.0K  (18000)
    |    ProbeRows:  18.0K  (18000)
    |    ProbeTime:  avg  862.308us,  max  1.576ms,  min  666.28us
    |    BuildRows:  18.0K  (18000)
    |    BuildTime:  avg  3.8ms,  max  3.860ms,  min  2.317ms
    |    
    |----1:VEXCHANGE
    |              offset:  0
    |              TotalTime:  avg  48.822us,  max  67.459us,  min  30.380us
    |              RowsReturned:  18.0K  (18000)
    |        
    3:VEXCHANGE
          offset:  0
          TotalTime:  avg  33.162us,  max  39.480us,  min  28.854us
          RowsReturned:  18.0K  (18000)

PLAN  FRAGMENT  2

    PARTITION:  HASH_PARTITIONED:  c_customer_id

    STREAM  DATA  SINK
        EXCHANGE  ID:  03
        HASH_PARTITIONED:  c_customer_sk

        TotalTime:  avg  753.954us,  max  1.210ms,  min  499.470us
        BlocksSent:  64

    2:VOlapScanNode
          TABLE:  default_cluster:tpcds.customer(customer),  PREAGGREGATION:  ON
          runtime  filters:  RF000[bloom]  ->  c_customer_sk
          partitions=1/1,  tablets=12/12,  tabletList=1550745,1550747,1550749  ...
          cardinality=100000,  avgRowSize=0.0,  numNodes=1
          pushAggOp=NONE
          TotalTime:  avg  18.417us,  max  41.319us,  min  10.189us
          RowsReturned:  18.0K  (18000)
---------

Co-authored-by: yiguolei <676222867@qq.com>
  • Loading branch information
Mryange and yiguolei authored Oct 7, 2023
1 parent 83a9d07 commit 0631ed6
Show file tree
Hide file tree
Showing 20 changed files with 375 additions and 140 deletions.
3 changes: 2 additions & 1 deletion be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ Status ExecNode::prepare(RuntimeState* state) {
DCHECK(_runtime_profile.get() != nullptr);
_span = state->get_tracer()->StartSpan(get_name());
OpentelemetryScope scope {_span};
_rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", TUnit::UNIT);
_rows_returned_counter =
ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsReturned", TUnit::UNIT, 1);
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_split_block_hash_compute_timer = ADD_TIMER(_profile, "SplitBlockHashComputeTime");
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER(_profile, "BlocksSent", TUnit::UNIT);
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksSent", TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
Expand Down
12 changes: 8 additions & 4 deletions be/src/util/runtime_profile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile
_metadata(-1),
_timestamp(-1),
_is_averaged_profile(is_averaged_profile),
_counter_total_time(TUnit::TIME_NS, 0),
_counter_total_time(TUnit::TIME_NS, 0, 1),
_local_time_percent(0) {
_counter_map["TotalTime"] = &_counter_total_time;
}
Expand Down Expand Up @@ -278,6 +278,9 @@ RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool inden
std::lock_guard<std::mutex> l(_children_lock);
DCHECK(_child_map.find(name) == _child_map.end());
RuntimeProfile* child = _pool->add(new RuntimeProfile(name));
if (this->is_set_metadata()) {
child->set_metadata(this->metadata());
}
if (_children.empty()) {
add_child_unlock(child, indent, nullptr);
} else {
Expand Down Expand Up @@ -405,7 +408,8 @@ std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> RuntimeProfile::AddSharedH
}

RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TUnit::type type,
const std::string& parent_counter_name) {
const std::string& parent_counter_name,
int64_t level) {
std::lock_guard<std::mutex> l(_counter_map_lock);

// TODO(yingchun): Can we ensure that 'name' is not exist in '_counter_map'? Use CHECK instead?
Expand All @@ -416,7 +420,7 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TU

DCHECK(parent_counter_name == ROOT_COUNTER ||
_counter_map.find(parent_counter_name) != _counter_map.end());
Counter* counter = _pool->add(new Counter(type, 0));
Counter* counter = _pool->add(new Counter(type, 0, level));
_counter_map[name] = counter;
std::set<std::string>* child_counters =
find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>());
Expand Down Expand Up @@ -631,7 +635,6 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
node.metadata = _metadata;
node.timestamp = _timestamp;
node.indent = true;

CounterMap counter_map;
{
std::lock_guard<std::mutex> l(_counter_map_lock);
Expand All @@ -645,6 +648,7 @@ void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
counter.name = iter->first;
counter.value = iter->second->value();
counter.type = iter->second->type();
counter.__set_level(iter->second->level());
node.counters.push_back(counter);
}

Expand Down
25 changes: 22 additions & 3 deletions be/src/util/runtime_profile.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
Expand Down Expand Up @@ -53,7 +54,11 @@ class TRuntimeProfileTree;

#define ADD_LABEL_COUNTER(profile, name) (profile)->add_counter(name, TUnit::NONE)
#define ADD_COUNTER(profile, name, type) (profile)->add_counter(name, type)
#define ADD_COUNTER_WITH_LEVEL(profile, name, type, level) \
(profile)->add_counter_with_level(name, type, level)
#define ADD_TIMER(profile, name) (profile)->add_counter(name, TUnit::TIME_NS)
#define ADD_TIMER_WITH_LEVEL(profile, name, level) \
(profile)->add_counter_with_level(name, TUnit::TIME_NS, level)
#define ADD_CHILD_COUNTER(profile, name, type, parent) (profile)->add_counter(name, type, parent)
#define ADD_CHILD_TIMER(profile, name, parent) (profile)->add_counter(name, TUnit::TIME_NS, parent)
#define SCOPED_TIMER(c) ScopedTimer<MonotonicStopWatch> MACRO_CONCAT(SCOPED_TIMER, __COUNTER__)(c)
Expand Down Expand Up @@ -86,7 +91,8 @@ class RuntimeProfile {
public:
class Counter {
public:
Counter(TUnit::type type, int64_t value = 0) : _value(value), _type(type) {}
Counter(TUnit::type type, int64_t value = 0, int64_t level = 3)
: _value(value), _type(type), _level(level) {}
virtual ~Counter() = default;

virtual void update(int64_t delta) { _value.fetch_add(delta, std::memory_order_relaxed); }
Expand All @@ -108,11 +114,14 @@ class RuntimeProfile {

TUnit::type type() const { return _type; }

virtual int64_t level() { return _level; }

private:
friend class RuntimeProfile;

std::atomic<int64_t> _value;
TUnit::type _type;
int64_t _level;
};

/// A counter that keeps track of the highest value seen (reporting that
Expand Down Expand Up @@ -276,11 +285,15 @@ class RuntimeProfile {
// parent_counter_name.
// If the counter already exists, the existing counter object is returned.
Counter* add_counter(const std::string& name, TUnit::type type,
const std::string& parent_counter_name);
const std::string& parent_counter_name, int64_t level = 2);
Counter* add_counter(const std::string& name, TUnit::type type) {
return add_counter(name, type, "");
}

Counter* add_counter_with_level(const std::string& name, TUnit::type type, int64_t level) {
return add_counter(name, type, "", level);
}

// Add a derived counter with 'name'/'type'. The counter is owned by the
// RuntimeProfile object.
// If parent_counter_name is a non-empty string, the counter is added as a child of
Expand Down Expand Up @@ -348,7 +361,12 @@ class RuntimeProfile {
void set_name(const std::string& name) { _name = name; }

int64_t metadata() const { return _metadata; }
void set_metadata(int64_t md) { _metadata = md; }
void set_metadata(int64_t md) {
_is_set_metadata = true;
_metadata = md;
}

bool is_set_metadata() const { return _is_set_metadata; }

time_t timestamp() const { return _timestamp; }
void set_timestamp(time_t ss) { _timestamp = ss; }
Expand Down Expand Up @@ -410,6 +428,7 @@ class RuntimeProfile {

// user-supplied, uninterpreted metadata.
int64_t _metadata;
bool _is_set_metadata = false;

// The timestamp when the profile was modified, make sure the update is up to date.
time_t _timestamp;
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/exec/join/vjoin_node_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "vec/columns/column.h"
Expand Down Expand Up @@ -111,12 +112,16 @@ Status VJoinNodeBase::prepare(RuntimeState* state) {
runtime_profile()->add_info_string("JoinType", to_string(_join_op));
_build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);

_build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime");
_build_timer = ADD_TIMER_WITH_LEVEL(_build_phase_profile, "BuildTime", 1);
_build_rows_counter = ADD_COUNTER_WITH_LEVEL(_build_phase_profile, "BuildRows", TUnit::UNIT, 1);

_probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true);
_probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime");
_probe_timer = ADD_TIMER_WITH_LEVEL(_probe_phase_profile, "ProbeTime", 1);
_join_filter_timer = ADD_CHILD_TIMER(_probe_phase_profile, "JoinFilterTimer", "ProbeTime");
_build_output_block_timer =
ADD_CHILD_TIMER(_probe_phase_profile, "BuildOutputBlock", "ProbeTime");
_probe_rows_counter = ADD_COUNTER(_probe_phase_profile, "ProbeRows", TUnit::UNIT);
_probe_rows_counter = ADD_COUNTER_WITH_LEVEL(_probe_phase_profile, "ProbeRows", TUnit::UNIT, 1);

_push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime");
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
Expand Down
4 changes: 2 additions & 2 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,10 +291,10 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
_serialize_key_arena_memory_usage = runtime_profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage");

_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1);
_build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_exec_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "ExecTime", 1);
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_split_block_distribute_by_channel_timer =
ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_blocks_sent_counter = ADD_COUNTER(profile(), "BlocksSent", TUnit::UNIT);
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(profile(), "BlocksSent", TUnit::UNIT, 1);
_overall_throughput = profile()->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.planner.Planner;

import com.google.common.collect.Lists;

Expand Down Expand Up @@ -62,7 +63,7 @@ public void addExecutionProfile(ExecutionProfile executionProfile) {
}

public synchronized void update(long startTime, Map<String, String> summaryInfo, boolean isFinished,
boolean isSimpleProfile) {
int profileLevel, Planner planner) {
if (this.isFinished) {
return;
}
Expand All @@ -71,7 +72,8 @@ public synchronized void update(long startTime, Map<String, String> summaryInfo,
executionProfile.update(startTime, isFinished);
}
rootProfile.computeTimeInProfile();
rootProfile.setProfileLevel(isSimpleProfile);
rootProfile.setPlaner(planner);
rootProfile.setProfileLevel(profileLevel);
ProfileManager.getInstance().pushProfile(rootProfile);
this.isFinished = isFinished;
}
Expand Down
16 changes: 16 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/util/Counter.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class Counter {
private volatile long value;
private volatile int type;
private volatile boolean remove = false;
private volatile long level;

public long getValue() {
return value;
Expand All @@ -42,13 +43,24 @@ public TUnit getType() {
return TUnit.findByValue(type);
}

public void setLevel(long level) {
this.level = level;
}

public void setType(TUnit type) {
this.type = type.getValue();
}

public Counter(TUnit type, long value) {
this.value = value;
this.type = type.getValue();
this.level = 2;
}

public Counter(TUnit type, long value, long level) {
this.value = value;
this.type = type.getValue();
this.level = level;
}

public void addValue(Counter other) {
Expand Down Expand Up @@ -77,4 +89,8 @@ public void setCanRemove() {
public boolean isRemove() {
return this.remove;
}

public long getLevel() {
return this.level;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public String getProfileContent() {
if (profileContent == null) {
// Simple profile will change the structure of the profile.
try {
profileContent = profile.getSimpleString();
profileContent = profile.getProfileByLevel();
} catch (Exception e) {
LOG.warn("profile get error : " + e.toString());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.util;

import java.util.ArrayList;
import java.util.HashMap;

/**
* Used for collecting information obtained from the profile.
*/
public class ProfileStatistics {
// Record statistical information based on nodeid.
private HashMap<Integer, ArrayList<String>> statisticalInfo;

// Record statistical information based on fragment ID.
// "Currently used to record sink nodes.
private HashMap<Integer, ArrayList<String>> fragmentInfo;

private int fragmentId;

private boolean isDataSink;

public ProfileStatistics() {
statisticalInfo = new HashMap<Integer, ArrayList<String>>();
fragmentInfo = new HashMap<Integer, ArrayList<String>>();
fragmentId = 0;
isDataSink = false;
}

private void addPlanNodeInfo(int id, String info) {
if (!statisticalInfo.containsKey(id)) {
statisticalInfo.put(id, new ArrayList<String>());
}
statisticalInfo.get(id).add(info);
}

private void addDataSinkInfo(String info) {
if (fragmentInfo.get(fragmentId) == null) {
fragmentInfo.put(fragmentId, new ArrayList<String>());
}
fragmentInfo.get(fragmentId).add(info);
}

public void addInfoFromProfile(RuntimeProfile profile, String info) {
if (isDataSink) {
addDataSinkInfo(info);
} else {
addPlanNodeInfo(profile.nodeId(), info);
}
}

public boolean hasInfo(int id) {
return statisticalInfo.containsKey(id);
}

public void getInfoById(int id, String prefix, StringBuilder str) {
if (!hasInfo(id)) {
return;
}
ArrayList<String> infos = statisticalInfo.get(id);
for (String info : infos) {
str.append(prefix + info + "\n");
}
}

public void getDataSinkInfo(int fragmentIdx, String prefix, StringBuilder str) {
if (!fragmentInfo.containsKey(fragmentIdx)) {
return;
}
ArrayList<String> infos = fragmentInfo.get(fragmentIdx);
for (String info : infos) {
str.append(prefix + info + "\n");
}
}

public void setFragmentId(int id) {
this.fragmentId = id;
}

public void setIsDataSink(boolean dataSink) {
this.isDataSink = dataSink;
}

}
Loading

0 comments on commit 0631ed6

Please sign in to comment.