Skip to content

Commit

Permalink
Support allocations of larger-than-256KB memory buffers (#3243)
Browse files: browse the repository at this point in the history
  • Loading branch information
ray6080 committed Apr 10, 2024
1 parent 21b3b6a commit d59070f
Show file tree
Hide file tree
Showing 50 changed files with 120 additions and 103 deletions.
1 change: 1 addition & 0 deletions extension/duckdb_scanner/src/duckdb_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "duckdb_scan.h"

#include "common/exception/runtime.h"
#include "common/types/types.h"
#include "function/table/bind_input.h"

Expand Down
1 change: 1 addition & 0 deletions extension/httpfs/src/s3fs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/cast.h"
#include "common/exception/io.h"
#include "common/exception/runtime.h"
#include "common/types/timestamp_t.h"
#include "crypto.h"
#include "main/client_context.h"
Expand Down
1 change: 1 addition & 0 deletions extension/postgres_scanner/src/postgres_catalog.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "postgres_catalog.h"

#include "common/exception/binder.h"
#include "common/exception/runtime.h"
#include "postgres_storage.h"

namespace kuzu {
Expand Down
2 changes: 1 addition & 1 deletion extension/postgres_scanner/src/postgres_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include <regex>

#include "catalog/catalog_entry/table_catalog_entry.h"
#include "duckdb_type_converter.h"
#include "common/exception/runtime.h"
#include "postgres_catalog.h"

namespace kuzu {
Expand Down
1 change: 1 addition & 0 deletions src/common/arrow/arrow_array_scan.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "common/arrow/arrow_converter.h"
#include "common/exception/runtime.h"
#include "common/types/interval_t.h"
#include "common/types/types.h"
#include "common/vector/value_vector.h"
Expand Down
7 changes: 3 additions & 4 deletions src/common/in_mem_overflow_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@ namespace common {

uint8_t* InMemOverflowBuffer::allocateSpace(uint64_t size) {
if (requireNewBlock(size)) {
allocateNewBlock();
allocateNewBlock(size);
}
KU_ASSERT(size <= BufferPoolConstants::PAGE_256KB_SIZE);
auto data = currentBlock->block->buffer + currentBlock->currentOffset;
currentBlock->currentOffset += size;
return data;
}

void InMemOverflowBuffer::allocateNewBlock() {
void InMemOverflowBuffer::allocateNewBlock(uint64_t size) {
auto newBlock = make_unique<BufferBlock>(
memoryManager->allocateBuffer(false /* do not initialize to zero */));
memoryManager->allocateBuffer(false /* do not initialize to zero */, size));
currentBlock = newBlock.get();
blocks.push_back(std::move(newBlock));
}
Expand Down
3 changes: 3 additions & 0 deletions src/common/md5.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
* needed on buffers full of bytes, and then call MD5Final, which
* will fill a supplied 16-byte array with the digest.
*/

#include "common/md5.h"

#include <cstring>

namespace kuzu {
namespace common {

Expand Down
30 changes: 1 addition & 29 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "common/vector/value_vector.h"

#include "common/exception/message.h"
#include "common/exception/runtime.h"
#include "common/null_buffer.h"
#include "common/types/blob.h"
#include "common/types/value/nested.h"
Expand Down Expand Up @@ -394,13 +394,6 @@ void StringVector::addString(ValueVector* vector, uint32_t vectorPos, ku_string_
if (ku_string_t::isShortString(srcStr.len)) {
dstStr.setShortString(srcStr);
} else {
if (srcStr.len > BufferPoolConstants::PAGE_256KB_SIZE) {
if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) {
srcStr.len = BufferPoolConstants::PAGE_256KB_SIZE;
} else {
throw RuntimeException(ExceptionMessage::overLargeStringValueException(srcStr.len));
}
}
dstStr.overflowPtr = reinterpret_cast<uint64_t>(stringBuffer->allocateOverflow(srcStr.len));
dstStr.setLongString(srcStr);
}
Expand All @@ -415,13 +408,6 @@ void StringVector::addString(ValueVector* vector, uint32_t vectorPos, const char
if (ku_string_t::isShortString(length)) {
dstStr.setShortString(srcStr, length);
} else {
if (length > BufferPoolConstants::PAGE_256KB_SIZE) {
if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) {
length = BufferPoolConstants::PAGE_256KB_SIZE;
} else {
throw RuntimeException(ExceptionMessage::overLargeStringValueException(length));
}
}
dstStr.overflowPtr = reinterpret_cast<uint64_t>(stringBuffer->allocateOverflow(length));
dstStr.setLongString(srcStr, length);
}
Expand Down Expand Up @@ -460,13 +446,6 @@ void StringVector::addString(ValueVector* vector, ku_string_t& dstStr, ku_string
if (ku_string_t::isShortString(srcStr.len)) {
dstStr.setShortString(srcStr);
} else {
if (srcStr.len > BufferPoolConstants::PAGE_256KB_SIZE) {
if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) {
srcStr.len = BufferPoolConstants::PAGE_256KB_SIZE;
} else {
throw RuntimeException(ExceptionMessage::overLargeStringValueException(srcStr.len));
}
}
dstStr.overflowPtr = reinterpret_cast<uint64_t>(stringBuffer->allocateOverflow(srcStr.len));
dstStr.setLongString(srcStr);
}
Expand All @@ -480,13 +459,6 @@ void StringVector::addString(ValueVector* vector, ku_string_t& dstStr, const cha
if (ku_string_t::isShortString(length)) {
dstStr.setShortString(srcStr, length);
} else {
if (length > BufferPoolConstants::PAGE_256KB_SIZE) {
if constexpr (StorageConstants::TRUNCATE_OVER_LARGE_STRINGS) {
length = BufferPoolConstants::PAGE_256KB_SIZE;
} else {
throw RuntimeException(ExceptionMessage::overLargeStringValueException(length));
}
}
dstStr.overflowPtr = reinterpret_cast<uint64_t>(stringBuffer->allocateOverflow(length));
dstStr.setLongString(srcStr, length);
}
Expand Down
1 change: 0 additions & 1 deletion src/expression_evaluator/node_rel_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace kuzu {
namespace evaluator {

void NodeRelExpressionEvaluator::evaluate(ClientContext* clientContext) {
resultVector->resetAuxiliaryBuffer();
for (auto& child : children) {
child->evaluate(clientContext);
}
Expand Down
1 change: 1 addition & 0 deletions src/function/cast/cast_rdf_variant.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "function/cast/functions/cast_rdf_variant.h"

#include "common/exception/runtime.h"
#include "common/types/blob.h"
#include "function/cast/functions/cast_functions.h"
#include "function/cast/functions/numeric_cast.h"
Expand Down
1 change: 1 addition & 0 deletions src/function/vector_boolean_functions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "function/boolean/vector_boolean_functions.h"

#include "common/exception/runtime.h"
#include "function/boolean/boolean_functions.h"

using namespace kuzu::common;
Expand Down
1 change: 1 addition & 0 deletions src/function/vector_null_functions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "function/null/vector_null_functions.h"

#include "common/exception/runtime.h"
#include "function/null/null_functions.h"

using namespace kuzu::common;
Expand Down
1 change: 1 addition & 0 deletions src/function/vector_struct_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ void StructPackFunctions::execFunc(const std::vector<std::shared_ptr<ValueVector
}
// If the parameter's state is inconsistent with the result's state, we need to copy the
// parameter's value to the corresponding child vector.
StructVector::getFieldVector(&result, i)->resetAuxiliaryBuffer();
copyParameterValueToStructFieldVector(parameter.get(),
StructVector::getFieldVector(&result, i).get(), result.state.get());
}
Expand Down
2 changes: 0 additions & 2 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ struct StorageConstants {
// The number of CSR lists in a segment.
static constexpr uint64_t CSR_SEGMENT_SIZE_LOG2 = 10;
static constexpr uint64_t CSR_SEGMENT_SIZE = (uint64_t)1 << CSR_SEGMENT_SIZE_LOG2;

static constexpr bool TRUNCATE_OVER_LARGE_STRINGS = true;
};

// Hash Index Configurations
Expand Down
11 changes: 2 additions & 9 deletions src/include/common/in_mem_overflow_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
#include <iterator>
#include <vector>

#include "common/constants.h"
#include "common/exception/runtime.h"
#include "storage/buffer_manager/memory_manager.h"

namespace kuzu {
Expand All @@ -13,7 +11,7 @@ namespace common {
struct BufferBlock {
public:
explicit BufferBlock(std::unique_ptr<storage::MemoryBuffer> block)
: size{block->allocator->getPageSize()}, currentOffset{0}, block{std::move(block)} {}
: size{block->size}, currentOffset{0}, block{std::move(block)} {}

public:
uint64_t size;
Expand Down Expand Up @@ -58,16 +56,11 @@ class InMemOverflowBuffer {

private:
inline bool requireNewBlock(uint64_t sizeToAllocate) {
if (sizeToAllocate > BufferPoolConstants::PAGE_256KB_SIZE) {
throw RuntimeException("Required size " + std::to_string(sizeToAllocate) +
" is greater than the single block size of " +
std::to_string(BufferPoolConstants::PAGE_256KB_SIZE) + ".");
}
return currentBlock == nullptr ||
(currentBlock->currentOffset + sizeToAllocate) > currentBlock->size;
}

void allocateNewBlock();
void allocateNewBlock(uint64_t size);

private:
std::vector<std::unique_ptr<BufferBlock>> blocks;
Expand Down
6 changes: 3 additions & 3 deletions src/include/common/md5.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

/*
** This code taken from the SQLite test library (can be found at
** https://www.sqlite.org/sqllogictest/doc/trunk/about.wiki).
Expand All @@ -23,7 +24,6 @@
*/

#include <cstdint>
#include <cstring>

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -71,12 +71,12 @@ class MD5 {
public:
// Add additional text to the current MD5 hash.
// note: original name changed from md5_add
void addToMD5(const char* z) {
void addToMD5(const char* z, uint32_t len) {
if (!isInit) {
MD5Init();
isInit = 1;
}
MD5Update((unsigned char*)z, (unsigned)std::strlen(z));
MD5Update((unsigned char*)z, len);
}

// Compute the final signature. Reset the hash generator in preparation
Expand Down
1 change: 1 addition & 0 deletions src/include/function/blob/functions/decode_function.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/types/blob.h"
#include "common/vector/value_vector.h"
#include "utf8proc_wrapper.h"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/types/int128_t.h"
#include "common/types/interval_t.h"
#include "comparison_functions.h"
Expand Down
2 changes: 1 addition & 1 deletion src/include/function/hash/functions/md5_function.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace function {
struct MD5Operator {
static void operation(ku_string_t& operand, ku_string_t& result, ValueVector& resultVector) {
MD5 hasher;
hasher.addToMD5(reinterpret_cast<const char*>(operand.getData()));
hasher.addToMD5(reinterpret_cast<const char*>(operand.getData()), operand.len);
StringVector::addString(&resultVector, result, std::string(hasher.finishMD5()));
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/string_utils.h"
#include "common/vector/value_vector.h"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/type_utils.h"
#include "common/types/ku_string.h"
#include "common/vector/value_vector.h"
Expand Down
1 change: 1 addition & 0 deletions src/include/function/list/functions/list_range_function.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/vector/value_vector.h"

namespace kuzu {
Expand Down
1 change: 1 addition & 0 deletions src/include/function/map/functions/map_creation_function.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/vector/value_vector.h"

namespace kuzu {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "base_regexp_function.h"
#include "common/exception/runtime.h"
#include "common/vector/value_vector.h"
#include "re2.h"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "common/exception/runtime.h"
#include "common/types/ku_string.h"
#include "common/vector/value_vector.h"
#include "function/string/functions/base_regexp_function.h"
Expand Down
14 changes: 9 additions & 5 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <mutex>
#include <stack>

#include "common/constants.h"
#include "common/types/types.h"

namespace kuzu {
Expand All @@ -20,13 +21,15 @@ class BufferManager;

class MemoryBuffer {
public:
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer);
MemoryBuffer(MemoryAllocator* allocator, common::page_idx_t blockIdx, uint8_t* buffer,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE);
~MemoryBuffer();

public:
uint8_t* buffer;
common::page_idx_t pageIdx;
MemoryAllocator* allocator;
uint64_t size;
};

class MemoryAllocator {
Expand All @@ -36,11 +39,11 @@ class MemoryAllocator {
explicit MemoryAllocator(BufferManager* bm, common::VirtualFileSystem* vfs);
~MemoryAllocator();

std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false);
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero, uint64_t size);
inline common::page_offset_t getPageSize() const { return pageSize; }

private:
void freeBlock(common::page_idx_t pageIdx);
void freeBlock(common::page_idx_t pageIdx, uint8_t* buffer);

private:
std::unique_ptr<BMFileHandle> fh;
Expand Down Expand Up @@ -71,8 +74,9 @@ class MemoryManager {
allocator = std::make_unique<MemoryAllocator>(bm, vfs);
}

inline std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false) {
return allocator->allocateBuffer(initializeToZero);
inline std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
inline BufferManager* getBufferManager() const { return bm; }

Expand Down
1 change: 1 addition & 0 deletions src/main/query_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "binder/expression/expression.h"
#include "common/arrow/arrow_converter.h"
#include "common/exception/runtime.h"
#include "common/types/value/node.h"
#include "common/types/value/rel.h"
#include "processor/result/factorized_table.h"
Expand Down
1 change: 1 addition & 0 deletions src/processor/map/map_port_db.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "common/exception/runtime.h"
#include "common/file_system/virtual_file_system.h"
#include "planner/operator/persistent/logical_export_db.h"
#include "planner/operator/persistent/logical_import_db.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <sstream>

#include "common/exception/not_implemented.h"
#include "common/exception/runtime.h"
#include "common/types/date_t.h"
#include "miniz_wrapper.hpp"
#include "processor/operator/persistent/reader/parquet/boolean_column_reader.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <cstdio>

#include "common/constants.h"
#include "common/exception/runtime.h"
#include "common/vector/value_vector.h"
#include "processor/operator/persistent/reader/rdf/rdf_utils.h"
#include "serd.h"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "processor/operator/persistent/writer/parquet/basic_column_writer.h"

#include "common/exception/runtime.h"
#include "function/cast/functions/numeric_limits.h"
#include "processor/operator/persistent/reader/parquet/parquet_rle_bp_decoder.h"
#include "processor/operator/persistent/writer//parquet/parquet_rle_bp_encoder.h"
Expand Down
Loading

0 comments on commit d59070f

Please sign in to comment.