diff --git a/src/include/common/type_utils.h b/src/include/common/type_utils.h index 43b03dbd2ae..b396fe1985c 100644 --- a/src/include/common/type_utils.h +++ b/src/include/common/type_utils.h @@ -80,7 +80,7 @@ class TypeUtils { return common::PhysicalTypeID::INT128; } else if constexpr (std::is_same_v) { return common::PhysicalTypeID::INTERVAL; - } else if constexpr (std::is_same_v) { + } else if constexpr (std::same_as || std::same_as) { return common::PhysicalTypeID::STRING; } else { KU_UNREACHABLE; diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index a0b90450d98..2a79f1ae20d 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -85,7 +85,7 @@ template concept IndexHashable = ((std::integral && !std::is_same_v) || std::floating_point || std::is_same_v || std::is_same_v || - std::is_same_v); + std::is_same_v || std::same_as); enum class KUZU_API LogicalTypeID : uint8_t { ANY = 0, diff --git a/src/include/processor/operator/persistent/index_builder.h b/src/include/processor/operator/persistent/index_builder.h index 529e28e0724..12b5c5222de 100644 --- a/src/include/processor/operator/persistent/index_builder.h +++ b/src/include/processor/operator/persistent/index_builder.h @@ -16,9 +16,6 @@ namespace kuzu { namespace processor { -constexpr size_t BUFFER_SIZE = 1024; -template -using Buffer = common::StaticVector, BUFFER_SIZE>; const size_t SHOULD_FLUSH_QUEUE_SIZE = 32; class IndexBuilderGlobalQueues { @@ -28,7 +25,7 @@ class IndexBuilderGlobalQueues { void flushToDisk() const; template - void insert(size_t index, Buffer elem) { + void insert(size_t index, storage::IndexBuffer elem) { auto& typedQueues = std::get>(queues).array; typedQueues[index].push(std::move(elem)); if (typedQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) { @@ -49,7 +46,7 @@ class IndexBuilderGlobalQueues { template struct Queue { - std::array>, storage::NUM_HASH_INDEXES> array; + std::array>, storage::NUM_HASH_INDEXES> array; // Type information to help std::visit. Value is not used T type; }; @@ -92,7 +89,7 @@ class IndexBuilderLocalBuffers { // These arrays are much too large to be inline. template - using Buffers = std::array, storage::NUM_HASH_INDEXES>; + using Buffers = std::array, storage::NUM_HASH_INDEXES>; template using UniqueBuffers = std::unique_ptr>; std::variant, UniqueBuffers, UniqueBuffers, diff --git a/src/include/storage/index/hash_index_builder.h b/src/include/storage/index/hash_index_builder.h index aaf2e81bc48..92e1b213d48 100644 --- a/src/include/storage/index/hash_index_builder.h +++ b/src/include/storage/index/hash_index_builder.h @@ -1,6 +1,9 @@ #pragma once +#include "common/static_vector.h" #include "common/type_utils.h" +#include "common/types/internal_id_t.h" +#include "common/types/ku_string.h" #include "common/types/types.h" #include "storage/index/hash_index_header.h" #include "storage/index/hash_index_slot.h" @@ -18,6 +21,10 @@ class InMemHashIndex { virtual void bulkReserve(uint32_t numEntries) = 0; }; +constexpr size_t BUFFER_SIZE = 1024; +template +using IndexBuffer = common::StaticVector, BUFFER_SIZE>; + /** * Basic index file consists of three disk arrays: indexHeader, primary slots (pSlots), and overflow * slots (oSlots). @@ -44,9 +51,10 @@ class InMemHashIndex { * * */ -// T is the key type used to access values -// S is the stored type, which is usually the same as T, with the exception of strings -template +// T is the key type stored in the slots. +// For strings this is different than the type used when inserting/searching +// (see BufferKeyType and Key) +template class HashIndexBuilder final : public InMemHashIndex { static_assert(getSlotCapacity() <= SlotHeader::FINGERPRINT_CAPACITY); // Size of the validity mask @@ -62,13 +70,21 @@ class HashIndexBuilder final : public InMemHashIndex { // Reserves space for at least the specified number of elements. void bulkReserve(uint32_t numEntries) override; - bool append(T key, common::offset_t value); - bool lookup(T key, common::offset_t& result); + using BufferKeyType = + typename std::conditional, std::string, T>::type; + // Appends the buffer to the index. Returns the number of values successfully inserted. + // I.e. if a key fails to insert, its index will be the return value + size_t append(const IndexBuffer& buffer); + using Key = + typename std::conditional, std::string_view, T>::type; + bool lookup(Key key, common::offset_t& result); void flush() override; private: - Slot* getSlot(const SlotInfo& slotInfo); + // Assumes that space has already been allocated for the entry + bool appendInternal(Key key, common::offset_t value, common::hash_t hash); + Slot* getSlot(const SlotInfo& slotInfo); uint32_t allocatePSlots(uint32_t numSlotsToAllocate); uint32_t allocateAOSlot(); @@ -80,12 +96,12 @@ class HashIndexBuilder final : public InMemHashIndex { */ void splitSlot(HashIndexHeader& header); - inline bool equals(T keyToLookup, const S& keyInEntry) const { + inline bool equals(Key keyToLookup, const T& keyInEntry) const { return keyToLookup == keyInEntry; } inline void insert( - T key, Slot* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) { + Key key, Slot* slot, uint8_t entryPos, common::offset_t value, uint8_t fingerprint) { auto entry = slot->entries[entryPos].data; memcpy(entry, &key, sizeof(T)); memcpy(entry + sizeof(T), &value, sizeof(common::offset_t)); @@ -94,14 +110,14 @@ class HashIndexBuilder final : public InMemHashIndex { } void copy(const uint8_t* oldEntry, slot_id_t newSlotId, uint8_t fingerprint); void insertToNewOvfSlot( - T key, Slot* previousSlot, common::offset_t offset, uint8_t fingerprint); - common::hash_t hashStored(const S& key) const; + Key key, Slot* previousSlot, common::offset_t offset, uint8_t fingerprint); + common::hash_t hashStored(const T& key) const; struct SlotIterator { - explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder* builder) + explicit SlotIterator(slot_id_t newSlotId, HashIndexBuilder* builder) : slotInfo{newSlotId, SlotType::PRIMARY}, slot(builder->getSlot(slotInfo)) {} SlotInfo slotInfo; - Slot* slot; + Slot* slot; }; inline bool nextChainedSlot(SlotIterator& iter) { @@ -118,13 +134,13 @@ class HashIndexBuilder final : public InMemHashIndex { std::shared_ptr fileHandle; std::unique_ptr inMemOverflowFile; std::unique_ptr> headerArray; - std::unique_ptr>> pSlots; - std::unique_ptr>> oSlots; + std::unique_ptr>> pSlots; + std::unique_ptr>> oSlots; std::unique_ptr indexHeader; }; template<> -bool HashIndexBuilder::equals( +bool HashIndexBuilder::equals( std::string_view keyToLookup, const common::ku_string_t& keyInEntry) const; class PrimaryKeyIndexBuilder { @@ -134,26 +150,15 @@ class PrimaryKeyIndexBuilder { void bulkReserve(uint32_t numEntries); - // Note: append assumes that bulkRserve has been called before it and the index has reserved - // enough space already. - template - bool append(T key, common::offset_t value) { - return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key)); - } - bool append(std::string_view key, common::offset_t value) { - return appendWithIndexPos(key, value, HashIndexUtils::getHashIndexPosition(key)); - } - template - bool appendWithIndexPos(T key, common::offset_t value, uint64_t indexPos) { + // Appends the buffer to the index. Returns the number of values successfully inserted. + // I.e. if a key fails to insert, its index will be the return value + template + size_t appendWithIndexPos(const IndexBuffer& buffer, uint64_t indexPos) { KU_ASSERT(keyDataTypeID == common::TypeUtils::getPhysicalTypeIDForType()); - KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos); - return getTypedHashIndex(indexPos)->append(key, value); - } - bool appendWithIndexPos(std::string_view key, common::offset_t value, uint64_t indexPos) { - KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING); - KU_ASSERT(HashIndexUtils::getHashIndexPosition(key) == indexPos); - return getTypedHashIndex(indexPos)->append( - key, value); + KU_ASSERT(std::all_of(buffer.begin(), buffer.end(), [&](auto& elem) { + return HashIndexUtils::getHashIndexPosition(elem.first) == indexPos; + })); + return getTypedHashIndex(indexPos)->append(buffer); } template bool lookup(T key, common::offset_t& result) { @@ -162,8 +167,7 @@ class PrimaryKeyIndexBuilder { } bool lookup(std::string_view key, common::offset_t& result) { KU_ASSERT(keyDataTypeID == common::PhysicalTypeID::STRING); - return getTypedHashIndex( - HashIndexUtils::getHashIndexPosition(key)) + return getTypedHashIndex(HashIndexUtils::getHashIndexPosition(key)) ->lookup(key, result); } @@ -173,10 +177,19 @@ class PrimaryKeyIndexBuilder { common::PhysicalTypeID keyTypeID() const { return keyDataTypeID; } private: - template - inline HashIndexBuilder* getTypedHashIndex(uint64_t indexPos) { - return common::ku_dynamic_cast*>( - hashIndexBuilders[indexPos].get()); + template + using HashIndexType = + typename std::conditional || std::same_as, + common::ku_string_t, T>::type; + template + inline HashIndexBuilder>* getTypedHashIndex(uint64_t indexPos) { + if constexpr (std::same_as, common::ku_string_t>) { + return common::ku_dynamic_cast*>( + hashIndexBuilders[indexPos].get()); + } else { + return common::ku_dynamic_cast*>( + hashIndexBuilders[indexPos].get()); + } } private: diff --git a/src/processor/operator/persistent/index_builder.cpp b/src/processor/operator/persistent/index_builder.cpp index 9ea69537b24..acb0c313dfb 100644 --- a/src/processor/operator/persistent/index_builder.cpp +++ b/src/processor/operator/persistent/index_builder.cpp @@ -34,34 +34,26 @@ void IndexBuilderGlobalQueues::maybeConsumeIndex(size_t index) { if (!mutexes[index].try_lock()) { return; } - std::unique_lock lck{mutexes[index], std::adopt_lock}; - - std::visit(overload{[&](Queue&& queues) { - using T = std::decay_t; - Buffer elem; - while (queues.array[index].pop(elem)) { - for (auto [key, value] : elem) { - if (!pkIndex->appendWithIndexPos(key, value, index)) { - throw CopyException( - ExceptionMessage::duplicatePKException(std::move(key))); - } - } - } - return; - }, - [&](auto&& queues) { - using T = std::decay_t; - Buffer elem; - while (queues.array[index].pop(elem)) { - for (auto [key, value] : elem) { - if (!pkIndex->appendWithIndexPos(key, value, index)) { - throw CopyException(ExceptionMessage::duplicatePKException( - TypeUtils::toString(key))); - } - } - } - return; - }}, + + std::visit( + [&](auto&& queues) { + using T = std::decay_t; + std::unique_lock lck{mutexes[index], std::adopt_lock}; + IndexBuffer buffer; + while (queues.array[index].pop(buffer)) { + auto numValuesInserted = pkIndex->appendWithIndexPos(buffer, index); + if (numValuesInserted < buffer.size()) { + if constexpr (std::same_as) { + throw CopyException(ExceptionMessage::duplicatePKException( + std::move(buffer[numValuesInserted].first))); + } else { + throw CopyException(ExceptionMessage::duplicatePKException( + TypeUtils::toString(buffer[numValuesInserted].first))); + } + } + } + return; + }, std::move(queues)); } diff --git a/src/storage/index/hash_index_builder.cpp b/src/storage/index/hash_index_builder.cpp index 0584d005964..fed0d067ddb 100644 --- a/src/storage/index/hash_index_builder.cpp +++ b/src/storage/index/hash_index_builder.cpp @@ -16,27 +16,27 @@ using namespace kuzu::common; namespace kuzu { namespace storage { -template -HashIndexBuilder::HashIndexBuilder(const std::shared_ptr& fileHandle, +template +HashIndexBuilder::HashIndexBuilder(const std::shared_ptr& fileHandle, std::unique_ptr overflowFile, uint64_t indexPos, PhysicalTypeID keyDataType) : fileHandle(fileHandle), inMemOverflowFile(std::move(overflowFile)) { this->indexHeader = std::make_unique(keyDataType); headerArray = std::make_unique>(*fileHandle, NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, 0 /* numElements */); - pSlots = std::make_unique>>( + pSlots = std::make_unique>>( *fileHandle, NUM_HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, 0 /* numElements */); // Reserve a slot for oSlots, which is always skipped, as we treat slot idx 0 as NULL. - oSlots = std::make_unique>>( + oSlots = std::make_unique>>( *fileHandle, NUM_HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, 1 /* numElements */); allocatePSlots(1u << this->indexHeader->currentLevel); } -template -void HashIndexBuilder::bulkReserve(uint32_t numEntries_) { +template +void HashIndexBuilder::bulkReserve(uint32_t numEntries_) { slot_id_t numRequiredEntries = HashIndexUtils::getNumRequiredEntries(this->indexHeader->numEntries, numEntries_); // Build from scratch. - auto numRequiredSlots = (numRequiredEntries + getSlotCapacity() - 1) / getSlotCapacity(); + auto numRequiredSlots = (numRequiredEntries + getSlotCapacity() - 1) / getSlotCapacity(); auto numSlotsOfCurrentLevel = 1u << this->indexHeader->currentLevel; while ((numSlotsOfCurrentLevel << 1) < numRequiredSlots) { this->indexHeader->incrementLevel(); @@ -51,12 +51,11 @@ void HashIndexBuilder::bulkReserve(uint32_t numEntries_) { } } -template -void HashIndexBuilder::copy( - const uint8_t* oldEntry, slot_id_t newSlotId, uint8_t fingerprint) { +template +void HashIndexBuilder::copy(const uint8_t* oldEntry, slot_id_t newSlotId, uint8_t fingerprint) { SlotIterator iter(newSlotId, this); do { - for (auto newEntryPos = 0u; newEntryPos < getSlotCapacity(); newEntryPos++) { + for (auto newEntryPos = 0u; newEntryPos < getSlotCapacity(); newEntryPos++) { if (!iter.slot->header.isEntryValid(newEntryPos)) { // The original slot was marked as unused, but // copying to the original slot is unnecessary and will cause undefined behaviour @@ -78,8 +77,8 @@ void HashIndexBuilder::copy( newOvfSlot->header.setEntryValid(newEntryPos, fingerprint); } -template -void HashIndexBuilder::splitSlot(HashIndexHeader& header) { +template +void HashIndexBuilder::splitSlot(HashIndexHeader& header) { // Add new slot allocatePSlots(1); @@ -91,12 +90,12 @@ void HashIndexBuilder::splitSlot(HashIndexHeader& header) { // Reset everything except the next overflow id so we can reuse the overflow slot iter.slot->header.reset(); iter.slot->header.nextOvfSlotId = slotHeader.nextOvfSlotId; - for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { + for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { if (!slotHeader.isEntryValid(entryPos)) { continue; // Skip invalid entries. } const auto* data = (iter.slot->entries[entryPos].data); - hash_t hash = this->hashStored(*reinterpret_cast(data)); + hash_t hash = this->hashStored(*reinterpret_cast(data)); auto fingerprint = HashIndexUtils::getFingerprintForHash(hash); auto newSlotId = hash & header.higherLevelHashMask; copy(data, newSlotId, fingerprint); @@ -106,21 +105,35 @@ void HashIndexBuilder::splitSlot(HashIndexHeader& header) { header.incrementNextSplitSlotId(); } -template -bool HashIndexBuilder::append(T key, offset_t value) { +template +size_t HashIndexBuilder::append(const IndexBuffer& buffer) { slot_id_t numRequiredEntries = - HashIndexUtils::getNumRequiredEntries(this->indexHeader->numEntries, 1); - while (numRequiredEntries > pSlots->getNumElements() * getSlotCapacity()) { + HashIndexUtils::getNumRequiredEntries(this->indexHeader->numEntries, buffer.size()); + while (numRequiredEntries > pSlots->getNumElements() * getSlotCapacity()) { this->splitSlot(*this->indexHeader); } // Do both searches after splitting. Returning early if the key already exists isn't a // particular concern and doing both after splitting allows the slotID to be reused - auto hashValue = HashIndexUtils::hash(key); - auto fingerprint = HashIndexUtils::getFingerprintForHash(hashValue); - auto slotID = HashIndexUtils::getPrimarySlotIdForHash(*this->indexHeader, hashValue); + common::hash_t hashes[BUFFER_SIZE]; + for (size_t i = 0; i < buffer.size(); i++) { + hashes[i] = HashIndexUtils::hash(buffer[i].first); + } + for (size_t i = 0; i < buffer.size(); i++) { + auto& [key, value] = buffer[i]; + if (!appendInternal(key, value, hashes[i])) { + return i; + } + } + return buffer.size(); +} + +template +bool HashIndexBuilder::appendInternal(Key key, common::offset_t value, common::hash_t hash) { + auto fingerprint = HashIndexUtils::getFingerprintForHash(hash); + auto slotID = HashIndexUtils::getPrimarySlotIdForHash(*this->indexHeader, hash); SlotIterator iter(slotID, this); do { - for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { + for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { if (!iter.slot->header.isEntryValid(entryPos)) { // Insert to this position // The builder never keeps holes and doesn't support deletions, so this must be the @@ -130,7 +143,7 @@ bool HashIndexBuilder::append(T key, offset_t value) { this->indexHeader->numEntries++; return true; } else if (iter.slot->header.fingerprints[entryPos] == fingerprint && - equals(key, *(S*)iter.slot->entries[entryPos].data)) { + equals(key, *(T*)iter.slot->entries[entryPos].data)) { // Value already exists return false; } @@ -142,17 +155,17 @@ bool HashIndexBuilder::append(T key, offset_t value) { return true; } -template -bool HashIndexBuilder::lookup(T key, offset_t& result) { +template +bool HashIndexBuilder::lookup(Key key, offset_t& result) { auto hashValue = HashIndexUtils::hash(key); auto fingerprint = HashIndexUtils::getFingerprintForHash(hashValue); auto slotId = HashIndexUtils::getPrimarySlotIdForHash(*this->indexHeader, hashValue); SlotIterator iter(slotId, this); do { - for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { + for (auto entryPos = 0u; entryPos < getSlotCapacity(); entryPos++) { if (iter.slot->header.isEntryValid(entryPos) && iter.slot->header.fingerprints[entryPos] == fingerprint && - equals(key, *(S*)iter.slot->entries[entryPos].data)) { + equals(key, *(T*)iter.slot->entries[entryPos].data)) { // Value already exists result = *(common::offset_t*)(iter.slot->entries[entryPos].data + this->indexHeader->numBytesPerKey); @@ -163,24 +176,24 @@ bool HashIndexBuilder::lookup(T key, offset_t& result) { return false; } -template -uint32_t HashIndexBuilder::allocatePSlots(uint32_t numSlotsToAllocate) { +template +uint32_t HashIndexBuilder::allocatePSlots(uint32_t numSlotsToAllocate) { auto oldNumSlots = pSlots->getNumElements(); auto newNumSlots = oldNumSlots + numSlotsToAllocate; pSlots->resize(newNumSlots, true /* setToZero */); return oldNumSlots; } -template -uint32_t HashIndexBuilder::allocateAOSlot() { +template +uint32_t HashIndexBuilder::allocateAOSlot() { auto oldNumSlots = oSlots->getNumElements(); auto newNumSlots = oldNumSlots + 1; oSlots->resize(newNumSlots, true /* setToZero */); return oldNumSlots; } -template -Slot* HashIndexBuilder::getSlot(const SlotInfo& slotInfo) { +template +Slot* HashIndexBuilder::getSlot(const SlotInfo& slotInfo) { if (slotInfo.slotType == SlotType::PRIMARY) { return &pSlots->operator[](slotInfo.slotId); } else { @@ -188,21 +201,21 @@ Slot* HashIndexBuilder::getSlot(const SlotInfo& slotInfo) { } } -template -void HashIndexBuilder::flush() { +template +void HashIndexBuilder::flush() { headerArray->resize(1, true /* setToZero */); headerArray->operator[](0) = *this->indexHeader; headerArray->saveToDisk(); pSlots->saveToDisk(); oSlots->saveToDisk(); - if constexpr (std::is_same_v) { + if constexpr (std::is_same_v) { inMemOverflowFile->flush(); } } -template -inline void HashIndexBuilder::insertToNewOvfSlot( - T key, Slot* previousSlot, common::offset_t offset, uint8_t fingerprint) { +template +inline void HashIndexBuilder::insertToNewOvfSlot( + Key key, Slot* previousSlot, common::offset_t offset, uint8_t fingerprint) { auto newSlotId = allocateAOSlot(); previousSlot->header.nextOvfSlotId = newSlotId; auto newSlot = getSlot(SlotInfo{newSlotId, SlotType::OVF}); @@ -211,28 +224,27 @@ inline void HashIndexBuilder::insertToNewOvfSlot( } template<> -void HashIndexBuilder::insert(std::string_view key, - Slot* slot, uint8_t entryPos, offset_t offset, uint8_t fingerprint) { +void HashIndexBuilder::insert(std::string_view key, Slot* slot, + uint8_t entryPos, offset_t offset, uint8_t fingerprint) { auto entry = slot->entries[entryPos].data; inMemOverflowFile->appendString(key, *(ku_string_t*)entry); memcpy(entry + NUM_BYTES_FOR_STRING_KEY, &offset, sizeof(common::offset_t)); slot->header.setEntryValid(entryPos, fingerprint); } -template -common::hash_t HashIndexBuilder::hashStored(const S& key) const { +template +common::hash_t HashIndexBuilder::hashStored(const T& key) const { return HashIndexUtils::hash(key); } template<> -common::hash_t HashIndexBuilder::hashStored( - const ku_string_t& key) const { +common::hash_t HashIndexBuilder::hashStored(const ku_string_t& key) const { auto kuString = key; return HashIndexUtils::hash(inMemOverflowFile->readString(&kuString)); } template<> -bool HashIndexBuilder::equals( +bool HashIndexBuilder::equals( std::string_view keyToLookup, const ku_string_t& keyInEntry) const { // Checks if prefix and len matches first. if (!HashIndexUtils::areStringPrefixAndLenEqual(keyToLookup, keyInEntry)) { @@ -262,7 +274,7 @@ template class HashIndexBuilder; template class HashIndexBuilder; template class HashIndexBuilder; template class HashIndexBuilder; -template class HashIndexBuilder; +template class HashIndexBuilder; PrimaryKeyIndexBuilder::PrimaryKeyIndexBuilder( const std::string& fName, PhysicalTypeID keyDataType, VirtualFileSystem* vfs) @@ -281,9 +293,8 @@ PrimaryKeyIndexBuilder::PrimaryKeyIndexBuilder( for (auto i = 0u; i < NUM_HASH_INDEXES; i++) { auto overflowFile = std::make_unique(overflowFileInfo, overflowPageCounter); - hashIndexBuilders.push_back( - std::make_unique>( - fileHandle, std::move(overflowFile), i, keyDataType)); + hashIndexBuilders.push_back(std::make_unique>( + fileHandle, std::move(overflowFile), i, keyDataType)); } } else if constexpr (HashablePrimitive) { for (auto i = 0u; i < NUM_HASH_INDEXES; i++) {