Skip to content

Commit

Permalink
Optimize hash index slot splitting (#3325)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminwinger committed Apr 19, 2024
1 parent 48188ce commit 4917262
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 49 deletions.
3 changes: 1 addition & 2 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ class HashIndex final : public OnDiskHashIndex {

inline uint64_t appendOverflowSlot(Slot<T>&& newSlot) { return oSlots->pushBack(newSlot); }

inline void splitSlot(HashIndexHeader& header);
void rehashSlots(HashIndexHeader& header);
void splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit);

// Resizes the local storage to support the given number of new entries
inline void bulkReserve(uint64_t /*newEntries*/) override {
Expand Down
26 changes: 21 additions & 5 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class BaseDiskArrayInternal {
// The pages are cached while the elements are stored on the same page
// Designed for sequential writes, but supports random writes too (at the cost that the page
// caching is only beneficial when seeking from one element to another on the same page)
//
// The iterator is not locked, allowing multiple to be used at the same time, but access to
// individual pages is locked through the BMFileHandle. It will hang if you seek/pushback on the
// same page as another iterator in an overlapping scope.
struct WriteIterator {
BaseDiskArrayInternal& diskArray;
PageCursor apCursor;
Expand All @@ -153,15 +157,13 @@ class BaseDiskArrayInternal {
WALPageIdxAndFrame walPageIdxAndFrame;
static const transaction::TransactionType TRX_TYPE = transaction::TransactionType::WRITE;
uint64_t idx;
std::unique_lock<std::shared_mutex> lock;
DEFAULT_MOVE_CONSTRUCT(WriteIterator);

// Constructs WriteIterator in an invalid state. Seek must be called before accessing data
WriteIterator(uint32_t valueSize, BaseDiskArrayInternal& diskArray,
std::unique_lock<std::shared_mutex>&& lock)
WriteIterator(uint32_t valueSize, BaseDiskArrayInternal& diskArray)
: diskArray(diskArray), apCursor(), valueSize(valueSize),
walPageIdxAndFrame{common::INVALID_PAGE_IDX, common::INVALID_PAGE_IDX, nullptr},
idx(0), lock(std::move(lock)) {
idx(0) {
diskArray.hasTransactionalUpdates = true;
}

Expand All @@ -188,6 +190,10 @@ class BaseDiskArrayInternal {

WriteIterator iter_mut(uint64_t valueSize);

inline common::page_idx_t getAPIdx(uint64_t idx) const {
return getAPIdxAndOffsetInAP(idx).pageIdx;
}

protected:
// Updates to new pages (new to this transaction) bypass the wal file.
void updatePage(uint64_t pageIdx, bool isNewPage, std::function<void(uint8_t*)> updateOp);
Expand Down Expand Up @@ -272,6 +278,8 @@ inline std::span<uint8_t> getSpan(U& val) {

template<typename U>
class BaseDiskArray {
static_assert(sizeof(U) <= common::BufferPoolConstants::PAGE_4KB_SIZE);

public:
// Used by copiers.
BaseDiskArray(FileHandle& fileHandle, common::page_idx_t headerPageIdx, uint64_t elementSize)
Expand Down Expand Up @@ -332,8 +340,12 @@ class BaseDiskArray {
}

inline uint64_t idx() const { return iter.idx; }
inline uint64_t getAPIdx() const { return iter.apCursor.pageIdx; }

inline void pushBack(U val) { return iter.pushBack(getSpan(val)); }
inline WriteIterator& pushBack(U val) {
iter.pushBack(getSpan(val));
return *this;
}

inline uint64_t size() const { return iter.size(); }

Expand All @@ -342,6 +354,8 @@ class BaseDiskArray {
};

inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; }
inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); }
uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }

private:
BaseDiskArrayInternal diskArray;
Expand Down Expand Up @@ -437,6 +451,8 @@ class InMemDiskArrayBuilder {

inline void saveToDisk() { diskArray.saveToDisk(); }

uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }

private:
InMemDiskArrayBuilderInternal diskArray;
};
Expand Down
89 changes: 51 additions & 38 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <type_traits>

#include "common/assert.h"
#include "common/constants.h"
#include "common/string_utils.h"
#include "common/type_utils.h"
#include "common/types/int128_t.h"
Expand Down Expand Up @@ -224,11 +225,7 @@ bool HashIndex<T>::lookupInPersistentIndex(TransactionType trxType, Key key, off
template<typename T>
void HashIndex<T>::insertIntoPersistentIndex(Key key, offset_t value) {
auto& header = *this->indexHeaderForWriteTrx;
slot_id_t numRequiredEntries = HashIndexUtils::getNumRequiredEntries(header.numEntries + 1);
while (numRequiredEntries >
pSlots->getNumElements(TransactionType::WRITE) * getSlotCapacity<T>()) {
this->splitSlot(header);
}
reserve(1);
auto hashValue = HashIndexUtils::hash(key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hashValue);
auto iter = getSlotIterator(HashIndexUtils::getPrimarySlotIdForHash(header, hashValue),
Expand Down Expand Up @@ -363,30 +360,48 @@ inline void HashIndex<ku_string_t>::insert(std::string_view key, SlotEntry<ku_st
}

template<typename T>
void HashIndex<T>::splitSlot(HashIndexHeader& header) {
appendPSlot();
rehashSlots(header);
header.incrementNextSplitSlotId();
}

template<typename T>
void HashIndex<T>::rehashSlots(HashIndexHeader& header) {
auto slotsToSplit = getChainedSlots(header.nextSplitSlotId);
for (auto& [slotInfo, slot] : slotsToSplit) {
auto slotHeader = slot.header;
slot.header.reset();
updateSlot(slotInfo, slot);
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (!slotHeader.isEntryValid(entryPos)) {
continue; // Skip invalid entries.
void HashIndex<T>::splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit) {
auto originalSlotIterator = pSlots->iter_mut();
auto newSlotIterator = pSlots->iter_mut();
auto overflowSlotIterator = oSlots->iter_mut();
// The overflow slot iterators will hang if they access the same page
// So instead buffer new overflow slots here and append them at the end
std::vector<Slot<T>> newOverflowSlots;

for (slot_id_t i = 0; i < numSlotsToSplit; i++) {
auto* newSlot = &*newSlotIterator.pushBack(Slot<T>());
entry_pos_t newEntryPos = 0;
Slot<T>* originalSlot = &*originalSlotIterator.seek(header.nextSplitSlotId);
do {
for (entry_pos_t originalEntryPos = 0; originalEntryPos < getSlotCapacity<T>();
originalEntryPos++) {
if (!originalSlot->header.isEntryValid(originalEntryPos)) {
continue; // Skip invalid entries.
}
if (newEntryPos >= getSlotCapacity<T>()) {
newOverflowSlots.emplace_back();
newSlot = &newOverflowSlots.back();
newEntryPos = 0;
}
// Copy entry from old slot to new slot
const auto& key = originalSlot->entries[originalEntryPos].key;
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto newSlotId = hash & header.higherLevelHashMask;
if (newSlotId != header.nextSplitSlotId) {
KU_ASSERT(newSlotId == newSlotIterator.idx());
copyAndUpdateSlotHeader<const T&, true>(*newSlot, newEntryPos,
originalSlot->entries[originalEntryPos].key, UINT32_MAX,
originalSlot->header.fingerprints[originalEntryPos]);
originalSlot->header.setEntryInvalid(originalEntryPos);
newEntryPos++;
}
}
const auto& key = slot.entries[entryPos].key;
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hash);
auto newSlotId = hash & header.higherLevelHashMask;
KU_ASSERT(newSlotId < pSlots->getNumElements(TransactionType::WRITE));
copyEntryToSlot(newSlotId, key, fingerprint);
}
} while (originalSlot->header.nextOvfSlotId != 0 &&
(originalSlot = &*overflowSlotIterator.seek(originalSlot->header.nextOvfSlotId)));
header.incrementNextSplitSlotId();
}
for (auto&& slot : newOverflowSlots) {
overflowSlotIterator.pushBack(std::move(slot));
}
}

Expand Down Expand Up @@ -422,6 +437,12 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
auto numRequiredSlots =
std::max((numRequiredEntries + getSlotCapacity<T>() - 1) / getSlotCapacity<T>(),
static_cast<slot_id_t>(1ul << this->indexHeaderForWriteTrx->currentLevel));
// Always start with at least one page worth of slots.
// This guarantees that when splitting the source and destination slot are never on the same
// page
// Which allows safe use of multiple disk array iterators.
numRequiredSlots = std::max(numRequiredSlots,
BufferPoolConstants::PAGE_4KB_SIZE / pSlots->getAlignedElementSize());
// If there are no entries, we can just re-size the number of primary slots and re-calculate the
// levels
if (this->indexHeaderForWriteTrx->numEntries == 0) {
Expand All @@ -437,16 +458,8 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
numRequiredSlots - numSlotsOfCurrentLevel;
};
} else {
// Otherwise, split and re-hash until there are enough primary slots
// TODO(bmwinger): resize pSlots first, update the levels and then rehash from the original
// nextSplitSlotId to the new nextSplitSlotId using the final slot function to avoid
// re-hashing a slot multiple times
for (auto slots = pSlots->getNumElements(TransactionType::WRITE); slots < numRequiredSlots;
slots++) {
// TODO: Use diskarray iterator to avoid updating the same page repeatedly
// appendPSlot/pushBack
splitSlot(*this->indexHeaderForWriteTrx);
}
splitSlots(*this->indexHeaderForWriteTrx,
numRequiredSlots - pSlots->getNumElements(TransactionType::WRITE));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/index/in_mem_hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ InMemHashIndex<T>::InMemHashIndex(OverflowFileHandle* overflowFileHandle)
pSlots{std::make_unique<InMemDiskArrayBuilder<Slot<T>>>(dummy, 0, 0, true)},
oSlots{std::make_unique<InMemDiskArrayBuilder<Slot<T>>>(dummy, 0, 1, true)},
indexHeader{TypeUtils::getPhysicalTypeIDForType<T>()} {
allocatePSlots(1u << this->indexHeader.currentLevel);
// Match HashIndex in allocating at least one page of slots so that we don't split within the
// same page
allocateSlots(BufferPoolConstants::PAGE_4KB_SIZE / pSlots->getAlignedElementSize());
}

template<typename T>
Expand Down
9 changes: 6 additions & 3 deletions src/storage/storage_structure/disk_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ DiskArrayHeader::DiskArrayHeader(uint64_t elementSize)
: alignedElementSizeLog2{(uint64_t)ceil(log2(elementSize))},
numElementsPerPageLog2{BufferPoolConstants::PAGE_4KB_SIZE_LOG2 - alignedElementSizeLog2},
elementPageOffsetMask{BitmaskUtils::all1sMaskForLeastSignificantBits(numElementsPerPageLog2)},
firstPIPPageIdx{DBFileUtils::NULL_PAGE_IDX}, numElements{0}, numAPs{0} {}
firstPIPPageIdx{DBFileUtils::NULL_PAGE_IDX}, numElements{0}, numAPs{0} {
KU_ASSERT(elementSize <= BufferPoolConstants::PAGE_4KB_SIZE);
}

void DiskArrayHeader::saveToDisk(FileHandle& fileHandle, uint64_t headerPageIdx) {
fileHandle.getFileInfo()->writeFile(reinterpret_cast<uint8_t*>(this), sizeof(DiskArrayHeader),
Expand Down Expand Up @@ -158,13 +160,15 @@ void BaseDiskArrayInternal::update(uint64_t idx, std::span<uint8_t> val) {
}

uint64_t BaseDiskArrayInternal::pushBack(std::span<uint8_t> val) {
std::unique_lock xLck{diskArraySharedMtx};
auto it = iter_mut(val.size());
auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE);
it.pushBack(val);
return originalNumElements;
}

uint64_t BaseDiskArrayInternal::resize(uint64_t newNumElements, std::span<uint8_t> defaultVal) {
std::unique_lock xLck{diskArraySharedMtx};
auto it = iter_mut(defaultVal.size());
auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE);
while (it.size() < newNumElements) {
Expand Down Expand Up @@ -392,8 +396,7 @@ void BaseDiskArrayInternal::WriteIterator::getPage(common::page_idx_t newPageIdx
}

BaseDiskArrayInternal::WriteIterator BaseDiskArrayInternal::iter_mut(uint64_t valueSize) {
std::unique_lock xLck{diskArraySharedMtx};
return BaseDiskArrayInternal::WriteIterator(valueSize, *this, std::move(xLck));
return BaseDiskArrayInternal::WriteIterator(valueSize, *this);
}

BaseInMemDiskArray::BaseInMemDiskArray(FileHandle& fileHandle, DBFileID dbFileID,
Expand Down

0 comments on commit 4917262

Please sign in to comment.