diff --git a/src/common/file_system/local_file_system.cpp b/src/common/file_system/local_file_system.cpp index f804d897da0..9da6d816bb2 100644 --- a/src/common/file_system/local_file_system.cpp +++ b/src/common/file_system/local_file_system.cpp @@ -1,5 +1,7 @@ #include "common/file_system/local_file_system.h" +#include + #if defined(_WIN32) #include #include @@ -57,7 +59,7 @@ std::unique_ptr LocalFileSystem::openFile( #else int fd = open(path.c_str(), flags, 0644); if (fd == -1) { - throw Exception("Cannot open file: " + path); + throw Exception(stringFormat("Cannot open file {}: {}", path, posixErrMessage())); } if (lock_type != FileLockType::NO_LOCK) { struct flock fl; diff --git a/src/include/common/copy_constructors.h b/src/include/common/copy_constructors.h index 6a7f9e8a7a7..f22518ba4c6 100644 --- a/src/include/common/copy_constructors.h +++ b/src/include/common/copy_constructors.h @@ -2,6 +2,8 @@ // This file defines many macros for controlling copy constructors and move constructors on classes. +// NOLINTBEGIN(bugprone-macro-parentheses): Although this is a good check in general, here, we +// cannot add parentheses around the arguments, for it would be invalid syntax. 
#define DELETE_COPY_CONSTRUCT(Object) Object(const Object& other) = delete #define DELETE_COPY_ASSN(Object) Object& operator=(const Object& other) = delete #define DELETE_MOVE_CONSTRUCT(Object) Object(Object&& other) = delete @@ -56,3 +58,5 @@ #define DELETE_COPY_AND_MOVE(Object) \ DELETE_BOTH_COPY(Object); \ DELETE_BOTH_MOVE(Object) + +// NOLINTEND(bugprone-macro-parentheses) diff --git a/src/include/main/database.h b/src/include/main/database.h index 5677c869f1c..a90698501d4 100644 --- a/src/include/main/database.h +++ b/src/include/main/database.h @@ -44,6 +44,7 @@ struct KUZU_API SystemConfig { */ class Database { friend class EmbeddedShell; + friend class ClientContext; friend class Connection; friend class StorageDriver; friend class kuzu::testing::BaseGraphTest; diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 0faee2af03d..202edbdc1f4 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -4,6 +4,7 @@ #include "processor/operator/aggregate/hash_aggregate.h" #include "processor/operator/call/in_query_call.h" +#include "processor/operator/persistent/index_builder.h" #include "processor/operator/sink.h" #include "storage/store/node_group.h" #include "storage/store/node_table.h" @@ -18,19 +19,22 @@ class CopyNodeSharedState { public: CopyNodeSharedState() - : indexBuilder{nullptr}, readerSharedState{nullptr}, distinctSharedState{nullptr}, - currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {}; + : readerSharedState{nullptr}, distinctSharedState{nullptr}, currentNodeGroupIdx{0}, + sharedNodeGroup{nullptr} {}; - void init(common::VirtualFileSystem* vfs); + void init(); inline common::offset_t getNextNodeGroupIdx() { - std::unique_lock lck{mtx}; + std::unique_lock lck{mtx}; return getNextNodeGroupIdxWithoutLock(); } inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; } - void 
appendLocalNodeGroup(std::unique_ptr localNodeGroup); + void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, + std::optional& indexBuilder); + + void addLastNodeGroup(std::optional& indexBuilder); private: inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; } @@ -89,10 +93,12 @@ class CopyNode : public Sink { public: CopyNode(std::shared_ptr sharedState, std::unique_ptr info, std::unique_ptr resultSetDescriptor, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::unique_ptr child, uint32_t id, const std::string& paramsString, + std::optional indexBuilder = std::nullopt) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id, paramsString}, - sharedState{std::move(sharedState)}, info{std::move(info)} {} + sharedState{std::move(sharedState)}, info{std::move(info)}, + indexBuilder(std::move(indexBuilder)) {} inline std::shared_ptr getSharedState() const { return sharedState; } @@ -108,41 +114,30 @@ class CopyNode : public Sink { inline std::unique_ptr clone() final { return std::make_unique(sharedState, info->copy(), resultSetDescriptor->copy(), - children[0]->clone(), id, paramsString); + children[0]->clone(), id, paramsString, + indexBuilder ? 
std::make_optional(indexBuilder->clone()) : std::nullopt); } static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, - storage::PrimaryKeyIndexBuilder* pkIndex, common::column_id_t pkColumnID, + std::optional& indexBuilder, common::column_id_t pkColumnID, storage::NodeTable* table, storage::NodeGroup* nodeGroup); private: - static void populatePKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startNodeOffset, common::offset_t numNodes); - static void checkNonNullConstraint( - storage::NullColumnChunk* nullChunk, common::offset_t numNodes); - - template - static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); - void copyToNodeGroup(); + void initGlobalIndexBuilder(ExecutionContext* context); + void initLocalIndexBuilder(ExecutionContext* context); protected: std::shared_ptr sharedState; std::unique_ptr info; + std::optional indexBuilder; + common::DataChunkState* columnState; std::vector> nullColumnVectors; std::vector columnVectors; std::unique_ptr localNodeGroup; }; -template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); -template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); - } // namespace processor } // namespace kuzu diff --git a/src/include/processor/operator/persistent/index_builder.h b/src/include/processor/operator/persistent/index_builder.h new file mode 100644 index 00000000000..1e3c6d15a27 --- /dev/null +++ b/src/include/processor/operator/persistent/index_builder.h @@ -0,0 +1,135 @@ +#pragma once + +#include + +#include "common/copy_constructors.h" +#include "common/mpsc_queue.h" +#include "common/static_vector.h" +#include 
"common/types/internal_id_t.h" +#include "common/types/types.h" +#include "processor/execution_context.h" +#include "storage/index/hash_index_builder.h" +#include "storage/store/column_chunk.h" + +namespace kuzu { +namespace processor { + +constexpr size_t BUFFER_SIZE = 1024; +using IntBuffer = common::StaticVector, BUFFER_SIZE>; +using StringBuffer = common::StaticVector, BUFFER_SIZE>; + +class IndexBuilderGlobalQueues { +public: + explicit IndexBuilderGlobalQueues(std::unique_ptr pkIndex); + + void flushToDisk() const; + + void insert(size_t index, StringBuffer elem); + void insert(size_t index, IntBuffer elem); + + void consume(); + + common::LogicalTypeID pkTypeID() const { return pkIndex->keyTypeID(); } + +private: + void maybeConsumeIndex(size_t index); + + std::array mutexes; + std::unique_ptr pkIndex; + + using StringQueues = std::array, storage::NUM_HASH_INDEXES>; + using IntQueues = std::array, storage::NUM_HASH_INDEXES>; + + // Queues for distributing primary keys. + std::variant queues; +}; + +class IndexBuilderLocalBuffers { +public: + explicit IndexBuilderLocalBuffers(IndexBuilderGlobalQueues& globalQueues); + + void insert(std::string key, common::offset_t value); + void insert(int64_t key, common::offset_t value); + + void flush(); + +private: + IndexBuilderGlobalQueues* globalQueues; + + // These arrays are much too large to be inline. 
+ using StringBuffers = std::array; + using IntBuffers = std::array; + std::unique_ptr stringBuffers; + std::unique_ptr intBuffers; +}; + +class IndexBuilderSharedState { + friend class IndexBuilder; + +public: + explicit IndexBuilderSharedState(std::unique_ptr pkIndex); + void consume() { globalQueues.consume(); } + void flush() { globalQueues.flushToDisk(); } + + void addProducer() { producers.fetch_add(1, std::memory_order_relaxed); } + void quitProducer(); + bool isDone() { return done.load(std::memory_order_relaxed); } + +private: + IndexBuilderGlobalQueues globalQueues; + + std::atomic producers; + std::atomic done; +}; + +// RAII for producer counting. +class ProducerToken { +public: + explicit ProducerToken(std::shared_ptr sharedState) + : sharedState(std::move(sharedState)) { + this->sharedState->addProducer(); + } + DELETE_COPY_DEFAULT_MOVE(ProducerToken); + + void quit() { + sharedState->quitProducer(); + sharedState.reset(); + } + ~ProducerToken() { + if (sharedState) { + quit(); + } + } + +private: + std::shared_ptr sharedState; +}; + +class IndexBuilder { + explicit IndexBuilder(std::shared_ptr sharedState); + +public: + DELETE_COPY_DEFAULT_MOVE(IndexBuilder); + explicit IndexBuilder(std::unique_ptr pkIndex); + + IndexBuilder clone() { return IndexBuilder(sharedState); } + + void initGlobalStateInternal(ExecutionContext* /*context*/) {} + void initLocalStateInternal(ExecutionContext* /*context*/) {} + void insert( + storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes); + + ProducerToken getProducerToken() const { return ProducerToken(sharedState); } + + void finishedProducing(); + void finalize(ExecutionContext* context); + +private: + void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes); + std::shared_ptr sharedState; + + IndexBuilderLocalBuffers localBuffers; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/storage/index/hash_index_builder.h 
b/src/include/storage/index/hash_index_builder.h index ec328d28fa6..4cc44f7d1e6 100644 --- a/src/include/storage/index/hash_index_builder.h +++ b/src/include/storage/index/hash_index_builder.h @@ -13,7 +13,7 @@ namespace storage { static constexpr common::page_idx_t INDEX_HEADER_ARRAY_HEADER_PAGE_IDX = 0; static constexpr common::page_idx_t P_SLOTS_HEADER_PAGE_IDX = 1; static constexpr common::page_idx_t O_SLOTS_HEADER_PAGE_IDX = 2; -static constexpr common::page_idx_t HEADER_PAGES = 3; +static constexpr common::page_idx_t NUM_HEADER_PAGES = 3; static constexpr uint64_t INDEX_HEADER_IDX_IN_ARRAY = 0; /** diff --git a/src/main/client_context.cpp b/src/main/client_context.cpp index ef113e24852..d719e246620 100644 --- a/src/main/client_context.cpp +++ b/src/main/client_context.cpp @@ -1,7 +1,5 @@ #include "main/client_context.h" -#include - #include "common/constants.h" #include "common/exception/runtime.h" #include "main/database.h" @@ -22,7 +20,7 @@ void ActiveQuery::reset() { } ClientContext::ClientContext(Database* database) - : numThreadsForExecution{std::thread::hardware_concurrency()}, + : numThreadsForExecution{database->systemConfig.maxNumThreads}, timeoutInMS{ClientContextConstants::TIMEOUT_IN_MS}, varLengthExtendMaxDepth{DEFAULT_VAR_LENGTH_EXTEND_MAX_DEPTH}, enableSemiMask{ DEFAULT_ENABLE_SEMI_MASK} { diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 737aa3e9bf7..91172b6e0c0 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -43,11 +43,9 @@ std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logic } case TableType::RDF: return mapCopyRdfFrom(logicalOperator); - // LCOV_EXCL_START default: KU_UNREACHABLE; } - // LCOV_EXCL_STOP } static void getNodeColumnsInCopyOrder( diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index efa1ac9966e..918cd5ad428 100644 --- a/src/processor/operator/persistent/CMakeLists.txt 
+++ b/src/processor/operator/persistent/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(kuzu_processor_operator_persistent copy_to_parquet.cpp delete.cpp delete_executor.cpp + index_builder.cpp insert.cpp insert_executor.cpp merge.cpp diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 86179b94cbe..395d81f4629 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -1,13 +1,10 @@ #include "processor/operator/persistent/copy_node.h" -#include - #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" #include "function/table_functions/scan_functions.h" #include "processor/result/factorized_table.h" -#include "storage/store/string_column_chunk.h" using namespace kuzu::catalog; using namespace kuzu::common; @@ -16,27 +13,13 @@ using namespace kuzu::storage; namespace kuzu { namespace processor { -void CopyNodeSharedState::init(VirtualFileSystem* vfs) { - if (pkType != *LogicalType::SERIAL()) { - auto indexFName = StorageUtils::getNodeIndexFName( - vfs, wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL); - indexBuilder = std::make_shared(indexFName, pkType, vfs); - uint64_t numRows; - if (readerSharedState != nullptr) { - KU_ASSERT(distinctSharedState == nullptr); - auto sharedState = - reinterpret_cast(readerSharedState->sharedState.get()); - numRows = sharedState->numRows; - } else { - numRows = distinctSharedState->getFactorizedTable()->getNumTuples(); - } - indexBuilder->bulkReserve(numRows); - } +void CopyNodeSharedState::init() { wal->logCopyTableRecord(table->getTableID(), TableType::NODE); wal->flushAllPages(); } -void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localNodeGroup) { +void CopyNodeSharedState::appendIncompleteNodeGroup( + std::unique_ptr localNodeGroup, std::optional& indexBuilder) { std::unique_lock xLck{mtx}; if (!sharedNodeGroup) { 
sharedNodeGroup = std::move(localNodeGroup); @@ -47,7 +30,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localN if (sharedNodeGroup->isFull()) { auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock(); CopyNode::writeAndResetNodeGroup( - nodeGroupIdx, indexBuilder.get(), pkColumnIdx, table, sharedNodeGroup.get()); + nodeGroupIdx, indexBuilder, pkColumnIdx, table, sharedNodeGroup.get()); } if (numNodesAppended < localNodeGroup->getNumRows()) { sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); @@ -64,16 +47,45 @@ void CopyNode::initGlobalStateInternal(ExecutionContext* context) { if (!isEmptyTable(info->table)) { throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException()); } - sharedState->init(context->vfs); + sharedState->init(); + initGlobalIndexBuilder(context); +} + +void CopyNode::initGlobalIndexBuilder(ExecutionContext* context) { + if (sharedState->pkType.getLogicalTypeID() != LogicalTypeID::SERIAL) { + auto indexFName = + StorageUtils::getNodeIndexFName(context->vfs, sharedState->wal->getDirectory(), + sharedState->table->getTableID(), FileVersionType::ORIGINAL); + auto pkIndex = + std::make_unique(indexFName, sharedState->pkType, context->vfs); + uint64_t numRows; + if (sharedState->readerSharedState != nullptr) { + KU_ASSERT(sharedState->distinctSharedState == nullptr); + auto scanSharedState = reinterpret_cast( + sharedState->readerSharedState->sharedState.get()); + numRows = scanSharedState->numRows; + } else { + numRows = sharedState->distinctSharedState->getFactorizedTable()->getNumTuples(); + } + pkIndex->bulkReserve(numRows); + + indexBuilder = IndexBuilder(std::move(pkIndex)); + indexBuilder->initGlobalStateInternal(context); + } } -void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { +void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { std::shared_ptr state; for (auto& pos : info->columnPositions) { if (pos.isValid()) { 
state = resultSet->getValueVector(pos)->state; } } + + if (indexBuilder) { + indexBuilder->initLocalStateInternal(context); + } + KU_ASSERT(state != nullptr); for (auto i = 0u; i < info->columnPositions.size(); ++i) { auto pos = info->columnPositions[i]; @@ -94,74 +106,40 @@ void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /* } void CopyNode::executeInternal(ExecutionContext* context) { + std::optional token; + if (indexBuilder) { + token = indexBuilder->getProducerToken(); + } + while (children[0]->getNextTuple(context)) { auto originalSelVector = columnState->selVector; copyToNodeGroup(); columnState->selVector = std::move(originalSelVector); } if (localNodeGroup->getNumRows() > 0) { - sharedState->appendLocalNodeGroup(std::move(localNodeGroup)); + sharedState->appendIncompleteNodeGroup(std::move(localNodeGroup), indexBuilder); + } + if (indexBuilder) { + KU_ASSERT(token); + token->quit(); + indexBuilder->finishedProducing(); } } void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, - PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table, + std::optional& indexBuilder, column_id_t pkColumnID, NodeTable* table, NodeGroup* nodeGroup) { + nodeGroup->finalize(nodeGroupIdx); - auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - if (pkIndex) { - populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset, - nodeGroup->getNumRows() /* startPageIdx */); + if (indexBuilder) { + auto nodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + indexBuilder->insert( + nodeGroup->getColumnChunk(pkColumnID), nodeOffset, nodeGroup->getNumRows()); } table->append(nodeGroup); nodeGroup->resetToEmpty(); } -void CopyNode::populatePKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) { - checkNonNullConstraint(chunk->getNullChunk(), numNodes); - std::optional errorPKValueStr; - pkIndex->lock(); - try { - switch 
(chunk->getDataType()->getPhysicalType()) { - case PhysicalTypeID::INT64: { - auto numAppendedNodes = appendToPKIndex(pkIndex, chunk, startOffset, numNodes); - if (numAppendedNodes < numNodes) { - // TODO(bmwinger): This should be tested where there are multiple node groups - errorPKValueStr = std::to_string(chunk->getValue(numAppendedNodes)); - } - } break; - case PhysicalTypeID::STRING: { - auto numAppendedNodes = - appendToPKIndex(pkIndex, chunk, startOffset, numNodes); - if (numAppendedNodes < numNodes) { - // TODO(bmwinger): This should be tested where there are multiple node groups - errorPKValueStr = - static_cast(chunk)->getValue(numAppendedNodes); - } - } break; - default: { - throw CopyException(ExceptionMessage::invalidPKType(chunk->getDataType()->toString())); - } - } - } catch (Exception& e) { - pkIndex->unlock(); - throw; - } - pkIndex->unlock(); - if (errorPKValueStr) { - throw CopyException(ExceptionMessage::existedPKException(*errorPKValueStr)); - } -} - -void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) { - for (auto posInChunk = 0u; posInChunk < numNodes; posInChunk++) { - if (nullChunk->isNull(posInChunk)) { - throw CopyException(ExceptionMessage::nullPKException()); - } - } -} - void CopyNodeSharedState::calculateNumTuples() { numTuples = StorageUtils::getStartOffsetOfNodeGroup(getCurNodeGroupIdx()); if (sharedNodeGroup) { @@ -173,14 +151,15 @@ void CopyNode::finalize(ExecutionContext* context) { sharedState->calculateNumTuples(); if (sharedState->sharedNodeGroup) { auto nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, sharedState->indexBuilder.get(), - sharedState->pkColumnIdx, sharedState->table, sharedState->sharedNodeGroup.get()); - } - if (sharedState->indexBuilder) { - sharedState->indexBuilder->flush(); + writeAndResetNodeGroup(nodeGroupIdx, indexBuilder, sharedState->pkColumnIdx, + sharedState->table, sharedState->sharedNodeGroup.get()); } 
sharedState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable( sharedState->table->getTableID(), sharedState->numTuples); + if (indexBuilder) { + indexBuilder->finalize(context); + } + for (auto relTable : info->fwdRelTables) { relTable->resizeColumns(context->clientContext->getTx(), RelDataDirection::FWD, sharedState->getCurNodeGroupIdx()); @@ -195,33 +174,6 @@ void CopyNode::finalize(ExecutionContext* context) { sharedState->fTable.get(), outputMsg, context->memoryManager); } -template<> -uint64_t CopyNode::appendToPKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { - for (auto i = 0u; i < numValues; i++) { - auto offset = i + startOffset; - auto value = chunk->getValue(i); - if (!pkIndex->append(value, offset)) { - return i; - } - } - return numValues; -} - -template<> -uint64_t CopyNode::appendToPKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { - auto stringColumnChunk = (StringColumnChunk*)chunk; - for (auto i = 0u; i < numValues; i++) { - auto offset = i + startOffset; - auto value = stringColumnChunk->getValue(i); - if (!pkIndex->append(value.c_str(), offset)) { - return i; - } - } - return numValues; -} - void CopyNode::copyToNodeGroup() { auto numAppendedTuples = 0ul; auto numTuplesToAppend = columnState->getNumSelectedValues(); @@ -232,8 +184,8 @@ void CopyNode::copyToNodeGroup() { if (localNodeGroup->isFull()) { node_group_idx_t nodeGroupIdx; nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, sharedState->indexBuilder.get(), - sharedState->pkColumnIdx, sharedState->table, localNodeGroup.get()); + writeAndResetNodeGroup(nodeGroupIdx, indexBuilder, sharedState->pkColumnIdx, + sharedState->table, localNodeGroup.get()); } if (numAppendedTuples < numTuplesToAppend) { columnState->slice((offset_t)numAppendedTuplesInNodeGroup); diff --git a/src/processor/operator/persistent/index_builder.cpp 
b/src/processor/operator/persistent/index_builder.cpp new file mode 100644 index 00000000000..8d4e21adf3e --- /dev/null +++ b/src/processor/operator/persistent/index_builder.cpp @@ -0,0 +1,184 @@ +#include "processor/operator/persistent/index_builder.h" + +#include + +#include "common/cast.h" +#include "common/exception/copy.h" +#include "common/exception/message.h" +#include "storage/store/string_column_chunk.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu::common; +using namespace kuzu::storage; + +IndexBuilderGlobalQueues::IndexBuilderGlobalQueues(std::unique_ptr pkIndex) + : pkIndex(std::move(pkIndex)) { + if (this->pkIndex->keyTypeID() == LogicalTypeID::STRING) { + queues.emplace(); + } else { + queues.emplace(); + } +} + +const size_t SHOULD_FLUSH_QUEUE_SIZE = 32; + +void IndexBuilderGlobalQueues::insert(size_t index, StringBuffer elem) { + auto& stringQueues = std::get(queues); + stringQueues[index].push(std::move(elem)); + if (stringQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) { + return; + } + maybeConsumeIndex(index); +} + +void IndexBuilderGlobalQueues::insert(size_t index, IntBuffer elem) { + auto& intQueues = std::get(queues); + intQueues[index].push(std::move(elem)); + if (intQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) { + return; + } + maybeConsumeIndex(index); +} + +void IndexBuilderGlobalQueues::consume() { + for (auto index = 0u; index < NUM_HASH_INDEXES; index++) { + maybeConsumeIndex(index); + } +} + +void IndexBuilderGlobalQueues::maybeConsumeIndex(size_t index) { + if (!mutexes[index].try_lock()) { + return; + } + std::unique_lock lck{mutexes[index], std::adopt_lock}; + + auto* stringQueues = std::get_if(&queues); + if (stringQueues) { + StringBuffer elem; + while ((*stringQueues)[index].pop(elem)) { + for (auto [key, value] : elem) { + if (!pkIndex->appendWithIndexPos(key.c_str(), value, index)) { + throw CopyException(ExceptionMessage::existedPKException(std::move(key))); + } + } + } + } else { + 
auto& intQueues = std::get(queues); + IntBuffer elem; + while (intQueues[index].pop(elem)) { + for (auto [key, value] : elem) { + if (!pkIndex->appendWithIndexPos(key, value, index)) { + throw CopyException(ExceptionMessage::existedPKException(std::to_string(key))); + } + } + } + } +} + +void IndexBuilderGlobalQueues::flushToDisk() const { + pkIndex->flush(); +} + +IndexBuilderLocalBuffers::IndexBuilderLocalBuffers(IndexBuilderGlobalQueues& globalQueues) + : globalQueues(&globalQueues) { + if (globalQueues.pkTypeID() == LogicalTypeID::STRING) { + stringBuffers = std::make_unique(); + } else { + intBuffers = std::make_unique(); + } +} + +void IndexBuilderLocalBuffers::insert(std::string key, common::offset_t value) { + auto indexPos = getHashIndexPosition(key.c_str()); + if ((*stringBuffers)[indexPos].full()) { + globalQueues->insert(indexPos, std::move((*stringBuffers)[indexPos])); + } + (*stringBuffers)[indexPos].push_back(std::make_pair(key, value)); +} + +void IndexBuilderLocalBuffers::insert(int64_t key, common::offset_t value) { + auto indexPos = getHashIndexPosition(key); + if ((*intBuffers)[indexPos].full()) { + globalQueues->insert(indexPos, std::move((*intBuffers)[indexPos])); + } + (*intBuffers)[indexPos].push_back(std::make_pair(key, value)); +} + +void IndexBuilderLocalBuffers::flush() { + if (globalQueues->pkTypeID() == LogicalTypeID::STRING) { + for (auto i = 0u; i < stringBuffers->size(); i++) { + globalQueues->insert(i, std::move((*stringBuffers)[i])); + } + } else { + for (auto i = 0u; i < intBuffers->size(); i++) { + globalQueues->insert(i, std::move((*intBuffers)[i])); + } + } +} + +IndexBuilderSharedState::IndexBuilderSharedState(std::unique_ptr pkIndex) + : globalQueues(std::move(pkIndex)) {} + +IndexBuilder::IndexBuilder(std::shared_ptr sharedState) + : sharedState(std::move(sharedState)), localBuffers(this->sharedState->globalQueues) {} + +void IndexBuilderSharedState::quitProducer() { + if (producers.fetch_sub(1, std::memory_order_relaxed) 
== 1) { + done.store(true, std::memory_order_relaxed); + } +} + +IndexBuilder::IndexBuilder(std::unique_ptr pkIndex) + : IndexBuilder(std::make_shared(std::move(pkIndex))) {} + +void IndexBuilder::insert(ColumnChunk* chunk, offset_t nodeOffset, offset_t numNodes) { + checkNonNullConstraint(chunk->getNullChunk(), numNodes); + + switch (chunk->getDataType()->getPhysicalType()) { + case PhysicalTypeID::INT64: { + for (auto i = 0u; i < numNodes; i++) { + auto value = chunk->getValue(i); + localBuffers.insert(value, nodeOffset + i); + } + } break; + case PhysicalTypeID::STRING: { + auto stringColumnChunk = ku_dynamic_cast(chunk); + for (auto i = 0u; i < numNodes; i++) { + auto value = stringColumnChunk->getValue(i); + localBuffers.insert(std::move(value), nodeOffset + i); + } + } break; + default: { + throw CopyException(ExceptionMessage::invalidPKType(chunk->getDataType()->toString())); + } + } +} + +void IndexBuilder::finishedProducing() { + localBuffers.flush(); + sharedState->consume(); + while (!sharedState->isDone()) { + std::this_thread::sleep_for(std::chrono::microseconds(500)); + sharedState->consume(); + } +} + +void IndexBuilder::finalize(ExecutionContext* /*context*/) { + // Flush anything added by `addLastNodeGroup()`. 
+ localBuffers.flush(); + sharedState->consume(); + sharedState->flush(); +} + +void IndexBuilder::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) { + for (auto i = 0u; i < numNodes; i++) { + if (nullChunk->isNull(i)) { + throw CopyException(ExceptionMessage::nullPKException()); + } + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/storage/index/hash_index.cpp b/src/storage/index/hash_index.cpp index 1a6dae87bbd..5cf096d199c 100644 --- a/src/storage/index/hash_index.cpp +++ b/src/storage/index/hash_index.cpp @@ -128,17 +128,17 @@ HashIndex::HashIndex(const DBFileIDAndName& dbFileIDAndName, fileHandle(fileHandle), diskOverflowFile(overflowFile) { slotCapacity = getSlotCapacity(); headerArray = std::make_unique>(*fileHandle, - dbFileIDAndName.dbFileID, HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, &bm, - wal, Transaction::getDummyReadOnlyTrx().get()); + dbFileIDAndName.dbFileID, NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, + &bm, wal, Transaction::getDummyReadOnlyTrx().get()); // Read indexHeader from the headerArray, which contains only one element. indexHeader = std::make_unique( headerArray->get(INDEX_HEADER_IDX_IN_ARRAY, TransactionType::READ_ONLY)); KU_ASSERT(indexHeader->keyDataTypeID == keyDataType.getLogicalTypeID()); pSlots = std::make_unique>>(*fileHandle, dbFileIDAndName.dbFileID, - HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, &bm, wal, + NUM_HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, &bm, wal, Transaction::getDummyReadOnlyTrx().get()); oSlots = std::make_unique>>(*fileHandle, dbFileIDAndName.dbFileID, - HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, &bm, wal, + NUM_HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, &bm, wal, Transaction::getDummyReadOnlyTrx().get()); // Initialize functions. 
keyHashFunc = HashIndexUtils::initializeHashFunc(indexHeader->keyDataTypeID); diff --git a/src/storage/index/hash_index_builder.cpp b/src/storage/index/hash_index_builder.cpp index ed3ca62da06..bc5d4f6c9b1 100644 --- a/src/storage/index/hash_index_builder.cpp +++ b/src/storage/index/hash_index_builder.cpp @@ -25,12 +25,12 @@ HashIndexBuilder::HashIndexBuilder(const std::shared_ptr& fileHan inMemOverflowFile(overflowFile), numEntries{0} { indexHeader = std::make_unique(keyDataType.getLogicalTypeID()); headerArray = std::make_unique>(*fileHandle, - HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, 0 /* numElements */); + NUM_HEADER_PAGES * indexPos + INDEX_HEADER_ARRAY_HEADER_PAGE_IDX, 0 /* numElements */); pSlots = std::make_unique>>( - *fileHandle, HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, 0 /* numElements */); + *fileHandle, NUM_HEADER_PAGES * indexPos + P_SLOTS_HEADER_PAGE_IDX, 0 /* numElements */); // Reserve a slot for oSlots, which is always skipped, as we treat slot idx 0 as NULL. 
oSlots = std::make_unique>>( - *fileHandle, HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, 1 /* numElements */); + *fileHandle, NUM_HEADER_PAGES * indexPos + O_SLOTS_HEADER_PAGE_IDX, 1 /* numElements */); allocatePSlots(2); keyInsertFunc = InMemHashIndexUtils::initializeInsertFunc(indexHeader->keyDataTypeID); keyEqualsFunc = InMemHashIndexUtils::initializeEqualsFunc(indexHeader->keyDataTypeID); @@ -182,7 +182,7 @@ PrimaryKeyIndexBuilder::PrimaryKeyIndexBuilder( : keyDataTypeID{keyDataType.getLogicalTypeID()} { auto fileHandle = std::make_shared(fName, FileHandle::O_PERSISTENT_FILE_CREATE_NOT_EXISTS, vfs); - fileHandle->addNewPages(HEADER_PAGES * NUM_HASH_INDEXES); + fileHandle->addNewPages(NUM_HEADER_PAGES * NUM_HASH_INDEXES); if (keyDataType.getLogicalTypeID() == LogicalTypeID::STRING) { overflowFile = std::make_shared>( InMemFile(StorageUtils::getOverflowFileName(fileHandle->getFileInfo()->path), vfs)); diff --git a/test/test_files/copy/copy_rdf.test b/test/test_files/copy/copy_rdf.test index e8108eff40d..7c9a6985c09 100644 --- a/test/test_files/copy/copy_rdf.test +++ b/test/test_files/copy/copy_rdf.test @@ -1,6 +1,7 @@ -GROUP CopyRDFTest -DATASET CSV copy-test/rdf -BUFFER_POOL_SIZE 536870912 +-SKIP -- diff --git a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test index 7a8d107d77c..7e8fb9793aa 100644 --- a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test +++ b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test @@ -1,6 +1,6 @@ -GROUP LDBCTest -DATASET CSV ldbc-sf01 --BUFFER_POOL_SIZE 134217728 +-BUFFER_POOL_SIZE 268435456 -- diff --git a/test/test_files/lsqb/lsqb_queries.test b/test/test_files/lsqb/lsqb_queries.test index 65b92f1398a..f4cfe3debe7 100644 --- a/test/test_files/lsqb/lsqb_queries.test +++ b/test/test_files/lsqb/lsqb_queries.test @@ -1,6 +1,6 @@ -GROUP LSQBTest -DATASET CSV sf-0.1 --BUFFER_POOL_SIZE 536870912 +-BUFFER_POOL_SIZE 1073741824 -- 
diff --git a/test/test_files/rdf/rdfox_example.test b/test/test_files/rdf/rdfox_example.test index cc0dcce48f4..cd7747c2d6e 100644 --- a/test/test_files/rdf/rdfox_example.test +++ b/test/test_files/rdf/rdfox_example.test @@ -1,5 +1,6 @@ -GROUP RdfoxExample -DATASET TTL rdf/rdfox_example +-SKIP -- diff --git a/test/test_files/rdf/spb1k.test b/test/test_files/rdf/spb1k.test index c6bfac36964..5d4bf47f073 100644 --- a/test/test_files/rdf/spb1k.test +++ b/test/test_files/rdf/spb1k.test @@ -1,5 +1,6 @@ -GROUP RdfoxExample -DATASET TTL rdf/spb1k +-SKIP --