Skip to content

Commit

Permalink
Merge branch 'master' into show_transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
cjj2010 authored Oct 13, 2024
2 parents a8504d6 + 6d4c91a commit 09e0800
Show file tree
Hide file tree
Showing 303 changed files with 8,649 additions and 5,120 deletions.
14 changes: 12 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ DEFINE_mInt64(stacktrace_in_alloc_large_memory_bytes, "2147483648");

DEFINE_mInt64(crash_in_alloc_large_memory_bytes, "-1");

// If memory tracker value is inaccurate, BE will crash. usually used in test environments, default value is false.
// The actual meaning of this parameter is `debug_memory`. When it is enabled:
// 1. BE will crash if the memory tracker value is inaccurate.
// Usually used in test environments; the default value is false.
// 2. More memory logs are printed.
DEFINE_mBool(crash_in_memory_tracker_inaccurate, "false");

// Default is true. If there is any memory tracking in the Orphan mem tracker, an error will be reported.
Expand Down Expand Up @@ -995,7 +998,7 @@ DEFINE_Bool(enable_file_cache, "false");
// or use the default storage value:
// {"path": "memory", "total_size":53687091200}
// Both will use the directory "memory" on the disk instead of the real RAM.
DEFINE_String(file_cache_path, "");
DEFINE_String(file_cache_path, "[{\"path\":\"${DORIS_HOME}/file_cache\"}]");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB

DEFINE_Bool(clear_file_cache, "false");
Expand Down Expand Up @@ -1683,6 +1686,13 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
SET_FIELD(it.second, std::vector<std::string>, fill_conf_map, set_to_default);
}

if (config::is_cloud_mode()) {
auto st = config::set_config("enable_file_cache", "true", true, true);
LOG(INFO) << "set config enable_file_cache "
<< "true"
<< " " << st;
}

return true;
}

Expand Down
5 changes: 4 additions & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,10 @@ DECLARE_mInt64(stacktrace_in_alloc_large_memory_bytes);
// modify this parameter to crash when large memory allocation occur will help
DECLARE_mInt64(crash_in_alloc_large_memory_bytes);

// If memory tracker value is inaccurate, BE will crash. usually used in test environments, default value is false.
// The actual meaning of this parameter is `debug_memory`. When it is enabled:
// 1. BE will crash if the memory tracker value is inaccurate.
// Usually used in test environments; the default value is false.
// 2. More memory logs are printed.
DECLARE_mBool(crash_in_memory_tracker_inaccurate);

// Default is true. If there is any memory tracking in the Orphan mem tracker, an error will be reported.
Expand Down
30 changes: 11 additions & 19 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,48 +27,37 @@
// IWYU pragma: no_include <bits/std_abs.h>
#include <butil/iobuf.h>
#include <math.h>
#include <signal.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
#include <set>
#include <string>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/memtable_memory_limiter.h"
#include "olap/options.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "runtime/be_proc_monitor.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/global_memory_arbitrator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/memory_reclamation.h"
#include "runtime/process_profile.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/algorithm_util.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
#include "util/disk_info.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
#include "util/metrics.h"
#include "util/network_util.h"
#include "util/perf_counters.h"
#include "util/system_metrics.h"
#include "util/thrift_util.h"
#include "util/time.h"

namespace doris {
Expand Down Expand Up @@ -233,9 +222,8 @@ void refresh_memory_state_after_memory_change() {
if (abs(last_print_proc_mem - PerfCounters::get_vm_rss()) > 268435456) {
last_print_proc_mem = PerfCounters::get_vm_rss();
doris::MemTrackerLimiter::clean_tracker_limiter_group();
doris::MemTrackerLimiter::enable_print_log_process_usage();
// Refresh mem tracker each type counter.
doris::MemTrackerLimiter::refresh_global_counter();
doris::ProcessProfile::instance()->memory_profile()->enable_print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->refresh_memory_overview_profile();
LOG(INFO) << doris::GlobalMemoryArbitrator::
process_mem_log_str(); // print mem log when memory state by 256M
}
Expand Down Expand Up @@ -339,10 +327,12 @@ void Daemon::memory_gc_thread() {
memory_full_gc_sleep_time_ms = memory_gc_sleep_time_ms;
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start full GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if (doris::MemoryReclamation::process_full_gc(std::move(mem_info))) {
// If there is not enough memory that can be reclaimed by GC, the process memory usage will not be printed again in the next consecutive GC.
doris::MemTrackerLimiter::enable_print_log_process_usage();
doris::ProcessProfile::instance()
->memory_profile()
->enable_print_log_process_usage();
}
} else if (memory_minor_gc_sleep_time_ms <= 0 &&
(sys_mem_available < doris::MemInfo::sys_mem_available_warning_water_mark() ||
Expand All @@ -352,9 +342,11 @@ void Daemon::memory_gc_thread() {
doris::GlobalMemoryArbitrator::process_soft_limit_exceeded_errmsg_str();
memory_minor_gc_sleep_time_ms = memory_gc_sleep_time_ms;
LOG(INFO) << fmt::format("[MemoryGC] start minor GC, {}.", mem_info);
doris::MemTrackerLimiter::print_log_process_usage();
doris::ProcessProfile::instance()->memory_profile()->print_log_process_usage();
if (doris::MemoryReclamation::process_minor_gc(std::move(mem_info))) {
doris::MemTrackerLimiter::enable_print_log_process_usage();
doris::ProcessProfile::instance()
->memory_profile()
->enable_print_log_process_usage();
}
} else {
if (memory_full_gc_sleep_time_ms > 0) {
Expand Down
28 changes: 17 additions & 11 deletions be/src/exec/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,25 +617,30 @@ bool ColumnValueRange<primitive_type>::convert_to_avg_range_value(
std::vector<OlapTuple>& begin_scan_keys, std::vector<OlapTuple>& end_scan_keys,
bool& begin_include, bool& end_include, int32_t max_scan_key_num) {
if constexpr (!_is_reject_split_type) {
CppType min_value = get_range_min_value();
CppType max_value = get_range_max_value();
if constexpr (primitive_type == PrimitiveType::TYPE_DATE) {
min_value.set_type(TimeType::TIME_DATE);
max_value.set_type(TimeType::TIME_DATE);
}
auto empty_range_only_null = min_value > max_value;
if (empty_range_only_null) {
// The case that does not contain null is handled in `convert_to_close_range`, which returns eos.
DCHECK(contain_null());
}

auto no_split = [&]() -> bool {
begin_scan_keys.emplace_back();
begin_scan_keys.back().add_value(
cast_to_string<primitive_type, CppType>(get_range_min_value(), scale()),
contain_null());
end_scan_keys.emplace_back();
end_scan_keys.back().add_value(
cast_to_string<primitive_type, CppType>(get_range_max_value(), scale()));
cast_to_string<primitive_type, CppType>(get_range_max_value(), scale()),
empty_range_only_null ? true : false);
return true;
};

CppType min_value = get_range_min_value();
CppType max_value = get_range_max_value();
if constexpr (primitive_type == PrimitiveType::TYPE_DATE) {
min_value.set_type(TimeType::TIME_DATE);
max_value.set_type(TimeType::TIME_DATE);
}

if (min_value > max_value || max_scan_key_num == 1) {
if (empty_range_only_null || max_scan_key_num == 1) {
return no_split();
}

Expand Down Expand Up @@ -1028,7 +1033,8 @@ Status OlapScanKeys::extend_scan_key(ColumnValueRange<primitive_type>& range,
*eos |= range.convert_to_close_range(_begin_scan_keys, _end_scan_keys, _begin_include,
_end_include);

if (range.convert_to_avg_range_value(_begin_scan_keys, _end_scan_keys, _begin_include,
if (!(*eos) &&
range.convert_to_avg_range_value(_begin_scan_keys, _end_scan_keys, _begin_include,
_end_include, max_scan_key_num)) {
_has_range_value = true;
}
Expand Down
23 changes: 1 addition & 22 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@
namespace doris {
class ObjectPool;

SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns)
: _is_init(false), _columns(columns), _schema_table_type(TSchemaTableType::SCH_INVALID) {}

SchemaScanner::SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type)
: _is_init(false), _columns(columns), _schema_table_type(type) {}

Expand Down Expand Up @@ -125,7 +122,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
return;
}
SCOPED_ATTACH_TASK(state);
_dependency->block();
_async_thread_running = true;
_finish_dependency->block();
if (!_opened) {
Expand All @@ -150,19 +146,6 @@ Status SchemaScanner::get_next_block_async(RuntimeState* state) {
return Status::OK();
}

Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}

if (nullptr == block || nullptr == eos) {
return Status::InternalError("input pointer is nullptr.");
}

*eos = true;
return Status::OK();
}

Status SchemaScanner::init(SchemaScannerParam* param, ObjectPool* pool) {
if (_is_init) {
return Status::OK();
Expand Down Expand Up @@ -426,21 +409,18 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
case TYPE_BIGINT: {
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_value(
cell.longVal);
nullable_column->get_null_map_data().emplace_back(0);
break;
}

case TYPE_INT: {
reinterpret_cast<vectorized::ColumnVector<vectorized::Int32>*>(col_ptr)->insert_value(
cell.intVal);
nullable_column->get_null_map_data().emplace_back(0);
break;
}

case TYPE_BOOLEAN: {
reinterpret_cast<vectorized::ColumnVector<vectorized::UInt8>*>(col_ptr)->insert_value(
cell.boolVal);
nullable_column->get_null_map_data().emplace_back(0);
break;
}

Expand All @@ -449,7 +429,6 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
case TYPE_CHAR: {
reinterpret_cast<vectorized::ColumnString*>(col_ptr)->insert_data(cell.stringVal.data(),
cell.stringVal.size());
nullable_column->get_null_map_data().emplace_back(0);
break;
}

Expand All @@ -461,7 +440,6 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
auto data = datas[0];
reinterpret_cast<vectorized::ColumnVector<vectorized::Int64>*>(col_ptr)->insert_data(
reinterpret_cast<char*>(data), 0);
nullable_column->get_null_map_data().emplace_back(0);
break;
}
default: {
Expand All @@ -470,6 +448,7 @@ Status SchemaScanner::insert_block_column(TCell cell, int col_index, vectorized:
return Status::InternalError(ss.str());
}
}
nullable_column->get_null_map_data().emplace_back(0);
return Status::OK();
}

Expand Down
12 changes: 5 additions & 7 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

#include <gen_cpp/Data_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -82,8 +82,6 @@ struct SchemaScannerParam {

// virtual scanner for all schema table
class SchemaScanner {
ENABLE_FACTORY_CREATOR(SchemaScanner);

public:
struct ColumnDesc {
const char* name = nullptr;
Expand All @@ -94,16 +92,16 @@ class SchemaScanner {
int precision = -1;
int scale = -1;
};
SchemaScanner(const std::vector<ColumnDesc>& columns);
SchemaScanner(const std::vector<ColumnDesc>& columns, TSchemaTableType::type type);
SchemaScanner(const std::vector<ColumnDesc>& columns,
TSchemaTableType::type type = TSchemaTableType::SCH_INVALID);
virtual ~SchemaScanner();

// init object need information, schema etc.
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
// Start to work
virtual Status start(RuntimeState* state);
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos) = 0;
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
Expand Down
50 changes: 0 additions & 50 deletions be/src/exec/schema_scanner/schema_statistics_scanner.cpp

This file was deleted.

Loading

0 comments on commit 09e0800

Please sign in to comment.