Skip to content

Commit

Permalink
[BUG] fix compression bug while compaction (apache#5893)
Browse files Browse the repository at this point in the history
Because the maximum length of LZ4 compression is 2^32, it can cause some memory problems
  • Loading branch information
stdpain authored and morningman-cmy committed May 26, 2021
1 parent b8d3d29 commit 8ad12c9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
2 changes: 1 addition & 1 deletion be/src/olap/fs/file_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ Status FileWritableBlock::appendv(const Slice* data, size_t data_cnt) {

// Calculate the amount of data written
size_t bytes_written = accumulate(data, data + data_cnt, static_cast<size_t>(0),
[&](int sum, const Slice& curr) { return sum + curr.size; });
[](size_t sum, const Slice& curr) { return sum + curr.size; });
_bytes_appended += bytes_written;
return Status::OK();
}
Expand Down
30 changes: 16 additions & 14 deletions be/src/olap/rowset/segment_v2/page_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,20 +41,22 @@ Status PageIO::compress_page_body(const BlockCompressionCodec* codec, double min
size_t uncompressed_size = Slice::compute_total_size(body);
if (codec != nullptr && uncompressed_size > 0) {
size_t max_compressed_size = codec->max_compressed_len(uncompressed_size);
faststring buf;
buf.resize(max_compressed_size);
Slice compressed_slice(buf);
RETURN_IF_ERROR(codec->compress(body, &compressed_slice));
buf.resize(compressed_slice.get_size());

double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size;
// return compressed body only when it saves more than min_space_saving
if (space_saving > 0 && space_saving >= min_space_saving) {
// shrink the buf to fit the len size to avoid taking
// up the memory of the size MAX_COMPRESSED_SIZE
buf.shrink_to_fit();
*compressed_body = buf.build();
return Status::OK();
if (max_compressed_size) {
faststring buf;
buf.resize(max_compressed_size);
Slice compressed_slice(buf);
RETURN_IF_ERROR(codec->compress(body, &compressed_slice));
buf.resize(compressed_slice.get_size());

double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size;
// return compressed body only when it saves more than min_space_saving
if (space_saving > 0 && space_saving >= min_space_saving) {
// shrink the buf to fit the len size to avoid taking
// up the memory of the size MAX_COMPRESSED_SIZE
buf.shrink_to_fit();
*compressed_body = buf.build();
return Status::OK();
}
}
}
// otherwise, do not compress
Expand Down
12 changes: 11 additions & 1 deletion be/src/util/block_compression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
#include "gutil/strings/substitute.h"
#include "util/faststring.h"

#include <limits>

namespace doris {

using strings::Substitute;
Expand Down Expand Up @@ -71,7 +73,12 @@ class Lz4BlockCompression : public BlockCompressionCodec {
return Status::OK();
}

size_t max_compressed_len(size_t len) const override { return LZ4_compressBound(len); }
size_t max_compressed_len(size_t len) const override {
if (len > std::numeric_limits<int32_t>::max()) {
return 0;
}
return LZ4_compressBound(len);
}
};

// Used for LZ4 frame format, decompress speed is two times faster than LZ4.
Expand Down Expand Up @@ -120,6 +127,9 @@ class Lz4fBlockCompression : public BlockCompressionCodec {
}

size_t max_compressed_len(size_t len) const override {
if (len > std::numeric_limits<int32_t>::max()) {
return 0;
}
return std::max(LZ4F_compressBound(len, &_s_preferences),
LZ4F_compressFrameBound(len, &_s_preferences));
}
Expand Down

0 comments on commit 8ad12c9

Please sign in to comment.