Remove size requirements for the in memory hash index #3373

Merged · 1 commit · Apr 28, 2024
8 changes: 2 additions & 6 deletions src/include/storage/index/hash_index.h
@@ -119,12 +119,8 @@ class HashIndex final : public OnDiskHashIndex {
void splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit);

// Resizes the local storage to support the given number of new entries
inline void bulkReserve(uint64_t /*newEntries*/) override {
// FIXME: Don't bulk reserve for now. Because of the way things are split up, we may reserve
// more than we actually need, and then when flushing, the number of primary slots won't
// match. Another reason: it may be better to start smaller and split slots when merging to
// persistent storage.
// bulkInsertLocalStorage.bulkReserve(indexHeaderForWriteTrx->numEntries + newEntries);
inline void bulkReserve(uint64_t newEntries) override {
bulkInsertLocalStorage.reserve(newEntries);
}
// Resizes the on-disk index to support the given number of new entries
void reserve(uint64_t newEntries);
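For context on the header change above, here is a minimal sketch of the division of labour it relies on, with assumed class shapes and names rather than the real Kuzu classes: bulkReserve now only grows the in-memory builder and may overshoot, while the on-disk index is resized later in mergeBulkInserts from the number of entries that actually accumulated.

#include <cstdint>
#include <vector>

// Minimal sketch (assumed shapes, not the real classes) of the split of work after
// this change: bulkReserve only grows the in-memory builder and may overshoot; the
// on-disk index is resized later from the number of entries that actually arrived.
struct InMemBuilderSketch {
    std::vector<uint64_t> primarySlots;
    uint64_t numEntries = 0;
    void reserve(uint64_t /*newEntries*/) { /* grow primarySlots if too small */ }
    uint64_t size() const { return numEntries; }
};

struct HashIndexSketch {
    InMemBuilderSketch bulkInsertLocalStorage;

    void bulkReserve(uint64_t newEntries) {
        // Overshooting is now harmless: the merge no longer assumes the builder and
        // the on-disk index end up with the same number of primary slots.
        bulkInsertLocalStorage.reserve(newEntries);
    }

    void mergeBulkInserts() {
        reserveOnDisk(bulkInsertLocalStorage.size()); // sized from actual contents
        // ... then merge slot by slot, as in the hash_index.cpp diff below ...
    }

    void reserveOnDisk(uint64_t /*newEntries*/) { /* grow the on-disk slot arrays */ }
};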
82 changes: 48 additions & 34 deletions src/storage/index/hash_index.cpp
@@ -193,10 +193,6 @@ size_t HashIndex<T>::append(const IndexBuffer<BufferKeyType>& buffer) {
}
}
}
// Keep the same number of primary slots in the builder as we will eventually need when
// flushing to disk, so that we know each slot to write to
bulkInsertLocalStorage.reserve(
indexHeaderForWriteTrx->numEntries + bulkInsertLocalStorage.size() + buffer.size());
return bulkInsertLocalStorage.append(buffer);
}

@@ -477,17 +473,6 @@ void HashIndex<T>::mergeBulkInserts() {
// TODO: one pass would also reduce locking when frames are unpinned,
// which is useful if this can be parallelized
reserve(bulkInsertLocalStorage.size());
KU_ASSERT(
bulkInsertLocalStorage.numPrimarySlots() == pSlots->getNumElements(TransactionType::WRITE));
KU_ASSERT(this->indexHeaderForWriteTrx->currentLevel ==
bulkInsertLocalStorage.getIndexHeader().currentLevel);
KU_ASSERT(this->indexHeaderForWriteTrx->levelHashMask ==
bulkInsertLocalStorage.getIndexHeader().levelHashMask);
KU_ASSERT(this->indexHeaderForWriteTrx->higherLevelHashMask ==
bulkInsertLocalStorage.getIndexHeader().higherLevelHashMask);
KU_ASSERT(this->indexHeaderForWriteTrx->nextSplitSlotId ==
bulkInsertLocalStorage.getIndexHeader().nextSplitSlotId);

auto originalNumEntries = this->indexHeaderForWriteTrx->numEntries;

// Storing as many slots in-memory as on-disk shouldn't be necessary (for one it makes memory
@@ -502,17 +487,30 @@
// slots All new slots will be sequential and benefit from caching, but for existing randomly
// accessed slots we just benefit from the interface. However, the two iterators would not be
// able to pin the same page simultaneously.
// Alternatively, cache new slots in memory and pushBack them at the end, like in splitSlots.
auto diskOverflowSlotIterator = oSlots->iter_mut();
for (uint64_t slotId = 0; slotId < bulkInsertLocalStorage.numPrimarySlots(); slotId++) {
auto localSlot = typename InMemHashIndex<T>::SlotIterator(slotId, &bulkInsertLocalStorage);
// If mask is empty, skip this slot
if (!localSlot.slot->header.validityMask) {
// There should be no entries in the overflow slots, as we never leave gaps in the
// builder
continue;
}
diskSlotIterator.seek(slotId);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, slotId);
for (uint64_t diskSlotId = 0; diskSlotId < pSlots->getNumElements(TransactionType::WRITE);
diskSlotId++) {
// Determine which local slot corresponds to the disk slot
// Note: this assumes that the slot id is a truncation of the hash.
auto localSlotId = HashIndexUtils::getPrimarySlotIdForHash(
bulkInsertLocalStorage.getIndexHeader(), diskSlotId);
auto localSlot =
typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
}
// Since we may over-estimate the required capacity when reserving based on the total number
// of rows (due to reserving the average expected per index, not the actual count), it is
// possible that the in-memory index contains more primary slots than the on-disk index after
// reserving. The first pass is correct for the slots that it accesses, and the remaining ones
// are processed here by truncating the local slotId to match the on-disk header.
for (uint64_t localSlotId = pSlots->getNumElements(TransactionType::WRITE);
localSlotId < bulkInsertLocalStorage.numPrimarySlots(); localSlotId++) {
auto diskSlotId =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, localSlotId);
auto localSlot =
typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
}
KU_ASSERT(originalNumEntries + bulkInsertLocalStorage.getIndexHeader().numEntries ==
indexHeaderForWriteTrx->numEntries);
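The slot-id mapping in the two loops above relies on the linear-hashing invariant called out in the comment: a primary slot id is a truncation of the key's hash. Below is a hedged sketch of that computation, reusing the header fields named in the asserts this PR removes (levelHashMask, higherLevelHashMask, nextSplitSlotId); the exact form of HashIndexUtils::getPrimarySlotIdForHash is an assumption here.

#include <cstdint>

// Minimal header mirror for illustration; the field names follow the asserts that
// this PR removes (levelHashMask, higherLevelHashMask, nextSplitSlotId).
struct HeaderSketch {
    uint64_t levelHashMask;       // e.g. (1u << currentLevel) - 1
    uint64_t higherLevelHashMask; // e.g. (1u << (currentLevel + 1)) - 1
    uint64_t nextSplitSlotId;     // first slot of the current level not yet split
};

// Assumed shape of the primary-slot computation in linear hashing: truncate the
// hash with the current level's mask, and re-truncate with the wider mask if
// that slot has already been split this level.
inline uint64_t primarySlotIdForHash(const HeaderSketch& header, uint64_t hash) {
    auto slotId = hash & header.levelHashMask;
    if (slotId < header.nextSplitSlotId) {
        slotId = hash & header.higherLevelHashMask;
    }
    return slotId;
}

Under that assumption, passing a slot id where a hash is expected is sound: a slot id is itself a truncated hash, so truncating it again with the other header's narrower (or equal) mask lands on the slot that the same keys map to.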
@@ -521,13 +519,32 @@
template<typename T>
void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator, slot_id_t slotId) {
typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator,
slot_id_t diskSlotId) {
slot_id_t diskEntryPos = 0u;
auto* diskSlot = &*diskSlotIterator;
Slot<T>* diskSlot = nullptr;
// Merge slot from local storage to existing slot
do {
for (auto entryPos = 0u; entryPos < slotToMerge.slot->header.numEntries(); entryPos++) {
KU_ASSERT(slotToMerge.slot->header.isEntryValid(entryPos));
// Skip if there are no valid entries
if (!slotToMerge.slot->header.validityMask) {
continue;
}
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (!slotToMerge.slot->header.isEntryValid(entryPos)) {
continue;
}
// Only copy entries that match the current primary slot on disk
auto key = slotToMerge.slot->entries[entryPos].key;
auto hash = hashStored(TransactionType::WRITE, key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
if (primarySlot != diskSlotId) {
continue;
}
// Delay seeking as long as possible to avoid writing to slots which we don't modify
if (diskSlot == nullptr) {
diskSlot = &*diskSlotIterator.seek(diskSlotId);
}
// Find the next empty entry, or add a new slot if there are no more entries
while (diskSlot->header.isEntryValid(diskEntryPos) ||
diskEntryPos >= getSlotCapacity<T>()) {
@@ -551,14 +568,11 @@ void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos,
slotToMerge.slot->entries[entryPos].key, UINT32_MAX,
slotToMerge.slot->header.fingerprints[entryPos]);
slotToMerge.slot->header.setEntryInvalid(entryPos);
KU_ASSERT([&]() {
auto hash =
hashStored(TransactionType::WRITE, slotToMerge.slot->entries[entryPos].key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
KU_ASSERT(slotToMerge.slot->header.fingerprints[entryPos] ==
HashIndexUtils::getFingerprintForHash(hash));
KU_ASSERT(primarySlot == slotId);
KU_ASSERT(primarySlot == diskSlotId);
return true;
}());
indexHeaderForWriteTrx->numEntries++;
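As a worked example of the per-entry filter in mergeSlot, using illustrative masks and hashes rather than Kuzu code: when the on-disk index has more primary slots than the in-memory builder, a single local slot holds entries destined for several disk slots, and each pass copies only the entries whose recomputed primary slot matches the current diskSlotId.

#include <cstdint>
#include <iostream>
#include <vector>

// Worked example (illustrative values): the in-memory builder has 2 primary slots
// (mask 0x1) while the on-disk index has 4 (mask 0x3), so local slot 1 holds
// entries destined for two different disk slots.
int main() {
    std::vector<uint64_t> hashesInLocalSlot1 = {0x1, 0x3, 0x5, 0x7}; // all satisfy hash & 0x1 == 1
    const uint64_t diskLevelHashMask = 0x3;                          // 4 on-disk primary slots
    for (uint64_t diskSlotId : {1, 3}) {
        std::cout << "disk slot " << diskSlotId << " receives:";
        for (auto hash : hashesInLocalSlot1) {
            if ((hash & diskLevelHashMask) == diskSlotId) { // the filter applied per entry
                std::cout << " 0x" << std::hex << hash << std::dec;
            }
        }
        std::cout << "\n";
    }
    return 0;
}

Running this prints that disk slot 1 receives 0x1 and 0x5 while disk slot 3 receives 0x3 and 0x7. This is also why the delayed seek matters: a pass that finds no matching entries never touches its disk slot at all.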
9 changes: 4 additions & 5 deletions src/storage/index/in_mem_hash_index.cpp
@@ -63,6 +63,9 @@ template<typename T>
void InMemHashIndex<T>::reserve(uint32_t numEntries_) {
slot_id_t numRequiredEntries = HashIndexUtils::getNumRequiredEntries(numEntries_);
auto numRequiredSlots = (numRequiredEntries + getSlotCapacity<T>() - 1) / getSlotCapacity<T>();
if (numRequiredSlots <= pSlots->size()) {
return;
}
if (indexHeader.numEntries == 0) {
allocateSlots(numRequiredSlots);
} else {
@@ -137,11 +140,7 @@ void InMemHashIndex<T>::splitSlot(HashIndexHeader& header) {

template<typename T>
size_t InMemHashIndex<T>::append(const IndexBuffer<BufferKeyType>& buffer) {
slot_id_t numRequiredEntries =
HashIndexUtils::getNumRequiredEntries(this->indexHeader.numEntries + buffer.size());
while (numRequiredEntries > pSlots->size() * getSlotCapacity<T>()) {
this->splitSlot(this->indexHeader);
}
reserve(indexHeader.numEntries + buffer.size());
// Do both searches after splitting. Returning early if the key already exists isn't a
// particular concern and doing both after splitting allows the slotID to be reused
common::hash_t hashes[BUFFER_SIZE];
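Finally, a small numeric sketch of the growth rule that both reserve and the simplified append now share; the slot capacity and the padding applied by HashIndexUtils::getNumRequiredEntries are illustrative assumptions, not the actual values.

#include <cstdint>

// Illustrative constants; the real values come from getSlotCapacity<T>() and the
// load factor inside HashIndexUtils::getNumRequiredEntries.
static constexpr uint64_t SLOT_CAPACITY = 4;

// Assumed shape of getNumRequiredEntries: pad the requested count so slots stay
// below a target load factor (roughly 0.75 here).
inline uint64_t numRequiredEntries(uint64_t numEntries) {
    return numEntries + numEntries / 3;
}

// Mirrors reserve(): compute the slots needed and grow only if currently short,
// which is the new early return that lets callers over-reserve safely.
inline uint64_t slotsAfterReserve(uint64_t currentSlots, uint64_t numEntries) {
    uint64_t requiredSlots =
        (numRequiredEntries(numEntries) + SLOT_CAPACITY - 1) / SLOT_CAPACITY;
    return requiredSlots <= currentSlots ? currentSlots : requiredSlots;
}

// Example: an index with 8 primary slots asked to reserve for 20 entries needs
// ceil(26 / 4) = 7 slots, so it keeps its 8; asked to reserve for 40 it grows to
// ceil(53 / 4) = 14.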