Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Hash Index slot splitting #3325

Merged
merged 1 commit into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/include/storage/index/hash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ class HashIndex final : public OnDiskHashIndex {

inline uint64_t appendOverflowSlot(Slot<T>&& newSlot) { return oSlots->pushBack(newSlot); }

inline void splitSlot(HashIndexHeader& header);
void rehashSlots(HashIndexHeader& header);
void splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit);

// Resizes the local storage to support the given number of new entries
inline void bulkReserve(uint64_t /*newEntries*/) override {
Expand Down
26 changes: 21 additions & 5 deletions src/include/storage/storage_structure/disk_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ class BaseDiskArrayInternal {
// The pages are cached while the elements are stored on the same page
// Designed for sequential writes, but supports random writes too (at the cost that the page
// caching is only beneficial when seeking from one element to another on the same page)
//
// The iterator is not locked, allowing multiple to be used at the same time, but access to
// individual pages is locked through the BMFileHandle. It will hang if you seek/pushback on the
// same page as another iterator in an overlapping scope.
struct WriteIterator {
BaseDiskArrayInternal& diskArray;
PageCursor apCursor;
Expand All @@ -153,15 +157,13 @@ class BaseDiskArrayInternal {
WALPageIdxAndFrame walPageIdxAndFrame;
static const transaction::TransactionType TRX_TYPE = transaction::TransactionType::WRITE;
uint64_t idx;
std::unique_lock<std::shared_mutex> lock;
DEFAULT_MOVE_CONSTRUCT(WriteIterator);

// Constructs WriteIterator in an invalid state. Seek must be called before accessing data
WriteIterator(uint32_t valueSize, BaseDiskArrayInternal& diskArray,
std::unique_lock<std::shared_mutex>&& lock)
WriteIterator(uint32_t valueSize, BaseDiskArrayInternal& diskArray)
: diskArray(diskArray), apCursor(), valueSize(valueSize),
walPageIdxAndFrame{common::INVALID_PAGE_IDX, common::INVALID_PAGE_IDX, nullptr},
idx(0), lock(std::move(lock)) {
idx(0) {
diskArray.hasTransactionalUpdates = true;
}

Expand All @@ -188,6 +190,10 @@ class BaseDiskArrayInternal {

WriteIterator iter_mut(uint64_t valueSize);

inline common::page_idx_t getAPIdx(uint64_t idx) const {
return getAPIdxAndOffsetInAP(idx).pageIdx;
}

protected:
// Updates to new pages (new to this transaction) bypass the wal file.
void updatePage(uint64_t pageIdx, bool isNewPage, std::function<void(uint8_t*)> updateOp);
Expand Down Expand Up @@ -272,6 +278,8 @@ inline std::span<uint8_t> getSpan(U& val) {

template<typename U>
class BaseDiskArray {
static_assert(sizeof(U) <= common::BufferPoolConstants::PAGE_4KB_SIZE);

public:
// Used by copiers.
BaseDiskArray(FileHandle& fileHandle, common::page_idx_t headerPageIdx, uint64_t elementSize)
Expand Down Expand Up @@ -332,8 +340,12 @@ class BaseDiskArray {
}

inline uint64_t idx() const { return iter.idx; }
inline uint64_t getAPIdx() const { return iter.apCursor.pageIdx; }

inline void pushBack(U val) { return iter.pushBack(getSpan(val)); }
inline WriteIterator& pushBack(U val) {
iter.pushBack(getSpan(val));
return *this;
}

inline uint64_t size() const { return iter.size(); }

Expand All @@ -342,6 +354,8 @@ class BaseDiskArray {
};

inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; }
inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); }
uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }

private:
BaseDiskArrayInternal diskArray;
Expand Down Expand Up @@ -437,6 +451,8 @@ class InMemDiskArrayBuilder {

inline void saveToDisk() { diskArray.saveToDisk(); }

uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }

private:
InMemDiskArrayBuilderInternal diskArray;
};
Expand Down
89 changes: 51 additions & 38 deletions src/storage/index/hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <type_traits>

#include "common/assert.h"
#include "common/constants.h"
#include "common/string_utils.h"
#include "common/type_utils.h"
#include "common/types/int128_t.h"
Expand Down Expand Up @@ -224,11 +225,7 @@ bool HashIndex<T>::lookupInPersistentIndex(TransactionType trxType, Key key, off
template<typename T>
void HashIndex<T>::insertIntoPersistentIndex(Key key, offset_t value) {
auto& header = *this->indexHeaderForWriteTrx;
slot_id_t numRequiredEntries = HashIndexUtils::getNumRequiredEntries(header.numEntries + 1);
while (numRequiredEntries >
pSlots->getNumElements(TransactionType::WRITE) * getSlotCapacity<T>()) {
this->splitSlot(header);
}
reserve(1);
auto hashValue = HashIndexUtils::hash(key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hashValue);
auto iter = getSlotIterator(HashIndexUtils::getPrimarySlotIdForHash(header, hashValue),
Expand Down Expand Up @@ -363,30 +360,48 @@ inline void HashIndex<ku_string_t>::insert(std::string_view key, SlotEntry<ku_st
}

template<typename T>
void HashIndex<T>::splitSlot(HashIndexHeader& header) {
appendPSlot();
rehashSlots(header);
header.incrementNextSplitSlotId();
}

template<typename T>
void HashIndex<T>::rehashSlots(HashIndexHeader& header) {
auto slotsToSplit = getChainedSlots(header.nextSplitSlotId);
for (auto& [slotInfo, slot] : slotsToSplit) {
auto slotHeader = slot.header;
slot.header.reset();
updateSlot(slotInfo, slot);
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (!slotHeader.isEntryValid(entryPos)) {
continue; // Skip invalid entries.
void HashIndex<T>::splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit) {
auto originalSlotIterator = pSlots->iter_mut();
auto newSlotIterator = pSlots->iter_mut();
auto overflowSlotIterator = oSlots->iter_mut();
// The overflow slot iterators will hang if they access the same page
// So instead buffer new overflow slots here and append them at the end
std::vector<Slot<T>> newOverflowSlots;

for (slot_id_t i = 0; i < numSlotsToSplit; i++) {
auto* newSlot = &*newSlotIterator.pushBack(Slot<T>());
entry_pos_t newEntryPos = 0;
Slot<T>* originalSlot = &*originalSlotIterator.seek(header.nextSplitSlotId);
do {
for (entry_pos_t originalEntryPos = 0; originalEntryPos < getSlotCapacity<T>();
originalEntryPos++) {
if (!originalSlot->header.isEntryValid(originalEntryPos)) {
continue; // Skip invalid entries.
}
if (newEntryPos >= getSlotCapacity<T>()) {
newOverflowSlots.emplace_back();
newSlot = &newOverflowSlots.back();
newEntryPos = 0;
}
// Copy entry from old slot to new slot
const auto& key = originalSlot->entries[originalEntryPos].key;
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto newSlotId = hash & header.higherLevelHashMask;
if (newSlotId != header.nextSplitSlotId) {
KU_ASSERT(newSlotId == newSlotIterator.idx());
copyAndUpdateSlotHeader<const T&, true>(*newSlot, newEntryPos,
originalSlot->entries[originalEntryPos].key, UINT32_MAX,
originalSlot->header.fingerprints[originalEntryPos]);
originalSlot->header.setEntryInvalid(originalEntryPos);
newEntryPos++;
}
}
const auto& key = slot.entries[entryPos].key;
hash_t hash = this->hashStored(TransactionType::WRITE, key);
auto fingerprint = HashIndexUtils::getFingerprintForHash(hash);
auto newSlotId = hash & header.higherLevelHashMask;
KU_ASSERT(newSlotId < pSlots->getNumElements(TransactionType::WRITE));
copyEntryToSlot(newSlotId, key, fingerprint);
}
} while (originalSlot->header.nextOvfSlotId != 0 &&
(originalSlot = &*overflowSlotIterator.seek(originalSlot->header.nextOvfSlotId)));
header.incrementNextSplitSlotId();
}
for (auto&& slot : newOverflowSlots) {
overflowSlotIterator.pushBack(std::move(slot));
}
}

Expand Down Expand Up @@ -422,6 +437,12 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
auto numRequiredSlots =
std::max((numRequiredEntries + getSlotCapacity<T>() - 1) / getSlotCapacity<T>(),
static_cast<slot_id_t>(1ul << this->indexHeaderForWriteTrx->currentLevel));
// Always start with at least one page worth of slots.
// This guarantees that when splitting the source and destination slot are never on the same
// page
// Which allows safe use of multiple disk array iterators.
numRequiredSlots = std::max(numRequiredSlots,
BufferPoolConstants::PAGE_4KB_SIZE / pSlots->getAlignedElementSize());
// If there are no entries, we can just re-size the number of primary slots and re-calculate the
// levels
if (this->indexHeaderForWriteTrx->numEntries == 0) {
Expand All @@ -437,16 +458,8 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
numRequiredSlots - numSlotsOfCurrentLevel;
};
} else {
// Otherwise, split and re-hash until there are enough primary slots
// TODO(bmwinger): resize pSlots first, update the levels and then rehash from the original
// nextSplitSlotId to the new nextSplitSlotId using the final slot function to avoid
// re-hashing a slot multiple times
for (auto slots = pSlots->getNumElements(TransactionType::WRITE); slots < numRequiredSlots;
slots++) {
// TODO: Use diskarray iterator to avoid updating the same page repeatedly
// appendPSlot/pushBack
splitSlot(*this->indexHeaderForWriteTrx);
}
splitSlots(*this->indexHeaderForWriteTrx,
numRequiredSlots - pSlots->getNumElements(TransactionType::WRITE));
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/storage/index/in_mem_hash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ InMemHashIndex<T>::InMemHashIndex(OverflowFileHandle* overflowFileHandle)
pSlots{std::make_unique<InMemDiskArrayBuilder<Slot<T>>>(dummy, 0, 0, true)},
oSlots{std::make_unique<InMemDiskArrayBuilder<Slot<T>>>(dummy, 0, 1, true)},
indexHeader{TypeUtils::getPhysicalTypeIDForType<T>()} {
allocatePSlots(1u << this->indexHeader.currentLevel);
// Match HashIndex in allocating at least one page of slots so that we don't split within the
// same page
allocateSlots(BufferPoolConstants::PAGE_4KB_SIZE / pSlots->getAlignedElementSize());
}

template<typename T>
Expand Down
9 changes: 6 additions & 3 deletions src/storage/storage_structure/disk_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ DiskArrayHeader::DiskArrayHeader(uint64_t elementSize)
: alignedElementSizeLog2{(uint64_t)ceil(log2(elementSize))},
numElementsPerPageLog2{BufferPoolConstants::PAGE_4KB_SIZE_LOG2 - alignedElementSizeLog2},
elementPageOffsetMask{BitmaskUtils::all1sMaskForLeastSignificantBits(numElementsPerPageLog2)},
firstPIPPageIdx{DBFileUtils::NULL_PAGE_IDX}, numElements{0}, numAPs{0} {}
firstPIPPageIdx{DBFileUtils::NULL_PAGE_IDX}, numElements{0}, numAPs{0} {
KU_ASSERT(elementSize <= BufferPoolConstants::PAGE_4KB_SIZE);
}

void DiskArrayHeader::saveToDisk(FileHandle& fileHandle, uint64_t headerPageIdx) {
fileHandle.getFileInfo()->writeFile(reinterpret_cast<uint8_t*>(this), sizeof(DiskArrayHeader),
Expand Down Expand Up @@ -158,13 +160,15 @@ void BaseDiskArrayInternal::update(uint64_t idx, std::span<uint8_t> val) {
}

uint64_t BaseDiskArrayInternal::pushBack(std::span<uint8_t> val) {
std::unique_lock xLck{diskArraySharedMtx};
auto it = iter_mut(val.size());
auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE);
it.pushBack(val);
return originalNumElements;
}

uint64_t BaseDiskArrayInternal::resize(uint64_t newNumElements, std::span<uint8_t> defaultVal) {
benjaminwinger marked this conversation as resolved.
Show resolved Hide resolved
std::unique_lock xLck{diskArraySharedMtx};
auto it = iter_mut(defaultVal.size());
auto originalNumElements = getNumElementsNoLock(TransactionType::WRITE);
while (it.size() < newNumElements) {
Expand Down Expand Up @@ -392,8 +396,7 @@ void BaseDiskArrayInternal::WriteIterator::getPage(common::page_idx_t newPageIdx
}

BaseDiskArrayInternal::WriteIterator BaseDiskArrayInternal::iter_mut(uint64_t valueSize) {
std::unique_lock xLck{diskArraySharedMtx};
return BaseDiskArrayInternal::WriteIterator(valueSize, *this, std::move(xLck));
return BaseDiskArrayInternal::WriteIterator(valueSize, *this);
}

BaseInMemDiskArray::BaseInMemDiskArray(FileHandle& fileHandle, DBFileID dbFileID,
Expand Down
Loading