Skip to content

Commit

Permalink
[Feature](multi-catalog) Add memory tracker for orc reader and writer.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaka11chen committed Jul 3, 2024
1 parent dda7ef1 commit 61ac8ab
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 3 deletions.
6 changes: 6 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ class ExecEnv {
}
std::shared_ptr<MemTrackerLimiter> s3_file_buffer_tracker() { return _s3_file_buffer_tracker; }

std::shared_ptr<MemTrackerLimiter> orc_reader_tracker() { return _orc_reader_tracker; }
std::shared_ptr<MemTrackerLimiter> orc_writer_tracker() { return _orc_writer_tracker; }

ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
ThreadPool* buffered_reader_prefetch_thread_pool() {
return _buffered_reader_prefetch_thread_pool.get();
Expand Down Expand Up @@ -357,6 +360,9 @@ class ExecEnv {
std::shared_ptr<MemTrackerLimiter> _subcolumns_tree_tracker;
std::shared_ptr<MemTrackerLimiter> _s3_file_buffer_tracker;

std::shared_ptr<MemTrackerLimiter> _orc_reader_tracker;
std::shared_ptr<MemTrackerLimiter> _orc_writer_tracker;

std::unique_ptr<ThreadPool> _send_batch_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,10 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree");
_s3_file_buffer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
_orc_reader_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCReader");
_orc_writer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "ORCWriter");
}

void ExecEnv::_register_metrics() {
Expand Down
43 changes: 43 additions & 0 deletions be/src/vec/exec/format/orc/orc_memory_pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>

#include "orc/MemoryPool.hh"
#include "vec/common/allocator.h"

extern "C" {

void* doris_malloc(size_t size) __THROW;
void doris_free(void* p) __THROW;
}

namespace doris::vectorized {

class ORCMemoryPool : public orc::MemoryPool {
public:
char* malloc(uint64_t size) override { return reinterpret_cast<char*>(doris_malloc(size)); }

void free(char* p) override { doris_free(p); }

ORCMemoryPool() = default;
~ORCMemoryPool() override = default;
};

} // namespace doris::vectorized
5 changes: 5 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vdirect_in_predicate.h"
Expand Down Expand Up @@ -252,6 +253,8 @@ Status OrcReader::_create_file_reader() {
// create orc reader
try {
orc::ReaderOptions options;
_pool = std::make_unique<ORCMemoryPool>();
options.setMemoryPool(*(_pool.get()));
_reader = orc::createReader(
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
Expand All @@ -276,6 +279,7 @@ Status OrcReader::init_reader(
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker());
_column_names = column_names;
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
Expand Down Expand Up @@ -1539,6 +1543,7 @@ std::string OrcReader::get_field_name_lower_case(const orc::Type* orc_type, int
}

Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_reader_tracker());
RETURN_IF_ERROR(get_next_block_impl(block, read_rows, eof));
if (_orc_filter) {
RETURN_IF_ERROR(_orc_filter->get_status());
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ class OrcReader : public GenericReader {
std::unique_ptr<ORCFilterImpl> _orc_filter;
orc::ReaderOptions _reader_options;
orc::RowReaderOptions _row_reader_options;
std::unique_ptr<orc::MemoryPool> _pool;

std::shared_ptr<io::FileSystem> _file_system;

Expand Down
13 changes: 10 additions & 3 deletions be/src/vec/runtime/vorc_transformer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/runtime/vdatetime_value.h"
Expand Down Expand Up @@ -123,6 +124,7 @@ VOrcTransformer::VOrcTransformer(RuntimeState* state, doris::io::FileWriter* fil
}

Status VOrcTransformer::open() {
SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker());
if (!_schema_str.empty()) {
try {
_schema = orc::Type::buildTypeFromString(_schema_str);
Expand Down Expand Up @@ -151,6 +153,8 @@ Status VOrcTransformer::open() {

_output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
try {
_pool = std::make_unique<ORCMemoryPool>();
_write_options->setMemoryPool(_pool.get());
_writer = orc::createWriter(*_schema, _output_stream.get(), *_write_options);
} catch (const std::exception& e) {
return Status::InternalError("failed to create writer: {}", e.what());
Expand Down Expand Up @@ -314,6 +318,7 @@ int64_t VOrcTransformer::written_len() {
}

Status VOrcTransformer::close() {
SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker());
if (_writer != nullptr) {
try {
_writer->close();
Expand All @@ -332,6 +337,8 @@ Status VOrcTransformer::write(const Block& block) {
return Status::OK();
}

SCOPED_CONSUME_MEM_TRACKER_BY_HOOK(ExecEnv::GetInstance()->orc_writer_tracker());

// Buffer used by date/datetime/datev2/datetimev2/largeint type
std::vector<StringRef> buffer_list;
Defer defer {[&]() {
Expand All @@ -353,13 +360,13 @@ Status VOrcTransformer::write(const Block& block) {
RETURN_IF_ERROR(_serdes[i]->write_column_to_orc(
_state->timezone(), *raw_column, nullptr, root->fields[i], 0, sz, buffer_list));
}
root->numElements = sz;
_writer->add(*row_batch);
_cur_written_rows += sz;
} catch (const std::exception& e) {
LOG(WARNING) << "Orc write error: " << e.what();
return Status::InternalError(e.what());
}
root->numElements = sz;
_writer->add(*row_batch);
_cur_written_rows += sz;

return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/runtime/vorc_transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ class VOrcTransformer final : public VFileFormatTransformer {
std::vector<std::string> _column_names;
std::unique_ptr<orc::OutputStream> _output_stream;
std::unique_ptr<orc::WriterOptions> _write_options;
std::unique_ptr<orc::MemoryPool> _pool;
std::string _schema_str;
std::unique_ptr<orc::Type> _schema;
std::unique_ptr<orc::Writer> _writer;
Expand Down

0 comments on commit 61ac8ab

Please sign in to comment.