Improve efficiency of merging bulk insertions into the hash index #3403

Merged · 1 commit · Apr 29, 2024
src/include/storage/index/hash_index.h (10 additions & 1 deletion)

```diff
@@ -124,8 +124,17 @@ class HashIndex final : public OnDiskHashIndex {
     }
     // Resizes the on-disk index to support the given number of new entries
     void reserve(uint64_t newEntries);
 
+    struct HashIndexEntryView {
+        slot_id_t diskSlotId;
+        uint8_t fingerprint;
+        const SlotEntry<T>* entry;
+    };
+
+    void sortEntries(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+        std::vector<HashIndexEntryView>& partitions);
     void mergeBulkInserts();
-    void mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+    void mergeSlot(std::vector<HashIndexEntryView>& slotToMerge,
         typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
         typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator, slot_id_t slotId);
```
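Sorting views rather than entries is what keeps the sort cheap: each `HashIndexEntryView` carries the precomputed disk slot id and fingerprint next to a pointer into the in-memory slot, so only small structs move while the entries stay put. A minimal standalone sketch of the idea, with simplified stand-in types (the `SlotEntry` here is hypothetical, not the real `SlotEntry<T>`):

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

// Simplified stand-ins for the real types (hypothetical).
using slot_id_t = uint64_t;
struct SlotEntry { uint64_t key; };

struct HashIndexEntryView {
    slot_id_t diskSlotId;   // precomputed target primary slot on disk
    uint8_t fingerprint;    // cached fingerprint of the key's hash
    const SlotEntry* entry; // points at the entry; the entry itself stays put
};

int main() {
    std::vector<SlotEntry> slotStorage{{42}, {7}, {99}};
    std::vector<HashIndexEntryView> views{
        {3, 0xA1, &slotStorage[0]}, {1, 0xB2, &slotStorage[1]}, {2, 0xC3, &slotStorage[2]}};
    // Only the small views are reordered, never the entries they reference
    std::sort(views.begin(), views.end(),
        [](const auto& a, const auto& b) { return a.diskSlotId > b.diskSlotId; });
}
```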
src/include/storage/storage_structure/disk_array.h (6 additions & 2 deletions)

```diff
@@ -355,7 +355,9 @@ class BaseDiskArray {
 
     inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; }
     inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); }
-    uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }
+    static constexpr uint32_t getAlignedElementSize() {
+        return (1 << (std::bit_width(sizeof(U)) - 1));
+    }
 
 private:
     BaseDiskArrayInternal diskArray;
@@ -451,7 +453,9 @@ class InMemDiskArrayBuilder {
 
     inline void saveToDisk() { diskArray.saveToDisk(); }
 
-    uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; }
+    static constexpr uint32_t getAlignedElementSize() {
+        return BaseDiskArray<U>::getAlignedElementSize();
+    }
 
 private:
     InMemDiskArrayBuilderInternal diskArray;
```
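Making this `static constexpr` is what lets `NUM_SLOTS_PER_PAGE` below be a compile-time constant. For reference, the expression can be exercised in isolation; a standalone sketch (the `Element` type is hypothetical; note that `1 << (std::bit_width(n) - 1)` rounds *down* to a power of two, so this presumably assumes element sizes are already padded to powers of two):

```cpp
#include <bit>     // std::bit_width (C++20)
#include <cstdint>

// Hypothetical stand-in for Slot<T>; 32 bytes, a power of two.
struct Element { uint64_t data[4]; };

template<typename U>
constexpr uint32_t alignedElementSize() {
    // Same expression as the PR's getAlignedElementSize()
    return 1u << (std::bit_width(sizeof(U)) - 1);
}

static_assert(alignedElementSize<Element>() == 32);
static_assert(alignedElementSize<uint32_t>() == 4);
// A 24-byte element would yield 16, i.e. the result rounds down,
// which is why power-of-two sizes appear to be assumed.
static_assert(alignedElementSize<uint64_t[3]>() == 16);
```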
src/storage/index/hash_index.cpp (107 additions & 74 deletions)

```diff
@@ -1,5 +1,6 @@
 #include "storage/index/hash_index.h"
 
+#include <bitset>
 #include <cstdint>
 #include <type_traits>
 
@@ -20,6 +21,7 @@
 #include "storage/index/in_mem_hash_index.h"
 #include "storage/storage_structure/disk_array.h"
 #include "transaction/transaction.h"
+#include <ranges>
 
 using namespace kuzu::common;
 using namespace kuzu::transaction;
```
```diff
@@ -459,6 +461,28 @@ void HashIndex<T>::reserve(uint64_t newEntries) {
     }
 }
 
+template<typename T>
+void HashIndex<T>::sortEntries(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+    std::vector<HashIndexEntryView>& entries) {
+    do {
+        auto numEntries = slotToMerge.slot->header.numEntries();
+        for (auto entryPos = 0u; entryPos < numEntries; entryPos++) {
+            const auto* entry = &slotToMerge.slot->entries[entryPos];
+            const auto hash = hashStored(TransactionType::WRITE, entry->key);
+            const auto primarySlot =
+                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
+            entries.push_back(HashIndexEntryView{primarySlot,
+                slotToMerge.slot->header.fingerprints[entryPos], entry});
+        }
+    } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge));
+    std::sort(entries.begin(), entries.end(), [&](auto entry1, auto entry2) -> bool {
+        // Sort based on the entry's disk slot ID so that the first slot is at the end.
+        // Sorting is done reversed so that we can process from the back of the list,
+        // using the size to track the remaining entries
+        return entry1.diskSlotId > entry2.diskSlotId;
+    });
+}
+
 template<typename T>
 void HashIndex<T>::mergeBulkInserts() {
     // TODO: Ideally we can split slots at the same time that we insert new ones
```
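The reverse sort is what makes the later consumption cheap: all entries for the smallest disk slot id sit at the back of the vector, so they can be removed with `O(1)` `pop_back()` calls while `size()` tracks the remaining work. A minimal sketch of that pattern, using a simplified stand-in for `HashIndexEntryView`:

```cpp
#include <algorithm>
#include <cstdint>
#include <vector>

struct View { uint64_t diskSlotId; };

int main() {
    std::vector<View> entries{{5}, {2}, {9}, {2}, {5}};
    // Sort descending by diskSlotId so the smallest ids sit at the back...
    std::sort(entries.begin(), entries.end(),
        [](const View& a, const View& b) { return a.diskSlotId > b.diskSlotId; });
    // ...and can be consumed grouped by slot via pop_back.
    while (!entries.empty()) {
        uint64_t slot = entries.back().diskSlotId;
        while (!entries.empty() && entries.back().diskSlotId == slot) {
            // process entries.back() for `slot`
            entries.pop_back();
        }
    }
}
```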
```diff
     // able to pin the same page simultaneously
     // Alternatively, cache new slots in memory and pushBack them at the end like in splitSlots
     auto diskOverflowSlotIterator = oSlots->iter_mut();
-    for (uint64_t diskSlotId = 0; diskSlotId < pSlots->getNumElements(TransactionType::WRITE);
-         diskSlotId++) {
-        // Determine which local slot corresponds to the disk slot
-        // Note: this assumes that the slot id is a truncation of the hash.
-        auto localSlotId = HashIndexUtils::getPrimarySlotIdForHash(
-            bulkInsertLocalStorage.getIndexHeader(), diskSlotId);
-        auto localSlot =
-            typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
-        mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
-    }
-    // Since we may over-estimate the required capacity when reserving based on the total number of
-    // rows (due to reserving the average expected per index, not the actual) it is possible that
-    // the in-memory index contains more primary slots than the on-disk index after reserving. The
-    // first pass is correct for the slots that it accesses, and the remaining here are processed
-    // by truncating the local slotId to match the on-disk header.
-    for (uint64_t localSlotId = pSlots->getNumElements(TransactionType::WRITE);
-         localSlotId < bulkInsertLocalStorage.numPrimarySlots(); localSlotId++) {
-        auto diskSlotId =
-            HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, localSlotId);
-        auto localSlot =
-            typename InMemHashIndex<T>::SlotIterator(localSlotId, &bulkInsertLocalStorage);
-        mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId);
-    }
+    // Store sorted slot positions. Re-use to avoid re-allocating memory
+    // TODO: Unify implementations to make sure this matches the size used by the disk array
+    constexpr size_t NUM_SLOTS_PER_PAGE =
+        BufferPoolConstants::PAGE_4KB_SIZE / BaseDiskArray<Slot<T>>::getAlignedElementSize();
+    std::array<std::vector<HashIndexEntryView>, NUM_SLOTS_PER_PAGE> partitionedEntries;
+    // Sort entries for a page of slots at a time, then move vertically and process all entries
+    // which map to a given page on disk, then horizontally to the next page in the set. These
+    // pages may not be consecutive, but we reduce the memory overhead for storing the information
+    // about the sorted data and still just process each page once.
+    for (uint64_t localSlotId = 0; localSlotId < bulkInsertLocalStorage.numPrimarySlots();
+         localSlotId += NUM_SLOTS_PER_PAGE) {
+        for (size_t i = 0;
+             i < NUM_SLOTS_PER_PAGE && localSlotId + i < bulkInsertLocalStorage.numPrimarySlots();
+             i++) {
+            auto localSlot = typename InMemHashIndex<T>::SlotIterator(
+                localSlotId + i, &bulkInsertLocalStorage);
+            partitionedEntries[i].clear();
+            // Results are sorted in reverse, so we can process the end first and pop_back to
+            // remove them from the vector
+            sortEntries(localSlot, partitionedEntries[i]);
+        }
+        // Repeat until there are no un-processed partitions in partitionedEntries.
+        // This will run at most NUM_SLOTS_PER_PAGE times the number of entries
+        std::bitset<NUM_SLOTS_PER_PAGE> done;
+        while (!done.all()) {
+            std::optional<page_idx_t> diskSlotPage;
+            for (size_t i = 0; i < NUM_SLOTS_PER_PAGE; i++) {
+                if (!done[i] && !partitionedEntries[i].empty()) {
+                    auto diskSlotId = partitionedEntries[i].back().diskSlotId;
+                    if (!diskSlotPage) {
+                        diskSlotPage = diskSlotId / NUM_SLOTS_PER_PAGE;
+                    }
+                    if (diskSlotId / NUM_SLOTS_PER_PAGE == diskSlotPage) {
```
> **Review comment** (on the `if (diskSlotId / NUM_SLOTS_PER_PAGE == diskSlotPage)` check)
>
> **Contributor:** Though it may not make a big difference, could we do an early stop if the diskSlotId is not equal to diskSlotPage?
>
> **Collaborator (author):** No, even if one of the in-memory slots doesn't have any elements that match a given page, later ones could.
```diff
+                        mergeSlot(partitionedEntries[i], diskSlotIterator,
+                            diskOverflowSlotIterator, diskSlotId);
+                        if (partitionedEntries[i].empty()) {
+                            done[i] = true;
+                        }
+                    }
+                } else {
+                    done[i] = true;
+                }
+            }
+        }
+    }
     KU_ASSERT(originalNumEntries + bulkInsertLocalStorage.getIndexHeader().numEntries ==
         indexHeaderForWriteTrx->numEntries);
 }
```
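The "vertical then horizontal" traversal is easier to follow in isolation. Below is a condensed, self-contained sketch of the same drain loop under simplified assumptions: entries are plain integers (their disk slot ids), the `drain` helper is hypothetical, there is no disk I/O, and each partition's entries for the chosen page are consumed in one go rather than one slot per pass.

```cpp
#include <array>
#include <bitset>
#include <cstdint>
#include <optional>
#include <vector>

constexpr size_t NUM_SLOTS_PER_PAGE = 4; // illustrative; the real value is page size / slot size

// Each partition is reverse-sorted by disk slot id, so back() holds the smallest.
using Partition = std::vector<uint64_t>;

// Drain all partitions one disk page at a time: pick the page holding the first
// unfinished partition's next slot ("vertical"), take every partition's entries
// for that page, then move on to the next page ("horizontal").
void drain(std::array<Partition, NUM_SLOTS_PER_PAGE>& parts) {
    std::bitset<NUM_SLOTS_PER_PAGE> done;
    while (!done.all()) {
        std::optional<uint64_t> page;
        for (size_t i = 0; i < NUM_SLOTS_PER_PAGE; i++) {
            if (done[i] || parts[i].empty()) { done[i] = true; continue; }
            uint64_t slotPage = parts[i].back() / NUM_SLOTS_PER_PAGE;
            if (!page) page = slotPage;
            if (slotPage == *page) {
                // consume every entry this partition has for the chosen page
                while (!parts[i].empty() && parts[i].back() / NUM_SLOTS_PER_PAGE == *page)
                    parts[i].pop_back();
                if (parts[i].empty()) done[i] = true;
            }
            // no early break: later partitions may still hold entries for this
            // page (see the review discussion above)
        }
    }
}

int main() {
    std::array<Partition, NUM_SLOTS_PER_PAGE> parts{{{9, 5, 1}, {7, 3}, {}, {8, 2}}};
    drain(parts);
}
```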

```diff
 template<typename T>
-void HashIndex<T>::mergeSlot(typename InMemHashIndex<T>::SlotIterator& slotToMerge,
+void HashIndex<T>::mergeSlot(std::vector<HashIndexEntryView>& slotToMerge,
     typename BaseDiskArray<Slot<T>>::WriteIterator& diskSlotIterator,
     typename BaseDiskArray<Slot<T>>::WriteIterator& diskOverflowSlotIterator,
     slot_id_t diskSlotId) {
     slot_id_t diskEntryPos = 0u;
-    Slot<T>* diskSlot = nullptr;
+    // mergeSlot should only be called when there is at least one entry for the given disk slot
+    // id in the slot to merge
+    Slot<T>* diskSlot = &*diskSlotIterator.seek(diskSlotId);
     // Merge slot from local storage to existing slot
-    do {
-        // Skip if there are no valid entries
-        if (!slotToMerge.slot->header.validityMask) {
-            continue;
+    for (const auto& entry : slotToMerge | std::views::reverse) {
+        if (entry.diskSlotId != diskSlotId) {
+            return;
         }
-        for (auto entryPos = 0u; entryPos < getSlotCapacity<T>(); entryPos++) {
-            if (!slotToMerge.slot->header.isEntryValid(entryPos)) {
-                continue;
-            }
-            // Only copy entries that match the current primary slot on disk
-            auto key = slotToMerge.slot->entries[entryPos].key;
-            auto hash = hashStored(TransactionType::WRITE, key);
-            auto primarySlot =
-                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
-            if (primarySlot != diskSlotId) {
-                continue;
-            }
-            // Delay seeking as long as possible to avoid writing to slots which we don't modify
-            if (diskSlot == nullptr) {
-                diskSlot = &*diskSlotIterator.seek(diskSlotId);
-            }
-            // Find the next empty entry, or add a new slot if there are no more entries
-            while (diskSlot->header.isEntryValid(diskEntryPos) ||
-                   diskEntryPos >= getSlotCapacity<T>()) {
-                diskEntryPos++;
-                if (diskEntryPos >= getSlotCapacity<T>()) {
-                    if (diskSlot->header.nextOvfSlotId == 0) {
-                        // If there are no more disk slots in this chain, we need to add one
-                        diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
-                        // This may invalidate diskSlot
-                        diskOverflowSlotIterator.pushBack(Slot<T>());
-                    } else {
-                        diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
-                    }
-                    diskSlot = &*diskOverflowSlotIterator;
-                    // Check to make sure we're not looping
-                    KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId);
-                    diskEntryPos = 0;
-                }
-            }
-            KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
-            copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos,
-                slotToMerge.slot->entries[entryPos].key, UINT32_MAX,
-                slotToMerge.slot->header.fingerprints[entryPos]);
-            slotToMerge.slot->header.setEntryInvalid(entryPos);
-            KU_ASSERT([&]() {
-                KU_ASSERT(slotToMerge.slot->header.fingerprints[entryPos] ==
-                          HashIndexUtils::getFingerprintForHash(hash));
-                KU_ASSERT(primarySlot == diskSlotId);
-                return true;
-            }());
-            indexHeaderForWriteTrx->numEntries++;
-            diskEntryPos++;
-        }
-    } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge));
+        // Find the next empty entry, or add a new slot if there are no more entries
+        while (
+            diskSlot->header.isEntryValid(diskEntryPos) || diskEntryPos >= getSlotCapacity<T>()) {
+            diskEntryPos++;
+            if (diskEntryPos >= getSlotCapacity<T>()) {
+                if (diskSlot->header.nextOvfSlotId == 0) {
+                    // If there are no more disk slots in this chain, we need to add one
+                    diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size();
+                    // This may invalidate diskSlot
+                    diskOverflowSlotIterator.pushBack(Slot<T>());
+                } else {
+                    diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId);
+                }
+                diskSlot = &*diskOverflowSlotIterator;
+                // Check to make sure we're not looping
+                KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId);
+                diskEntryPos = 0;
+            }
+        }
+        KU_ASSERT(diskEntryPos < getSlotCapacity<T>());
+        copyAndUpdateSlotHeader<const T&, true>(*diskSlot, diskEntryPos, entry.entry->key,
+            UINT32_MAX, entry.fingerprint);
+        KU_ASSERT([&]() {
+            auto key = entry.entry->key;
+            auto hash = hashStored(TransactionType::WRITE, key);
+            auto primarySlot =
+                HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash);
+            KU_ASSERT(entry.fingerprint == HashIndexUtils::getFingerprintForHash(hash));
+            KU_ASSERT(primarySlot == diskSlotId);
+            return true;
+        }());
+        indexHeaderForWriteTrx->numEntries++;
+        diskEntryPos++;
+        slotToMerge.pop_back();
+    }
 }
```
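The free-entry search in `mergeSlot` walks the slot's overflow chain and appends a fresh overflow slot once the chain is exhausted. A simplified in-memory sketch of that walk (hypothetical `Slot` struct and an index-based chain in a plain vector, instead of the PR's disk-array write iterators):

```cpp
#include <cstdint>
#include <cstddef>
#include <vector>

constexpr uint32_t SLOT_CAPACITY = 4;

struct Slot {
    uint32_t validMask = 0;     // one bit per entry position
    uint64_t nextOvfSlotId = 0; // 0 marks the end of the chain
};

// Advance (slotIdx, entryPos) to the next free entry, extending the
// overflow chain in `slots` when the current slot is full.
void findFreeEntry(std::vector<Slot>& slots, size_t& slotIdx, uint32_t& entryPos) {
    while (entryPos < SLOT_CAPACITY && ((slots[slotIdx].validMask >> entryPos) & 1)) {
        entryPos++;
    }
    while (entryPos >= SLOT_CAPACITY) {
        if (slots[slotIdx].nextOvfSlotId == 0) {
            // No more slots in this chain: append a fresh overflow slot
            slots[slotIdx].nextOvfSlotId = slots.size();
            slots.emplace_back(); // may invalidate pointers, like pushBack in the PR
        }
        slotIdx = slots[slotIdx].nextOvfSlotId;
        entryPos = 0;
        while (entryPos < SLOT_CAPACITY && ((slots[slotIdx].validMask >> entryPos) & 1)) {
            entryPos++;
        }
    }
}

int main() {
    std::vector<Slot> slots(1);
    slots[0].validMask = 0b1111; // primary slot full
    size_t slotIdx = 0;
    uint32_t entryPos = 0;
    findFreeEntry(slots, slotIdx, entryPos); // lands in a new overflow slot, position 0
}
```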

template<typename T>
Expand Down
Loading