Improve efficiency of merging bulk insertions into the hash index (#3403)

benjaminwinger committed Apr 29, 2024
1 parent eab016d commit 88cc154
Showing 3 changed files with 123 additions and 77 deletions.
11 changes: 10 additions & 1 deletion src/include/storage/index/hash_index.h
@@ -124,8 +124,17 @@ class HashIndex final : public OnDiskHashIndex {
     }
     // Resizes the on-disk index to support the given number of new entries
     void reserve(uint64_t newEntries);
+
+    struct HashIndexEntryView {
+        slot_id_t diskSlotId;
+        uint8_t fingerprint;
+        const SlotEntry<T>* entry;
+    };
+
+    void sortEntries(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+        std::vector<HashIndexEntryView>& partitions);
     void mergeBulkInserts();
-    void mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+    void mergeSlot(std::vector<HashIndexEntryView>& slotToMerge,
         typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
         typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator, slot_id_t slotId);
 
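The new HashIndexEntryView ties each in-memory entry to its precomputed destination (diskSlotId) and its cached fingerprint, so entries can be ordered by where they will land on disk without rehashing. A minimal standalone sketch of the resulting sort-descending-then-pop_back consumption pattern (names and values are illustrative, not from the commit):

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

struct EntryView {
    uint64_t diskSlotId; // destination slot, precomputed from the key's hash
    uint8_t fingerprint; // cached so the merge never rehashes the key
};

int main() {
    std::vector<EntryView> views{{7, 0xa1}, {2, 0xb2}, {7, 0xc3}, {0, 0xd4}};
    // Descending sort puts the smallest disk slot id at the back...
    std::sort(views.begin(), views.end(),
        [](const EntryView& a, const EntryView& b) { return a.diskSlotId > b.diskSlotId; });
    // ...so consuming in ascending slot order is a cheap pop_back, and
    // views.size() always reflects how many entries remain to merge.
    while (!views.empty()) {
        std::cout << "merge into slot " << views.back().diskSlotId << "\n";
        views.pop_back();
    }
}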
8 changes: 6 additions & 2 deletions src/include/storage/storage_structure/disk_array.h
@@ -355,7 +355,9 @@ class BaseDiskArray {
 
     inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; }
     inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); }
-    uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }
+    static constexpr uint32_t getAlignedElementSize() {
+        return (1 << (std::bit_width(sizeof(U)) - 1));
+    }
 
 private:
     BaseDiskArrayInternal diskArray;
@@ -451,7 +453,9 @@ class InMemDiskArrayBuilder {
 
     inline void saveToDisk() { diskArray.saveToDisk(); }
 
-    uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }
+    static constexpr uint32_t getAlignedElementSize() {
+        return BaseDiskArray<U>::getAlignedElementSize();
+    }
 
 private:
     InMemDiskArrayBuilderInternal diskArray;
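Making getAlignedElementSize static and constexpr lets it participate in constant expressions (the new NUM_SLOTS_PER_PAGE constant in hash_index.cpp relies on this) instead of reading a log2 field from the on-disk header. As written, 1 << (std::bit_width(n) - 1) is the largest power of two not exceeding n, so it reproduces sizeof(U) exactly whenever the element size is already a power of two. A small sketch of the arithmetic, with a free function standing in for the member above:

#include <bit>
#include <cstdint>

// std::bit_width(n) is 1 + floor(log2(n)) for n > 0, so shifting by one
// less gives the largest power of two that does not exceed n.
constexpr uint32_t alignedElementSize(uint32_t n) {
    return 1u << (std::bit_width(n) - 1);
}

static_assert(alignedElementSize(8) == 8);   // exact for power-of-two sizes
static_assert(alignedElementSize(64) == 64);
static_assert(alignedElementSize(24) == 16); // rounds down otherwise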
181 changes: 107 additions & 74 deletions src/storage/index/hash_index.cpp
@@ -1,5 +1,6 @@
 #include "storage/index/hash_index.h"
 
+#include <bitset>
 #include <cstdint>
 #include <type_traits>
 
@@ -20,6 +21,7 @@
 #include "storage/index/in_mem_hash_index.h"
 #include "storage/storage_structure/disk_array.h"
 #include "transaction/transaction.h"
+#include <ranges>
 
 using namespace kuzu::common;
 using namespace kuzu::transaction;
@@ -459,6 +461,28 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
     }
 }
 
+template<typename T>
+void HashIndex<T>::sortEntries(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+    std::vector<HashIndexEntryView>& entries) {
+    do {
+        auto numEntries = slotToMerge.slot->header.numEntries();
+        for (auto entryPos = 0u; entryPos < numEntries; entryPos++) {
+            const auto* entry = &slotToMerge.slot->entries[entryPos];
+            const auto hash = hashStored(TransactionType::WRITE, entry->key);
+            const auto primarySlot =
+                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
+            entries.push_back(HashIndexEntryView{primarySlot,
+                slotToMerge.slot->header.fingerprints[entryPos], entry});
+        }
+    } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge));
+    std::sort(entries.begin(), entries.end(), [&](auto entry1, auto entry2) -> bool {
+        // Sort based on the entry's disk slot ID so that the first slot is at the end.
+        // Sorting is done in reverse so that we can process from the back of the list,
+        // using the size to track the remaining entries.
+        return entry1.diskSlotId > entry2.diskSlotId;
+    });
+}
+
 template<typename T>
 void HashIndex<T>::mergeBulkInserts() {
     // TODO: Ideally we can split slots at the same time that we insert new ones
@@ -489,96 +513,105 @@ void HashIndex<T>::mergeBulkInserts() {
     // able to pin the same page simultaneously
     // Alternatively, cache new slots in memory and pushBack them at the end like in splitSlots
     auto diskOverflowSlotIterator = oSlots->iter_mut();
-    for (uint64_t diskSlotId = 0; diskSlotId < pSlots->getNumElements(TransactionType::WRITE);
-         diskSlotId++) {
-        // Determine which local slot corresponds to the disk slot
-        // Note: this assumes that the slot id is a truncation of the hash.
-        auto localSlotId = HashIndexUtils::getPrimarySlotIdForHash(
-            bulkInsertLocalStorage.getIndexHeader(), diskSlotId);
-        auto localSlot =
-            typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
-        mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
-    }
-    // Since we may over-estimate the required capacity when reserving based on the total number of
-    // rows (due to reserving the average expected per index, not the actual) it is possible that
-    // the in-memory index contains more primary slots than the on-disk index after reserving. The
-    // first pass is correct for the slots that it accesses, and the remaining here are processed
-    // by truncating the local slotId to match the on-disk header.
-    for (uint64_t localSlotId = pSlots->getNumElements(TransactionType::WRITE);
-         localSlotId < bulkInsertLocalStorage.numPrimarySlots(); localSlotId++) {
-        auto diskSlotId =
-            HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, localSlotId);
-        auto localSlot =
-            typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
-        mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
-    }
+
+    // Store sorted slot positions. Re-use to avoid re-allocating memory
+    // TODO: Unify implementations to make sure this matches the size used by the disk array
+    constexpr size_t NUM_SLOTS_PER_PAGE =
+        BufferPoolConstants::PAGE_4KB_SIZE / BaseDiskArray<Slot<T>>::getAlignedElementSize();
+    std::array<std::vector<HashIndexEntryView>, NUM_SLOTS_PER_PAGE> partitionedEntries;
+    // Sort entries for a page of slots at a time, then move vertically and process all entries
+    // which map to a given page on disk, then horizontally to the next page in the set. These pages
+    // may not be consecutive, but we reduce the memory overhead for storing the information about
+    // the sorted data and still just process each page once.
+    for (uint64_t localSlotId = 0; localSlotId < bulkInsertLocalStorage.numPrimarySlots();
+         localSlotId += NUM_SLOTS_PER_PAGE) {
+        for (size_t i = 0;
+             i < NUM_SLOTS_PER_PAGE && localSlotId + i < bulkInsertLocalStorage.numPrimarySlots();
+             i++) {
+            auto localSlot =
+                typename InMemHashIndex<T>::SlotIterator(localSlotId + i, &bulkInsertLocalStorage);
+            partitionedEntries[i].clear();
+            // Results are sorted in reverse, so we can process the end first and pop_back to remove
+            // them from the vector
+            sortEntries(localSlot, partitionedEntries[i]);
+        }
+        // Repeat until there are no un-processed partitions in partitionedEntries.
+        // This will run at most NUM_SLOTS_PER_PAGE times the number of entries
+        std::bitset<NUM_SLOTS_PER_PAGE> done;
+        while (!done.all()) {
+            std::optional<page_idx_t> diskSlotPage;
+            for (size_t i = 0; i < NUM_SLOTS_PER_PAGE; i++) {
+                if (!done[i] && !partitionedEntries[i].empty()) {
+                    auto diskSlotId = partitionedEntries[i].back().diskSlotId;
+                    if (!diskSlotPage) {
+                        diskSlotPage = diskSlotId / NUM_SLOTS_PER_PAGE;
+                    }
+                    if (diskSlotId / NUM_SLOTS_PER_PAGE == diskSlotPage) {
+                        mergeSlot(partitionedEntries[i], diskSlotIterator,
+                            diskOverflowSlotIterator, diskSlotId);
+                        if (partitionedEntries[i].empty()) {
+                            done[i] = true;
+                        }
+                    }
+                } else {
+                    done[i] = true;
+                }
+            }
+        }
+    }
     KU_ASSERT(originalNumEntries + bulkInsertLocalStorage.getIndexHeader().numEntries ==
               indexHeaderForWriteTrx->numEntries);
 }
 
 template<typename T>
-void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+void HashIndex<T>::mergeSlot(std::vector<HashIndexEntryView>& slotToMerge,
     typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
     typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator,
     slot_id_t diskSlotId) {
     slot_id_t diskEntryPos = 0u;
-    Slot<T>* diskSlot = nullptr;
+    // mergeSlot should only be called when there is at least one entry for the given disk slot id
+    // in the slot to merge
+    Slot<T>* diskSlot = &*diskSlotIterator.seek(diskSlotId);
     // Merge slot from local storage to existing slot
-    do {
-        // Skip if there are no valid entries
-        if (!slotToMerge.slot->header.validityMask) {
-            continue;
-        }
-        for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
-            if (!slotToMerge.slot->header.isEntryValid(entryPos)) {
-                continue;
-            }
-            // Only copy entries that match the current primary slot on disk
-            auto key = slotToMerge.slot->entries[entryPos].key;
-            auto hash = hashStored(TransactionType::WRITE, key);
-            auto primarySlot =
-                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
-            if (primarySlot != diskSlotId) {
-                continue;
-            }
-            // Delay seeking as long as possible to avoid writing to slots which we don't modify
-            if (diskSlot == nullptr) {
-                diskSlot = &*diskSlotIterator.seek(diskSlotId);
-            }
-            // Find the next empty entry, or add a new slot if there are no more entries
-            while (diskSlot->header.isEntryValid(diskEntryPos) ||
-                   diskEntryPos >= getSlotCapacity<T>()) {
-                diskEntryPos++;
-                if (diskEntryPos >= getSlotCapacity<T>()) {
-                    if (diskSlot->header.nextOvfSlotId == 0) {
-                        // If there are no more disk slots in this chain, we need to add one
-                        diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
-                        // This may invalidate diskSlot
-                        diskOverflowSlotIterator.pushBack(Slot<T>());
-                    } else {
-                        diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
-                    }
-                    diskSlot = &*diskOverflowSlotIterator;
-                    // Check to make sure we're not looping
-                    KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId);
-                    diskEntryPos = 0;
-                }
-            }
-            KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
-            copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos,
-                slotToMerge.slot->entries[entryPos].key, UINT32_MAX,
-                slotToMerge.slot->header.fingerprints[entryPos]);
-            slotToMerge.slot->header.setEntryInvalid(entryPos);
-            KU_ASSERT([&]() {
-                KU_ASSERT(slotToMerge.slot->header.fingerprints[entryPos] ==
-                          HashIndexUtils::getFingerprintForHash(hash));
-                KU_ASSERT(primarySlot == diskSlotId);
-                return true;
-            }());
-            indexHeaderForWriteTrx->numEntries++;
-            diskEntryPos++;
-        }
-    } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge));
+    for (const auto& entry : slotToMerge | std::views::reverse) {
+        if (entry.diskSlotId != diskSlotId) {
+            return;
+        }
+        // Find the next empty entry, or add a new slot if there are no more entries
+        while (
+            diskSlot->header.isEntryValid(diskEntryPos) || diskEntryPos >= getSlotCapacity<T>()) {
+            diskEntryPos++;
+            if (diskEntryPos >= getSlotCapacity<T>()) {
+                if (diskSlot->header.nextOvfSlotId == 0) {
+                    // If there are no more disk slots in this chain, we need to add one
+                    diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
+                    // This may invalidate diskSlot
+                    diskOverflowSlotIterator.pushBack(Slot<T>());
+                } else {
+                    diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
+                }
+                diskSlot = &*diskOverflowSlotIterator;
+                // Check to make sure we're not looping
+                KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId);
+                diskEntryPos = 0;
+            }
+        }
+        KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
+        copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos, entry.entry->key,
+            UINT32_MAX, entry.fingerprint);
+        KU_ASSERT([&]() {
+            auto key = entry.entry->key;
+            auto hash = hashStored(TransactionType::WRITE, key);
+            auto primarySlot =
+                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
+            KU_ASSERT(entry.fingerprint == HashIndexUtils::getFingerprintForHash(hash));
+            KU_ASSERT(primarySlot == diskSlotId);
+            return true;
+        }());
+        indexHeaderForWriteTrx->numEntries++;
+        diskEntryPos++;
+        slotToMerge.pop_back();
+    }
 }
 
 template<typename T>
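The core of the change is the page-at-a-time sweep in mergeBulkInserts: entries are partitioned per local slot and sorted descending by destination, then each pass picks the disk page holding the slot at the back of the first unfinished partition and drains everything across all partitions that targets that page, so each disk page is processed once per batch instead of being revisited for every local slot. A simplified, self-contained sketch of that sweep over plain integers (the constant and names are illustrative, and draining per page here stands in for the per-slot mergeSlot calls):

#include <array>
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

constexpr size_t SLOTS_PER_PAGE = 4; // stand-in for PAGE_4KB_SIZE / aligned slot size

int main() {
    // Each partition holds destination slot ids, sorted descending so the
    // smallest (next to process) sits at the back, as sortEntries produces.
    std::array<std::vector<uint64_t>, SLOTS_PER_PAGE> partitions{
        {{9, 5, 1}, {6, 2}, {}, {8, 8, 4}}};
    std::bitset<SLOTS_PER_PAGE> done;
    while (!done.all()) {
        std::optional<uint64_t> page; // disk page chosen for this pass
        for (size_t i = 0; i < SLOTS_PER_PAGE; i++) {
            if (done[i] || partitions[i].empty()) {
                done[i] = true;
                continue;
            }
            auto slotId = partitions[i].back();
            if (!page) {
                page = slotId / SLOTS_PER_PAGE;
            }
            // Drain every entry in this partition that lands on the chosen page.
            while (!partitions[i].empty() &&
                   partitions[i].back() / SLOTS_PER_PAGE == *page) {
                std::cout << "page " << *page << ": merge into slot "
                          << partitions[i].back() << "\n";
                partitions[i].pop_back();
            }
            if (partitions[i].empty()) {
                done[i] = true;
            }
        }
    }
}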
