Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove 256K page size limit for ftable #3266

Merged
merged 1 commit into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct StrKeyColInfo {
};

class MergedKeyBlocks {
private:
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

public:
MergedKeyBlocks(uint32_t numBytesPerTuple, uint64_t numTuples,
storage::MemoryManager* memoryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ namespace processor {
using encode_function_t = std::function<void(const uint8_t*, uint8_t*, bool)>;

class OrderByKeyEncoder {
private:
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

public:
OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager,
Expand All @@ -50,8 +52,6 @@ class OrderByKeyEncoder {

inline uint32_t getNumBytesPerTuple() const { return numBytesPerTuple; }

inline uint32_t getMaxNumTuplesPerBlock() const { return maxNumTuplesPerBlock; }

inline uint32_t getNumTuplesInCurBlock() const { return keyBlocks.back()->numTuples; }

static uint32_t getNumBytesPerTuple(const std::vector<common::ValueVector*>& keyVectors);
Expand All @@ -78,9 +78,6 @@ class OrderByKeyEncoder {
return *(strBuffer + 13) == (isAsc ? UINT8_MAX : 0);
}

static uint32_t getNumBytesPerTuple(
const std::vector<std::shared_ptr<common::ValueVector>>& keyVectors);

static uint32_t getEncodingSize(const common::LogicalType& dataType);

void encodeKeys(const std::vector<common::ValueVector*>& orderByKeys);
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/order_by/radix_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ struct TieRange {
class RadixSort {
private:
static constexpr uint16_t COUNTING_ARRAY_SIZE = 256;
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved

public:
RadixSort(storage::MemoryManager* memoryManager, FactorizedTable& factorizedTable,
OrderByKeyEncoder& orderByKeyEncoder, std::vector<StrKeyColInfo> strKeyColsInfo)
: tmpSortingResultBlock{std::make_unique<DataBlock>(memoryManager)},
tmpTuplePtrSortingBlock{std::make_unique<DataBlock>(memoryManager)},
: tmpSortingResultBlock{std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE)},
tmpTuplePtrSortingBlock{std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE)},
factorizedTable{factorizedTable}, strKeyColsInfo{std::move(strKeyColsInfo)},
numBytesPerTuple{orderByKeyEncoder.getNumBytesPerTuple()},
numBytesToRadixSort{numBytesPerTuple - 8} {}
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/result/base_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class BaseHashTable {
virtual ~BaseHashTable() = default;

protected:
static constexpr uint64_t HASH_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

uint64_t getSlotIdxForHash(common::hash_t hash) const { return hash & bitmask; }
void setMaxNumHashSlots(uint64_t newSize);
void computeAndCombineVecHash(const std::vector<common::ValueVector*>& unFlatKeyVectors,
Expand Down
62 changes: 32 additions & 30 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,56 +32,51 @@ struct BlockAppendingInfo {
// released when this struct goes out of scope.
class DataBlock {
public:
explicit DataBlock(storage::MemoryManager* memoryManager)
: numTuples{0}, memoryManager{memoryManager} {
block = memoryManager->allocateBuffer(true /* initializeToZero */);
freeSize = block->allocator->getPageSize();
DataBlock(storage::MemoryManager* mm, uint64_t size)
: numTuples{0}, totalSize{size}, freeSize{size} {
block = mm->allocateBuffer(true /* initializeToZero */, size);
}

DataBlock(DataBlock&& other) = default;

inline uint8_t* getData() const { return block->buffer; }
inline void resetNumTuplesAndFreeSize() {
freeSize = common::BufferPoolConstants::PAGE_256KB_SIZE;
uint8_t* getData() const { return block->buffer; }
uint8_t* getWritableData() const { return block->buffer + totalSize - freeSize; }
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
void resetNumTuplesAndFreeSize() {
freeSize = totalSize;
numTuples = 0;
}
inline void resetToZero() {
memset(block->buffer, 0, common::BufferPoolConstants::PAGE_256KB_SIZE);
}
void resetToZero() { memset(block->buffer, 0, totalSize); }

static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
uint32_t numBytesPerTuple);

public:
uint64_t freeSize;
uint32_t numTuples;
storage::MemoryManager* memoryManager;
uint64_t totalSize;
uint64_t freeSize;

private:
std::unique_ptr<storage::MemoryBuffer> block;
};

class DataBlockCollection {
public:
// This interface is used for unflat tuple blocks, for which numBytesPerTuple and
// This interface is used for unFlat tuple blocks, for which numBytesPerTuple and
// numTuplesPerBlock are useless.
DataBlockCollection() : numBytesPerTuple{UINT32_MAX}, numTuplesPerBlock{UINT32_MAX} {}
DataBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock)
: numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {}

inline void append(std::unique_ptr<DataBlock> otherBlock) {
blocks.push_back(std::move(otherBlock));
}
inline void append(std::vector<std::unique_ptr<DataBlock>> otherBlocks) {
void append(std::unique_ptr<DataBlock> otherBlock) { blocks.push_back(std::move(otherBlock)); }
void append(std::vector<std::unique_ptr<DataBlock>> otherBlocks) {
std::move(begin(otherBlocks), end(otherBlocks), back_inserter(blocks));
}
inline void append(std::unique_ptr<DataBlockCollection> other) {
append(std::move(other->blocks));
}
inline bool isEmpty() { return blocks.empty(); }
inline std::vector<std::unique_ptr<DataBlock>>& getBlocks() { return blocks; }
inline DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
void append(std::unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
bool needAllocation(uint64_t size) const { return isEmpty() || blocks.back()->freeSize < size; }

bool isEmpty() const { return blocks.empty(); }
const std::vector<std::unique_ptr<DataBlock>>& getBlocks() const { return blocks; }
DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
DataBlock* getLastBlock() { return blocks.back().get(); }

void merge(DataBlockCollection& other);

Expand Down Expand Up @@ -226,7 +221,7 @@ class FactorizedTable {
uint64_t getTotalNumFlatTuples() const;
uint64_t getNumFlatTuples(ft_tuple_idx_t tupleIdx) const;

inline std::vector<std::unique_ptr<DataBlock>>& getTupleDataBlocks() {
inline const std::vector<std::unique_ptr<DataBlock>>& getTupleDataBlocks() {
return flatTupleBlockCollection->getBlocks();
}
inline const FactorizedTableSchema* getTableSchema() const { return tableSchema.get(); }
Expand All @@ -246,7 +241,7 @@ class FactorizedTable {
tableSchema->getColumn(colIdx)->getNumBytes());
}

inline uint64_t getNumTuplesPerBlock() const { return numTuplesPerBlock; }
inline uint64_t getNumTuplesPerBlock() const { return numFlatTuplesPerBlock; }

inline bool hasNoNullGuarantee(ft_col_idx_t colIdx) const {
return tableSchema->getColumn(colIdx)->hasNoNullGuarantee();
Expand All @@ -271,7 +266,7 @@ class FactorizedTable {
}
inline std::pair<ft_block_idx_t, ft_block_offset_t> getBlockIdxAndTupleIdxInBlock(
uint64_t tupleIdx) const {
return std::make_pair(tupleIdx / numTuplesPerBlock, tupleIdx % numTuplesPerBlock);
return std::make_pair(tupleIdx / numFlatTuplesPerBlock, tupleIdx % numFlatTuplesPerBlock);
}

std::vector<BlockAppendingInfo> allocateFlatTupleBlocks(uint64_t numTuplesToAppend);
Expand Down Expand Up @@ -308,11 +303,18 @@ class FactorizedTable {

private:
storage::MemoryManager* memoryManager;
// Table Schema. Keeping track of factorization structure.
std::unique_ptr<FactorizedTableSchema> tableSchema;
// Number of rows in table.
uint64_t numTuples;
uint32_t numTuplesPerBlock;
// Radix sort requires there is a fixed number of tuple in a block.
uint64_t flatTupleBlockSize;
uint32_t numFlatTuplesPerBlock;
// Data blocks for flat tuples.
std::unique_ptr<DataBlockCollection> flatTupleBlockCollection;
std::unique_ptr<DataBlockCollection> unflatTupleBlockCollection;
// Data blocks for unFlat tuples.
std::unique_ptr<DataBlockCollection> unFlatTupleBlockCollection;
// Overflow buffer storing variable size part of an entry.
std::unique_ptr<common::InMemOverflowBuffer> inMemOverflowBuffer;
};

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ class MemoryManager {
allocator = std::make_unique<MemoryAllocator>(bm, vfs);
}

inline std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
inline BufferManager* getBufferManager() const { return bm; }
BufferManager* getBufferManager() const { return bm; }

private:
BufferManager* bm;
Expand Down
9 changes: 4 additions & 5 deletions src/processor/operator/aggregate/aggregate_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,13 @@ void AggregateHashTable::initializeFT(
}

void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) {
setMaxNumHashSlots(nextPowerOfTwo(
std::max(BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot), numEntriesToAllocate)));
auto numHashSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot);
auto numHashSlotsPerBlock = HASH_BLOCK_SIZE / sizeof(HashSlot);
setMaxNumHashSlots(nextPowerOfTwo(std::max(numHashSlotsPerBlock, numEntriesToAllocate)));
initSlotConstant(numHashSlotsPerBlock);
auto numDataBlocks =
maxNumHashSlots / numHashSlotsPerBlock + (maxNumHashSlots % numHashSlotsPerBlock != 0);
for (auto i = 0u; i < numDataBlocks; i++) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down Expand Up @@ -577,7 +576,7 @@ void AggregateHashTable::addDataBlocksIfNecessary(uint64_t maxNumHashSlots) {
auto numHashSlotsBlocksNeeded =
(maxNumHashSlots + numHashSlotsPerBlock - 1) / numHashSlotsPerBlock;
while (hashSlotsBlocks.size() < numHashSlotsBlocksNeeded) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/hash_join/join_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace processor {
JoinHashTable::JoinHashTable(MemoryManager& memoryManager, logical_type_vec_t keyTypes,
std::unique_ptr<FactorizedTableSchema> tableSchema)
: BaseHashTable{memoryManager, std::move(keyTypes)} {
auto numSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(uint8_t*);
auto numSlotsPerBlock = HASH_BLOCK_SIZE / sizeof(uint8_t*);
initSlotConstant(numSlotsPerBlock);
// Prev pointer is always the last column in the table.
prevPtrColOffset = tableSchema->getColOffset(tableSchema->getNumColumns() - PREV_PTR_COL_IDX);
Expand Down Expand Up @@ -107,7 +107,7 @@ void JoinHashTable::allocateHashSlots(uint64_t numTuples) {
auto numSlotsPerBlock = (uint64_t)1 << numSlotsPerBlockLog2;
auto numBlocksNeeded = (maxNumHashSlots + numSlotsPerBlock - 1) / numSlotsPerBlock;
while (hashSlotsBlocks.size() < numBlocksNeeded) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/order_by/key_block_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ namespace processor {
MergedKeyBlocks::MergedKeyBlocks(uint32_t numBytesPerTuple, uint64_t numTuples,
MemoryManager* memoryManager)
: numBytesPerTuple{numBytesPerTuple},
numTuplesPerBlock{(uint32_t)(BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple)},
numTuples{numTuples}, endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
numTuplesPerBlock{(uint32_t)(DATA_BLOCK_SIZE / numBytesPerTuple)}, numTuples{numTuples},
endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
auto numKeyBlocks = numTuples / numTuplesPerBlock + (numTuples % numTuplesPerBlock ? 1 : 0);
for (auto i = 0u; i < numKeyBlocks; i++) {
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
}
}

// This constructor is used to convert a keyBlock to a MergedKeyBlocks.
MergedKeyBlocks::MergedKeyBlocks(uint32_t numBytesPerTuple, std::shared_ptr<DataBlock> keyBlock)
: numBytesPerTuple{numBytesPerTuple},
numTuplesPerBlock{(uint32_t)(BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple)},
numTuplesPerBlock{(uint32_t)(DATA_BLOCK_SIZE / numBytesPerTuple)},
numTuples{keyBlock->numTuples}, endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
keyBlocks.emplace_back(std::move(keyBlock));
}
Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/order_by/order_by_key_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ OrderByKeyEncoder::OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo,
throw RuntimeException(
"The number of tuples per block of factorizedTable exceeds the maximum blockOffset!");
}
keyBlocks.emplace_back(std::make_unique<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
KU_ASSERT(this->numBytesPerTuple == getNumBytesPerTuple());
maxNumTuplesPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple;
maxNumTuplesPerBlock = DATA_BLOCK_SIZE / numBytesPerTuple;
if (maxNumTuplesPerBlock <= 0) {
throw RuntimeException(
stringFormat("TupleSize({} bytes) is larger than the LARGE_PAGE_SIZE({} bytes)",
numBytesPerTuple, BufferPoolConstants::PAGE_256KB_SIZE));
numBytesPerTuple, DATA_BLOCK_SIZE));
}
encodeFunctions.reserve(orderByDataInfo.keysPos.size());
for (auto& type : orderByDataInfo.keyTypes) {
Expand Down Expand Up @@ -196,7 +196,7 @@ void OrderByKeyEncoder::encodeFTIdx(uint32_t numEntriesToEncode, uint8_t* tupleI

void OrderByKeyEncoder::allocateMemoryIfFull() {
if (getNumTuplesInCurBlock() == maxNumTuplesPerBlock) {
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
}
}

Expand Down
Loading
Loading