Commit d0be3dd

Remove size requirements for the in memory hash index
benjaminwinger committed Apr 24, 2024
1 parent fa3afd7 commit d0be3dd
Showing 3 changed files with 51 additions and 45 deletions.
8 changes: 2 additions & 6 deletions src/include/storage/index/hash_index.h
@@ -119,12 +119,8 @@ class HashIndex final : public OnDiskHashIndex {
void splitSlots(HashIndexHeader& header, slot_id_t numSlotsToSplit);

// Resizes the local storage to support the given number of new entries
inline void bulkReserve(uint64_t /*newEntries*/) override {
// FIXME: Don't bulk reserve for now. Because of the way things are split up, we may
// reserve more than we actually need, and then when flushing, the number of primary
// slots won't match. Another reason it may be better to start smaller and split slots
// when merging to persistent storage:
// bulkInsertLocalStorage.bulkReserve(indexHeaderForWriteTrx->numEntries + newEntries);
inline void bulkReserve(uint64_t newEntries) override {
bulkInsertLocalStorage.reserve(newEntries);
}
// Resizes the on-disk index to support the given number of new entries
void reserve(uint64_t newEntries);
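The new bulkReserve simply forwards to the in-memory index, which grows itself by splitting slots. A minimal sketch of what InMemHashIndex<T>::reserve plausibly does, inferred from the splitting loop this commit removes from append() in in_mem_hash_index.cpp (the last file below); the actual implementation may differ:

template<typename T>
void InMemHashIndex<T>::reserve(uint64_t numEntries) {
    // Entry capacity needed to stay under the target load factor
    // (getNumRequiredEntries is taken from the removed code, not redefined here).
    slot_id_t numRequiredEntries = HashIndexUtils::getNumRequiredEntries(numEntries);
    // Split one primary slot at a time until the primary slots provide enough
    // capacity; each split advances the linear-hashing header state.
    while (numRequiredEntries > pSlots->size() * getSlotCapacity<T>()) {
        splitSlot(this->indexHeader);
    }
}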
82 changes: 48 additions & 34 deletions src/storage/index/hash_index.cpp
@@ -193,10 +193,6 @@ size_t HashIndex<T>::append(const IndexBuffer<BufferKeyType>& buffer) {
}
}
}
// Keep the same number of primary slots in the builder as we will eventually need when
// flushing to disk, so that we know each slot to write to
bulkInsertLocalStorage.reserve(
indexHeaderForWriteTrx->numEntries + bulkInsertLocalStorage.size() + buffer.size());
return bulkInsertLocalStorage.append(buffer);
}

@@ -477,17 +473,6 @@ void HashIndex<T>::mergeBulkInserts() {
// TODO: one pass would also reduce locking when frames are unpinned,
// which is useful if this can be parallelized
reserve(bulkInsertLocalStorage.size());
KU_ASSERT(
bulkInsertLocalStorage.numPrimarySlots() == pSlots->getNumElements(TransactionType::WRITE));
KU_ASSERT(this->indexHeaderForWriteTrx->currentLevel ==
bulkInsertLocalStorage.getIndexHeader().currentLevel);
KU_ASSERT(this->indexHeaderForWriteTrx->levelHashMask ==
bulkInsertLocalStorage.getIndexHeader().levelHashMask);
KU_ASSERT(this->indexHeaderForWriteTrx->higherLevelHashMask ==
bulkInsertLocalStorage.getIndexHeader().higherLevelHashMask);
KU_ASSERT(this->indexHeaderForWriteTrx->nextSplitSlotId ==
bulkInsertLocalStorage.getIndexHeader().nextSplitSlotId);

auto originalNumEntries = this->indexHeaderForWriteTrx->numEntries;

// Storing as many slots in-memory as on-disk shouldn't be necessary (for one it makes memory
@@ -502,17 +487,30 @@ void HashIndex<T>::mergeBulkInserts() {
// slots All new slots will be sequential and benefit from caching, but for existing randomly
// accessed slots we just benefit from the interface. However, the two iterators would not be
// able to pin the same page simultaneously
// Alternatively, cache new slots in memory and pushBack them at the end like in splitSlots
auto diskOverflowSlotIterator = oSlots->iter_mut();
for (uint64_t slotId = 0; slotId < bulkInsertLocalStorage.numPrimarySlots(); slotId++) {
auto localSlot = typename InMemHashIndex<T>::SlotIterator(slotId, &bulkInsertLocalStorage);
// If mask is empty, skip this slot
if (!localSlot.slot->header.validityMask) {
// There should be no entries in the overflow slots, as we never leave gaps in the
// builder
continue;
}
diskSlotIterator.seek(slotId);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, slotId);
for (uint64_t diskSlotId = 0; diskSlotId < pSlots->getNumElements(TransactionType::WRITE);
diskSlotId++) {
// Determine which local slot corresponds to the disk slot
// Note: this assumes that the slot id is a truncation of the hash.
auto localSlotId = HashIndexUtils::getPrimarySlotIdForHash(
bulkInsertLocalStorage.getIndexHeader(), diskSlotId);
auto localSlot =
typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
}
// Since we may over-estimate the required capacity when reserving based on the total number of
// rows (due to reserving the average expected per index, not the actual count), it is possible
// that the in-memory index contains more primary slots than the on-disk index after reserving.
// The first pass is correct for the slots that it accesses, and the remaining slots here are
// processed by truncating the local slotId to match the on-disk header.
for (uint64_t localSlotId = pSlots->getNumElements(TransactionType::WRITE);
localSlotId < bulkInsertLocalStorage.numPrimarySlots(); localSlotId++) {
auto diskSlotId =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, localSlotId);
auto localSlot =
typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
}
KU_ASSERT(originalNumEntries + bulkInsertLocalStorage.getIndexHeader().numEntries ==
indexHeaderForWriteTrx->numEntries);
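Both passes lean on HashIndexUtils::getPrimarySlotIdForHash to map a hash (or a slot id standing in for one, since slot ids are truncations of hashes) onto a primary slot. A hedged sketch of the standard linear-hashing computation this helper likely performs; the header fields are the ones named in the asserts deleted above, but the real implementation may differ:

slot_id_t getPrimarySlotIdForHash(const HashIndexHeader& header, common::hash_t hash) {
    // Truncate the hash to the number of bits used by the current level.
    auto slotId = hash & header.levelHashMask;
    if (slotId < header.nextSplitSlotId) {
        // This slot was already split during the current level, so one extra
        // hash bit decides between the original slot and its new sibling.
        slotId = hash & header.higherLevelHashMask;
    }
    return slotId;
}

The second pass exploits this by feeding the local slot id through the on-disk header: any high bits beyond the disk index's level mask are dropped, folding the surplus local slots back onto existing disk slots.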
Expand All @@ -521,13 +519,32 @@ void HashIndex<T>::mergeBulkInserts() {
template<typename T>
void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator, slot_id_t slotId) {
typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator,
slot_id_t diskSlotId) {
slot_id_t diskEntryPos = 0u;
auto* diskSlot = &*diskSlotIterator;
Slot<T>* diskSlot = nullptr;
// Merge slot from local storage to existing slot
do {
for (auto entryPos = 0u; entryPos < slotToMerge.slot->header.numEntries(); entryPos++) {
KU_ASSERT(slotToMerge.slot->header.isEntryValid(entryPos));
// Skip if there are no valid entries
if (!slotToMerge.slot->header.validityMask) {
continue;
}
for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
if (!slotToMerge.slot->header.isEntryValid(entryPos)) {
continue;
}
// Only copy entries that match the current primary slot on disk
auto key = slotToMerge.slot->entries[entryPos].key;
auto hash = hashStored(TransactionType::WRITE, key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
if (primarySlot != diskSlotId) {
continue;
}
// Delay seeking as long as possible to avoid writing to slots which we don't modify
if (diskSlot == nullptr) {
diskSlot = &*diskSlotIterator.seek(diskSlotId);
}
// Find the next empty entry, or add a new slot if there are no more entries
while (diskSlot->header.isEntryValid(diskEntryPos) ||
diskEntryPos >= getSlotCapacity<T>()) {
@@ -551,14 +568,11 @@ void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos,
slotToMerge.slot->entries[entryPos].key, UINT32_MAX,
slotToMerge.slot->header.fingerprints[entryPos]);
slotToMerge.slot->header.setEntryInvalid(entryPos);
KU_ASSERT([&]() {
auto hash =
hashStored(TransactionType::WRITE, slotToMerge.slot->entries[entryPos].key);
auto primarySlot =
HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
KU_ASSERT(slotToMerge.slot->header.fingerprints[entryPos] ==
HashIndexUtils::getFingerprintForHash(hash));
KU_ASSERT(primarySlot == slotId);
KU_ASSERT(primarySlot == diskSlotId);
return true;
}());
indexHeaderForWriteTrx->numEntries++;
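Because each local slot is coarser than (or equal to) the disk slots it feeds in the first pass, mergeSlot rehashes every entry and copies only those whose true primary slot is the disk slot currently being merged. A toy, self-contained model of that filtering (illustrative masks only, assuming a fully split level so slot selection is a pure mask; not Kuzu's actual code):

#include <cstdint>
#include <cstdio>

int main() {
    const uint64_t localMask = 0b1;  // pretend: 2 local primary slots
    const uint64_t diskMask = 0b11;  // pretend: 4 on-disk primary slots
    // Four hashes that all land in local slot 1 (low bit set).
    const uint64_t hashes[] = {0b001, 0b011, 0b101, 0b111};
    for (uint64_t diskSlotId = 0; diskSlotId <= diskMask; diskSlotId++) {
        uint64_t localSlotId = diskSlotId & localMask; // first-pass truncation
        for (uint64_t hash : hashes) {
            if ((hash & localMask) != localSlotId) continue; // lives in a different local slot
            if ((hash & diskMask) != diskSlotId) continue;   // mergeSlot's per-entry filter
            std::printf("hash %llu: local slot %llu -> disk slot %llu\n",
                (unsigned long long)hash, (unsigned long long)localSlotId,
                (unsigned long long)diskSlotId);
        }
    }
    return 0;
}

Each entry is copied exactly once, when the disk slot it truly belongs to comes up; the delayed diskSlotIterator.seek keeps disk slots that receive nothing from being touched at all.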
6 changes: 1 addition & 5 deletions src/storage/index/in_mem_hash_index.cpp
@@ -129,11 +129,7 @@ void InMemHashIndex<T>::splitSlot(HashIndexHeader& header) {

template<typename T>
size_t InMemHashIndex<T>::append(const IndexBuffer<BufferKeyType>& buffer) {
slot_id_t numRequiredEntries =
HashIndexUtils::getNumRequiredEntries(this->indexHeader.numEntries + buffer.size());
while (numRequiredEntries > pSlots->size() * getSlotCapacity<T>()) {
this->splitSlot(this->indexHeader);
}
reserve(indexHeader.numEntries + buffer.size());
// Do both searches after splitting. Returning early if the key already exists isn't a
// particular concern and doing both after splitting allows the slotID to be reused
common::hash_t hashes[BUFFER_SIZE];
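Taken together, a caller can now size the in-memory index once and append whole batches without worrying about matching the eventual on-disk slot count. A hedged, caller-side sketch (the driver shape and key type are assumptions; only bulkReserve and append appear in this commit):

// Hypothetical bulk-load driver; buffer construction is illustrative.
void bulkLoad(HashIndex<int64_t>& index, const IndexBuffer<int64_t>& buffer) {
    index.bulkReserve(buffer.size()); // grow local storage up front, splitting as needed
    // append's return value is assumed here to be the number of entries inserted.
    size_t numInserted = index.append(buffer);
    (void)numInserted;
    // The on-disk index is only resized and reconciled later, in mergeBulkInserts().
}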
