From faa57564dfbd3acbc7955f1ab21f2e7309bb4229 Mon Sep 17 00:00:00 2001 From: Guodong Jin Date: Tue, 23 May 2023 00:43:23 -0400 Subject: [PATCH] Change hash index concurrent build --- CMakeLists.txt | 2 +- src/binder/bound_statement_result.cpp | 2 +- src/include/storage/index/hash_index_builder.h | 16 +++++----------- src/include/storage/storage_info.h | 3 ++- src/storage/copier/node_copier.cpp | 2 ++ src/storage/copier/node_copy_executor.cpp | 2 -- src/storage/index/hash_index_builder.cpp | 11 ----------- 7 files changed, 11 insertions(+), 27 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b8e191448..c5d6b1540c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.11) -project(Kuzu VERSION 0.0.3.4 LANGUAGES CXX) +project(Kuzu VERSION 0.0.3.5 LANGUAGES CXX) find_package(Threads REQUIRED) diff --git a/src/binder/bound_statement_result.cpp b/src/binder/bound_statement_result.cpp index 52a57aed3f..d8b90e6d35 100644 --- a/src/binder/bound_statement_result.cpp +++ b/src/binder/bound_statement_result.cpp @@ -7,7 +7,7 @@ namespace binder { std::unique_ptr BoundStatementResult::createSingleStringColumnResult() { auto result = std::make_unique(); - auto columnName = std::string("outputMsg"); + auto columnName = std::string("result"); auto value = std::make_unique(columnName); auto stringColumn = std::make_shared(std::move(value), columnName); result->addColumn(stringColumn, expression_vector{stringColumn}); diff --git a/src/include/storage/index/hash_index_builder.h b/src/include/storage/index/hash_index_builder.h index 4f471f2037..4e69d035a7 100644 --- a/src/include/storage/index/hash_index_builder.h +++ b/src/include/storage/index/hash_index_builder.h @@ -107,17 +107,6 @@ class HashIndexBuilder : public BaseHashIndex { uint32_t allocatePSlots(uint32_t numSlotsToAllocate); uint32_t allocateAOSlot(); - inline void lockSlot(SlotInfo& slotInfo) { - assert(slotInfo.slotType == SlotType::PRIMARY); - std::shared_lock sLck{pSlotSharedMutex}; - pSlotsMutexes[slotInfo.slotId]->lock(); - } - inline void unlockSlot(const SlotInfo& slotInfo) { - assert(slotInfo.slotType == SlotType::PRIMARY); - std::shared_lock sLck{pSlotSharedMutex}; - pSlotsMutexes[slotInfo.slotId]->unlock(); - } - private: std::unique_ptr fileHandle; std::unique_ptr> headerArray; @@ -151,6 +140,10 @@ class PrimaryKeyIndexBuilder { } } + inline void lock() { mtx.lock(); } + + inline void unlock() { mtx.unlock(); } + inline void bulkReserve(uint32_t numEntries) { keyDataTypeID == common::LogicalTypeID::INT64 ? hashIndexBuilderForInt64->bulkReserve(numEntries) : @@ -189,6 +182,7 @@ class PrimaryKeyIndexBuilder { } private: + std::mutex mtx; common::LogicalTypeID keyDataTypeID; std::unique_ptr> hashIndexBuilderForInt64; std::unique_ptr> hashIndexBuilderForString; diff --git a/src/include/storage/storage_info.h b/src/include/storage/storage_info.h index 8ab450f7cb..ac2be14102 100644 --- a/src/include/storage/storage_info.h +++ b/src/include/storage/storage_info.h @@ -12,7 +12,8 @@ using storage_version_t = uint64_t; struct StorageVersionInfo { static std::unordered_map getStorageVersionInfo() { - return {{"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, {"0.0.3", 1}}; + return {{"0.0.3.5", 6}, {"0.0.3.4", 5}, {"0.0.3.3", 4}, {"0.0.3.2", 3}, {"0.0.3.1", 2}, + {"0.0.3", 1}}; } static storage_version_t getStorageVersion(); diff --git a/src/storage/copier/node_copier.cpp b/src/storage/copier/node_copier.cpp index 5d319178dd..bb800761a5 100644 --- a/src/storage/copier/node_copier.cpp +++ b/src/storage/copier/node_copier.cpp @@ -100,6 +100,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove } } // No nulls, so we can populate the index with actual values. + pkIndex->lock(); switch (chunk->getDataType().getLogicalTypeID()) { case LogicalTypeID::INT64: { appendToPKIndex(chunk, startOffset, numValues); @@ -112,6 +113,7 @@ void NodeCopier::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* ove throw CopyException("Primary key must be either INT64, STRING or SERIAL."); } } + pkIndex->unlock(); } template<> diff --git a/src/storage/copier/node_copy_executor.cpp b/src/storage/copier/node_copy_executor.cpp index a5c12914fd..7c7276cbbd 100644 --- a/src/storage/copier/node_copy_executor.cpp +++ b/src/storage/copier/node_copy_executor.cpp @@ -27,7 +27,6 @@ static column_id_t getPKColumnID( } void NodeCopyExecutor::populateColumns(processor::ExecutionContext* executionContext) { - logger->info("Populating properties"); auto primaryKey = reinterpret_cast(tableSchema)->getPrimaryKey(); std::unique_ptr pkIndex; if (primaryKey.dataType.getLogicalTypeID() != common::LogicalTypeID::SERIAL) { @@ -80,7 +79,6 @@ void NodeCopyExecutor::populateColumns(processor::ExecutionContext* executionCon for (auto& task : tasks) { taskScheduler.scheduleTaskAndWaitOrError(task, executionContext); } - logger->info("Done populating properties, constructing the pk index."); } } // namespace storage diff --git a/src/storage/index/hash_index_builder.cpp b/src/storage/index/hash_index_builder.cpp index bc6619c0c3..ad5a973230 100644 --- a/src/storage/index/hash_index_builder.cpp +++ b/src/storage/index/hash_index_builder.cpp @@ -62,12 +62,10 @@ bool HashIndexBuilder::appendInternal(const uint8_t* key, offset_t value) { SlotInfo pSlotInfo{getPrimarySlotIdForKey(*indexHeader, key), SlotType::PRIMARY}; auto currentSlotInfo = pSlotInfo; Slot* currentSlot = nullptr; - lockSlot(pSlotInfo); while (currentSlotInfo.slotType == SlotType::PRIMARY || currentSlotInfo.slotId != 0) { currentSlot = getSlot(currentSlotInfo); if (lookupOrExistsInSlotWithoutLock(currentSlot, key)) { // Key already exists. No append is allowed. - unlockSlot(pSlotInfo); return false; } if (currentSlot->header.numEntries < HashIndexConstants::SLOT_CAPACITY) { @@ -78,7 +76,6 @@ bool HashIndexBuilder::appendInternal(const uint8_t* key, offset_t value) { } assert(currentSlot); insertToSlotWithoutLock(currentSlot, key, value); - unlockSlot(pSlotInfo); numEntries.fetch_add(1); return true; } @@ -101,20 +98,14 @@ bool HashIndexBuilder::lookupInternalWithoutLock(const uint8_t* key, offset_t template uint32_t HashIndexBuilder::allocatePSlots(uint32_t numSlotsToAllocate) { - std::unique_lock xLock{pSlotSharedMutex}; auto oldNumSlots = pSlots->getNumElements(); auto newNumSlots = oldNumSlots + numSlotsToAllocate; pSlots->resize(newNumSlots, true /* setToZero */); - pSlotsMutexes.resize(newNumSlots); - for (auto i = oldNumSlots; i < newNumSlots; i++) { - pSlotsMutexes[i] = std::make_unique(); - } return oldNumSlots; } template uint32_t HashIndexBuilder::allocateAOSlot() { - std::unique_lock xLock{oSlotsSharedMutex}; auto oldNumSlots = oSlots->getNumElements(); auto newNumSlots = oldNumSlots + 1; oSlots->resize(newNumSlots, true /* setToZero */); @@ -124,10 +115,8 @@ uint32_t HashIndexBuilder::allocateAOSlot() { template Slot* HashIndexBuilder::getSlot(const SlotInfo& slotInfo) { if (slotInfo.slotType == SlotType::PRIMARY) { - std::shared_lock sLck{pSlotSharedMutex}; return &pSlots->operator[](slotInfo.slotId); } else { - std::shared_lock sLck{oSlotsSharedMutex}; return &oSlots->operator[](slotInfo.slotId); } }