Skip to content

Commit

Permalink
Remove 256K page size limit for ftable (#3266)
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed Apr 15, 2024
1 parent 919e109 commit 3600a93
Show file tree
Hide file tree
Showing 13 changed files with 2,182 additions and 89 deletions.
3 changes: 3 additions & 0 deletions src/include/processor/operator/order_by/key_block_merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct StrKeyColInfo {
};

class MergedKeyBlocks {
private:
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

public:
MergedKeyBlocks(uint32_t numBytesPerTuple, uint64_t numTuples,
storage::MemoryManager* memoryManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ namespace processor {
using encode_function_t = std::function<void(const uint8_t*, uint8_t*, bool)>;

class OrderByKeyEncoder {
private:
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

public:
OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo, storage::MemoryManager* memoryManager,
Expand All @@ -50,8 +52,6 @@ class OrderByKeyEncoder {

inline uint32_t getNumBytesPerTuple() const { return numBytesPerTuple; }

inline uint32_t getMaxNumTuplesPerBlock() const { return maxNumTuplesPerBlock; }

inline uint32_t getNumTuplesInCurBlock() const { return keyBlocks.back()->numTuples; }

static uint32_t getNumBytesPerTuple(const std::vector<common::ValueVector*>& keyVectors);
Expand All @@ -78,9 +78,6 @@ class OrderByKeyEncoder {
return *(strBuffer + 13) == (isAsc ? UINT8_MAX : 0);
}

static uint32_t getNumBytesPerTuple(
const std::vector<std::shared_ptr<common::ValueVector>>& keyVectors);

static uint32_t getEncodingSize(const common::LogicalType& dataType);

void encodeKeys(const std::vector<common::ValueVector*>& orderByKeys);
Expand Down
5 changes: 3 additions & 2 deletions src/include/processor/operator/order_by/radix_sort.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ struct TieRange {
class RadixSort {
private:
static constexpr uint16_t COUNTING_ARRAY_SIZE = 256;
static constexpr uint64_t DATA_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

public:
RadixSort(storage::MemoryManager* memoryManager, FactorizedTable& factorizedTable,
OrderByKeyEncoder& orderByKeyEncoder, std::vector<StrKeyColInfo> strKeyColsInfo)
: tmpSortingResultBlock{std::make_unique<DataBlock>(memoryManager)},
tmpTuplePtrSortingBlock{std::make_unique<DataBlock>(memoryManager)},
: tmpSortingResultBlock{std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE)},
tmpTuplePtrSortingBlock{std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE)},
factorizedTable{factorizedTable}, strKeyColsInfo{std::move(strKeyColsInfo)},
numBytesPerTuple{orderByKeyEncoder.getNumBytesPerTuple()},
numBytesToRadixSort{numBytesPerTuple - 8} {}
Expand Down
2 changes: 2 additions & 0 deletions src/include/processor/result/base_hash_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class BaseHashTable {
virtual ~BaseHashTable() = default;

protected:
static constexpr uint64_t HASH_BLOCK_SIZE = common::BufferPoolConstants::PAGE_256KB_SIZE;

uint64_t getSlotIdxForHash(common::hash_t hash) const { return hash & bitmask; }
void setMaxNumHashSlots(uint64_t newSize);
void computeAndCombineVecHash(const std::vector<common::ValueVector*>& unFlatKeyVectors,
Expand Down
62 changes: 32 additions & 30 deletions src/include/processor/result/factorized_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,56 +32,51 @@ struct BlockAppendingInfo {
// released when this struct goes out of scope.
class DataBlock {
public:
explicit DataBlock(storage::MemoryManager* memoryManager)
: numTuples{0}, memoryManager{memoryManager} {
block = memoryManager->allocateBuffer(true /* initializeToZero */);
freeSize = block->allocator->getPageSize();
DataBlock(storage::MemoryManager* mm, uint64_t size)
: numTuples{0}, totalSize{size}, freeSize{size} {
block = mm->allocateBuffer(true /* initializeToZero */, size);
}

DataBlock(DataBlock&& other) = default;

inline uint8_t* getData() const { return block->buffer; }
inline void resetNumTuplesAndFreeSize() {
freeSize = common::BufferPoolConstants::PAGE_256KB_SIZE;
uint8_t* getData() const { return block->buffer; }
uint8_t* getWritableData() const { return block->buffer + totalSize - freeSize; }
void resetNumTuplesAndFreeSize() {
freeSize = totalSize;
numTuples = 0;
}
inline void resetToZero() {
memset(block->buffer, 0, common::BufferPoolConstants::PAGE_256KB_SIZE);
}
void resetToZero() { memset(block->buffer, 0, totalSize); }

static void copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom,
DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy,
uint32_t numBytesPerTuple);

public:
uint64_t freeSize;
uint32_t numTuples;
storage::MemoryManager* memoryManager;
uint64_t totalSize;
uint64_t freeSize;

private:
std::unique_ptr<storage::MemoryBuffer> block;
};

class DataBlockCollection {
public:
// This interface is used for unflat tuple blocks, for which numBytesPerTuple and
// This interface is used for unFlat tuple blocks, for which numBytesPerTuple and
// numTuplesPerBlock are useless.
DataBlockCollection() : numBytesPerTuple{UINT32_MAX}, numTuplesPerBlock{UINT32_MAX} {}
DataBlockCollection(uint32_t numBytesPerTuple, uint32_t numTuplesPerBlock)
: numBytesPerTuple{numBytesPerTuple}, numTuplesPerBlock{numTuplesPerBlock} {}

inline void append(std::unique_ptr<DataBlock> otherBlock) {
blocks.push_back(std::move(otherBlock));
}
inline void append(std::vector<std::unique_ptr<DataBlock>> otherBlocks) {
void append(std::unique_ptr<DataBlock> otherBlock) { blocks.push_back(std::move(otherBlock)); }
void append(std::vector<std::unique_ptr<DataBlock>> otherBlocks) {
std::move(begin(otherBlocks), end(otherBlocks), back_inserter(blocks));
}
inline void append(std::unique_ptr<DataBlockCollection> other) {
append(std::move(other->blocks));
}
inline bool isEmpty() { return blocks.empty(); }
inline std::vector<std::unique_ptr<DataBlock>>& getBlocks() { return blocks; }
inline DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
void append(std::unique_ptr<DataBlockCollection> other) { append(std::move(other->blocks)); }
bool needAllocation(uint64_t size) const { return isEmpty() || blocks.back()->freeSize < size; }

bool isEmpty() const { return blocks.empty(); }
const std::vector<std::unique_ptr<DataBlock>>& getBlocks() const { return blocks; }
DataBlock* getBlock(ft_block_idx_t blockIdx) { return blocks[blockIdx].get(); }
DataBlock* getLastBlock() { return blocks.back().get(); }

void merge(DataBlockCollection& other);

Expand Down Expand Up @@ -226,7 +221,7 @@ class FactorizedTable {
uint64_t getTotalNumFlatTuples() const;
uint64_t getNumFlatTuples(ft_tuple_idx_t tupleIdx) const;

inline std::vector<std::unique_ptr<DataBlock>>& getTupleDataBlocks() {
inline const std::vector<std::unique_ptr<DataBlock>>& getTupleDataBlocks() {
return flatTupleBlockCollection->getBlocks();
}
inline const FactorizedTableSchema* getTableSchema() const { return tableSchema.get(); }
Expand All @@ -246,7 +241,7 @@ class FactorizedTable {
tableSchema->getColumn(colIdx)->getNumBytes());
}

inline uint64_t getNumTuplesPerBlock() const { return numTuplesPerBlock; }
inline uint64_t getNumTuplesPerBlock() const { return numFlatTuplesPerBlock; }

inline bool hasNoNullGuarantee(ft_col_idx_t colIdx) const {
return tableSchema->getColumn(colIdx)->hasNoNullGuarantee();
Expand All @@ -271,7 +266,7 @@ class FactorizedTable {
}
inline std::pair<ft_block_idx_t, ft_block_offset_t> getBlockIdxAndTupleIdxInBlock(
uint64_t tupleIdx) const {
return std::make_pair(tupleIdx / numTuplesPerBlock, tupleIdx % numTuplesPerBlock);
return std::make_pair(tupleIdx / numFlatTuplesPerBlock, tupleIdx % numFlatTuplesPerBlock);
}

std::vector<BlockAppendingInfo> allocateFlatTupleBlocks(uint64_t numTuplesToAppend);
Expand Down Expand Up @@ -308,11 +303,18 @@ class FactorizedTable {

private:
storage::MemoryManager* memoryManager;
// Table Schema. Keeping track of factorization structure.
std::unique_ptr<FactorizedTableSchema> tableSchema;
// Number of rows in table.
uint64_t numTuples;
uint32_t numTuplesPerBlock;
// Radix sort requires there is a fixed number of tuple in a block.
uint64_t flatTupleBlockSize;
uint32_t numFlatTuplesPerBlock;
// Data blocks for flat tuples.
std::unique_ptr<DataBlockCollection> flatTupleBlockCollection;
std::unique_ptr<DataBlockCollection> unflatTupleBlockCollection;
// Data blocks for unFlat tuples.
std::unique_ptr<DataBlockCollection> unFlatTupleBlockCollection;
// Overflow buffer storing variable size part of an entry.
std::unique_ptr<common::InMemOverflowBuffer> inMemOverflowBuffer;
};

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/buffer_manager/memory_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ class MemoryManager {
allocator = std::make_unique<MemoryAllocator>(bm, vfs);
}

inline std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
std::unique_ptr<MemoryBuffer> allocateBuffer(bool initializeToZero = false,
uint64_t size = common::BufferPoolConstants::PAGE_256KB_SIZE) {
return allocator->allocateBuffer(initializeToZero, size);
}
inline BufferManager* getBufferManager() const { return bm; }
BufferManager* getBufferManager() const { return bm; }

private:
BufferManager* bm;
Expand Down
9 changes: 4 additions & 5 deletions src/processor/operator/aggregate/aggregate_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,13 @@ void AggregateHashTable::initializeFT(
}

void AggregateHashTable::initializeHashTable(uint64_t numEntriesToAllocate) {
setMaxNumHashSlots(nextPowerOfTwo(
std::max(BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot), numEntriesToAllocate)));
auto numHashSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(HashSlot);
auto numHashSlotsPerBlock = HASH_BLOCK_SIZE / sizeof(HashSlot);
setMaxNumHashSlots(nextPowerOfTwo(std::max(numHashSlotsPerBlock, numEntriesToAllocate)));
initSlotConstant(numHashSlotsPerBlock);
auto numDataBlocks =
maxNumHashSlots / numHashSlotsPerBlock + (maxNumHashSlots % numHashSlotsPerBlock != 0);
for (auto i = 0u; i < numDataBlocks; i++) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down Expand Up @@ -577,7 +576,7 @@ void AggregateHashTable::addDataBlocksIfNecessary(uint64_t maxNumHashSlots) {
auto numHashSlotsBlocksNeeded =
(maxNumHashSlots + numHashSlotsPerBlock - 1) / numHashSlotsPerBlock;
while (hashSlotsBlocks.size() < numHashSlotsBlocksNeeded) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/processor/operator/hash_join/join_hash_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace processor {
JoinHashTable::JoinHashTable(MemoryManager& memoryManager, logical_type_vec_t keyTypes,
std::unique_ptr<FactorizedTableSchema> tableSchema)
: BaseHashTable{memoryManager, std::move(keyTypes)} {
auto numSlotsPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / sizeof(uint8_t*);
auto numSlotsPerBlock = HASH_BLOCK_SIZE / sizeof(uint8_t*);
initSlotConstant(numSlotsPerBlock);
// Prev pointer is always the last column in the table.
prevPtrColOffset = tableSchema->getColOffset(tableSchema->getNumColumns() - PREV_PTR_COL_IDX);
Expand Down Expand Up @@ -107,7 +107,7 @@ void JoinHashTable::allocateHashSlots(uint64_t numTuples) {
auto numSlotsPerBlock = (uint64_t)1 << numSlotsPerBlockLog2;
auto numBlocksNeeded = (maxNumHashSlots + numSlotsPerBlock - 1) / numSlotsPerBlock;
while (hashSlotsBlocks.size() < numBlocksNeeded) {
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager));
hashSlotsBlocks.emplace_back(std::make_unique<DataBlock>(&memoryManager, HASH_BLOCK_SIZE));
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/order_by/key_block_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ namespace processor {
MergedKeyBlocks::MergedKeyBlocks(uint32_t numBytesPerTuple, uint64_t numTuples,
MemoryManager* memoryManager)
: numBytesPerTuple{numBytesPerTuple},
numTuplesPerBlock{(uint32_t)(BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple)},
numTuples{numTuples}, endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
numTuplesPerBlock{(uint32_t)(DATA_BLOCK_SIZE / numBytesPerTuple)}, numTuples{numTuples},
endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
auto numKeyBlocks = numTuples / numTuplesPerBlock + (numTuples % numTuplesPerBlock ? 1 : 0);
for (auto i = 0u; i < numKeyBlocks; i++) {
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
}
}

// This constructor is used to convert a keyBlock to a MergedKeyBlocks.
MergedKeyBlocks::MergedKeyBlocks(uint32_t numBytesPerTuple, std::shared_ptr<DataBlock> keyBlock)
: numBytesPerTuple{numBytesPerTuple},
numTuplesPerBlock{(uint32_t)(BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple)},
numTuplesPerBlock{(uint32_t)(DATA_BLOCK_SIZE / numBytesPerTuple)},
numTuples{keyBlock->numTuples}, endTupleOffset{numTuplesPerBlock * numBytesPerTuple} {
keyBlocks.emplace_back(std::move(keyBlock));
}
Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/order_by/order_by_key_encoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ OrderByKeyEncoder::OrderByKeyEncoder(const OrderByDataInfo& orderByDataInfo,
throw RuntimeException(
"The number of tuples per block of factorizedTable exceeds the maximum blockOffset!");
}
keyBlocks.emplace_back(std::make_unique<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_unique<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
KU_ASSERT(this->numBytesPerTuple == getNumBytesPerTuple());
maxNumTuplesPerBlock = BufferPoolConstants::PAGE_256KB_SIZE / numBytesPerTuple;
maxNumTuplesPerBlock = DATA_BLOCK_SIZE / numBytesPerTuple;
if (maxNumTuplesPerBlock <= 0) {
throw RuntimeException(
stringFormat("TupleSize({} bytes) is larger than the LARGE_PAGE_SIZE({} bytes)",
numBytesPerTuple, BufferPoolConstants::PAGE_256KB_SIZE));
numBytesPerTuple, DATA_BLOCK_SIZE));
}
encodeFunctions.reserve(orderByDataInfo.keysPos.size());
for (auto& type : orderByDataInfo.keyTypes) {
Expand Down Expand Up @@ -196,7 +196,7 @@ void OrderByKeyEncoder::encodeFTIdx(uint32_t numEntriesToEncode, uint8_t* tupleI

void OrderByKeyEncoder::allocateMemoryIfFull() {
if (getNumTuplesInCurBlock() == maxNumTuplesPerBlock) {
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager));
keyBlocks.emplace_back(std::make_shared<DataBlock>(memoryManager, DATA_BLOCK_SIZE));
}
}

Expand Down
Loading

0 comments on commit 3600a93

Please sign in to comment.