diff --git a/src/include/storage/index/hash_index.h b/src/include/storage/index/hash_index.h index 48910d579a..d394c7704d 100644 --- a/src/include/storage/index/hash_index.h +++ b/src/include/storage/index/hash_index.h @@ -124,8 +124,17 @@ class HashIndex final : public OnDiskHashIndex { } // Resizes the on-disk index to support the given number of new entries void reserve(uint64_t newEntries); + + struct HashIndexEntryView { + slot_id_t diskSlotId; + uint8_t fingerprint; + const SlotEntry* entry; + }; + + void sortEntries(typename InMemHashIndex::SlotIterator& slotToMerge, + std::vector& partitions); void mergeBulkInserts(); - void mergeSlot(typename InMemHashIndex::SlotIterator& slotToMerge, + void mergeSlot(std::vector& slotToMerge, typename BaseDiskArray>::WriteIterator& diskSlotIterator, typename BaseDiskArray>::WriteIterator& diskOverflowSlotIterator, slot_id_t slotId); diff --git a/src/include/storage/storage_structure/disk_array.h b/src/include/storage/storage_structure/disk_array.h index 0cc13b3f2d..e6509d3d57 100644 --- a/src/include/storage/storage_structure/disk_array.h +++ b/src/include/storage/storage_structure/disk_array.h @@ -355,7 +355,9 @@ class BaseDiskArray { inline WriteIterator iter_mut() { return WriteIterator{diskArray.iter_mut(sizeof(U))}; } inline uint64_t getAPIdx(uint64_t idx) const { return diskArray.getAPIdx(idx); } - uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; } + static constexpr uint32_t getAlignedElementSize() { + return (1 << (std::bit_width(sizeof(U)) - 1)); + } private: BaseDiskArrayInternal diskArray; @@ -451,7 +453,9 @@ class InMemDiskArrayBuilder { inline void saveToDisk() { diskArray.saveToDisk(); } - uint32_t getAlignedElementSize() const { return 1 << diskArray.header.alignedElementSizeLog2; } + static constexpr uint32_t getAlignedElementSize() { + return BaseDiskArray::getAlignedElementSize(); + } private: InMemDiskArrayBuilderInternal diskArray; diff --git a/src/storage/index/hash_index.cpp b/src/storage/index/hash_index.cpp index 5f67c2e51d..fd21ec818e 100644 --- a/src/storage/index/hash_index.cpp +++ b/src/storage/index/hash_index.cpp @@ -1,5 +1,6 @@ #include "storage/index/hash_index.h" +#include #include #include @@ -20,6 +21,7 @@ #include "storage/index/in_mem_hash_index.h" #include "storage/storage_structure/disk_array.h" #include "transaction/transaction.h" +#include using namespace kuzu::common; using namespace kuzu::transaction; @@ -459,6 +461,28 @@ void HashIndex::reserve(uint64_t newEntries) { } } +template +void HashIndex::sortEntries(typename InMemHashIndex::SlotIterator& slotToMerge, + std::vector& entries) { + do { + auto numEntries = slotToMerge.slot->header.numEntries(); + for (auto entryPos = 0u; entryPos < numEntries; entryPos++) { + const auto* entry = &slotToMerge.slot->entries[entryPos]; + const auto hash = hashStored(TransactionType::WRITE, entry->key); + const auto primarySlot = + HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash); + entries.push_back(HashIndexEntryView{primarySlot, + slotToMerge.slot->header.fingerprints[entryPos], entry}); + } + } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge)); + std::sort(entries.begin(), entries.end(), [&](auto entry1, auto entry2) -> bool { + // Sort based on the entry's disk slot ID so that the first slot is at the end + // Sorting is done reversed so that we can process from the back of the list, + // using the size to track the remaining entries + return entry1.diskSlotId > entry2.diskSlotId; + }); +} + template void HashIndex::mergeBulkInserts() { // TODO: Ideally we can split slots at the same time that we insert new ones @@ -489,96 +513,105 @@ void HashIndex::mergeBulkInserts() { // able to pin the same page simultaneously // Alternatively, cache new slots in memory and pushBack them at the end like in splitSlots auto diskOverflowSlotIterator = oSlots->iter_mut(); - for (uint64_t diskSlotId = 0; diskSlotId < pSlots->getNumElements(TransactionType::WRITE); - diskSlotId++) { - // Determine which local slot corresponds to the disk slot - // Note: this assumes that the slot id is a truncation of the hash. - auto localSlotId = HashIndexUtils::getPrimarySlotIdForHash( - bulkInsertLocalStorage.getIndexHeader(), diskSlotId); - auto localSlot = - typename InMemHashIndex::SlotIterator(localSlotId, &bulkInsertLocalStorage); - mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId); - } - // Since we may over-estimate the required capacity when reserving based on the total number of - // rows (due to reserving the average expected per index, not the actual) it is possible that - // the in-memory index contains more primary slots than the on-disk index after reserving. The - // first pass is correct for the slots that it accessses, and the remaining here are processed - // by truncating the local slotId to match the on-disk header. - for (uint64_t localSlotId = pSlots->getNumElements(TransactionType::WRITE); - localSlotId < bulkInsertLocalStorage.numPrimarySlots(); localSlotId++) { - auto diskSlotId = - HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, localSlotId); - auto localSlot = - typename InMemHashIndex::SlotIterator(localSlotId, &bulkInsertLocalStorage); - mergeSlot(localSlot, diskSlotIterator, diskOverflowSlotIterator, diskSlotId); + + // Store sorted slot positions. Re-use to avoid re-allocating memory + // TODO: Unify implementations to make sure this matches the size used by the disk array + constexpr size_t NUM_SLOTS_PER_PAGE = + BufferPoolConstants::PAGE_4KB_SIZE / BaseDiskArray>::getAlignedElementSize(); + std::array, NUM_SLOTS_PER_PAGE> partitionedEntries; + // Sort entries for a page of slots at a time, then move vertically and process all entries + // which map to a given page on disk, then horizontally to the next page in the set. These pages + // may not be consecutive, but we reduce the memory overhead for storing the information about + // the sorted data and still just process each page once. + for (uint64_t localSlotId = 0; localSlotId < bulkInsertLocalStorage.numPrimarySlots(); + localSlotId += NUM_SLOTS_PER_PAGE) { + for (size_t i = 0; + i < NUM_SLOTS_PER_PAGE && localSlotId + i < bulkInsertLocalStorage.numPrimarySlots(); + i++) { + auto localSlot = + typename InMemHashIndex::SlotIterator(localSlotId + i, &bulkInsertLocalStorage); + partitionedEntries[i].clear(); + // Results are sorted in reverse, so we can process the end first and pop_back to remove + // them from the vector + sortEntries(localSlot, partitionedEntries[i]); + } + // Repeat until there are no un-processed partitions in partitionedEntries + // This will run at most NUM_SLOTS_PER_PAGE times the number of entries + std::bitset done; + while (!done.all()) { + std::optional diskSlotPage; + for (size_t i = 0; i < NUM_SLOTS_PER_PAGE; i++) { + if (!done[i] && !partitionedEntries[i].empty()) { + auto diskSlotId = partitionedEntries[i].back().diskSlotId; + if (!diskSlotPage) { + diskSlotPage = diskSlotId / NUM_SLOTS_PER_PAGE; + } + if (diskSlotId / NUM_SLOTS_PER_PAGE == diskSlotPage) { + mergeSlot(partitionedEntries[i], diskSlotIterator, diskOverflowSlotIterator, + diskSlotId); + if (partitionedEntries[i].empty()) { + done[i] = true; + } + } + } else { + done[i] = true; + } + } + } } KU_ASSERT(originalNumEntries + bulkInsertLocalStorage.getIndexHeader().numEntries == indexHeaderForWriteTrx->numEntries); } template -void HashIndex::mergeSlot(typename InMemHashIndex::SlotIterator& slotToMerge, +void HashIndex::mergeSlot(std::vector& slotToMerge, typename BaseDiskArray>::WriteIterator& diskSlotIterator, typename BaseDiskArray>::WriteIterator& diskOverflowSlotIterator, slot_id_t diskSlotId) { slot_id_t diskEntryPos = 0u; - Slot* diskSlot = nullptr; + // mergeSlot should only be called when there is at least one entry for the given disk slot id + // in the slot to merge + Slot* diskSlot = &*diskSlotIterator.seek(diskSlotId); // Merge slot from local storage to existing slot - do { - // Skip if there are no valid entries - if (!slotToMerge.slot->header.validityMask) { - continue; + for (const auto& entry : slotToMerge | std::views::reverse) { + if (entry.diskSlotId != diskSlotId) { + return; } - for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { - if (!slotToMerge.slot->header.isEntryValid(entryPos)) { - continue; + // Find the next empty entry, or add a new slot if there are no more entries + while ( + diskSlot->header.isEntryValid(diskEntryPos) || diskEntryPos >= getSlotCapacity()) { + diskEntryPos++; + if (diskEntryPos >= getSlotCapacity()) { + if (diskSlot->header.nextOvfSlotId == 0) { + // If there are no more disk slots in this chain, we need to add one + diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size(); + // This may invalidate diskSlot + diskOverflowSlotIterator.pushBack(Slot()); + } else { + diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId); + } + diskSlot = &*diskOverflowSlotIterator; + // Check to make sure we're not looping + KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId); + diskEntryPos = 0; } - // Only copy entries that match the current primary slot on disk - auto key = slotToMerge.slot->entries[entryPos].key; + } + KU_ASSERT(diskEntryPos < getSlotCapacity()); + copyAndUpdateSlotHeader(*diskSlot, diskEntryPos, entry.entry->key, + UINT32_MAX, entry.fingerprint); + KU_ASSERT([&]() { + auto key = entry.entry->key; auto hash = hashStored(TransactionType::WRITE, key); auto primarySlot = HashIndexUtils::getPrimarySlotIdForHash(*indexHeaderForWriteTrx, hash); - if (primarySlot != diskSlotId) { - continue; - } - // Delay seeking as long as possible to avoid writing to slots which we don't modify - if (diskSlot == nullptr) { - diskSlot = &*diskSlotIterator.seek(diskSlotId); - } - // Find the next empty entry, or add a new slot if there are no more entries - while (diskSlot->header.isEntryValid(diskEntryPos) || - diskEntryPos >= getSlotCapacity()) { - diskEntryPos++; - if (diskEntryPos >= getSlotCapacity()) { - if (diskSlot->header.nextOvfSlotId == 0) { - // If there are no more disk slots in this chain, we need to add one - diskSlot->header.nextOvfSlotId = diskOverflowSlotIterator.size(); - // This may invalidate diskSlot - diskOverflowSlotIterator.pushBack(Slot()); - } else { - diskOverflowSlotIterator.seek(diskSlot->header.nextOvfSlotId); - } - diskSlot = &*diskOverflowSlotIterator; - // Check to make sure we're not looping - KU_ASSERT(diskOverflowSlotIterator.idx() != diskSlot->header.nextOvfSlotId); - diskEntryPos = 0; - } - } - KU_ASSERT(diskEntryPos < getSlotCapacity()); - copyAndUpdateSlotHeader(*diskSlot, diskEntryPos, - slotToMerge.slot->entries[entryPos].key, UINT32_MAX, - slotToMerge.slot->header.fingerprints[entryPos]); - slotToMerge.slot->header.setEntryInvalid(entryPos); - KU_ASSERT([&]() { - KU_ASSERT(slotToMerge.slot->header.fingerprints[entryPos] == - HashIndexUtils::getFingerprintForHash(hash)); - KU_ASSERT(primarySlot == diskSlotId); - return true; - }()); - indexHeaderForWriteTrx->numEntries++; - diskEntryPos++; - } - } while (bulkInsertLocalStorage.nextChainedSlot(slotToMerge)); + KU_ASSERT(entry.fingerprint == HashIndexUtils::getFingerprintForHash(hash)); + KU_ASSERT(primarySlot == diskSlotId); + return true; + }()); + indexHeaderForWriteTrx->numEntries++; + diskEntryPos++; + slotToMerge.pop_back(); + } } template