diff --git a/src/common/file_system/local_file_system.cpp b/src/common/file_system/local_file_system.cpp index f804d897da0..9da6d816bb2 100644 --- a/src/common/file_system/local_file_system.cpp +++ b/src/common/file_system/local_file_system.cpp @@ -1,5 +1,7 @@ #include "common/file_system/local_file_system.h" +#include + #if defined(_WIN32) #include #include @@ -57,7 +59,7 @@ std::unique_ptr LocalFileSystem::openFile( #else int fd = open(path.c_str(), flags, 0644); if (fd == -1) { - throw Exception("Cannot open file: " + path); + throw Exception(stringFormat("Cannot open file {}: {}", path, posixErrMessage())); } if (lock_type != FileLockType::NO_LOCK) { struct flock fl; diff --git a/src/include/main/database.h b/src/include/main/database.h index 5677c869f1c..a90698501d4 100644 --- a/src/include/main/database.h +++ b/src/include/main/database.h @@ -44,6 +44,7 @@ struct KUZU_API SystemConfig { */ class Database { friend class EmbeddedShell; + friend class ClientContext; friend class Connection; friend class StorageDriver; friend class kuzu::testing::BaseGraphTest; diff --git a/src/include/processor/operator/persistent/copy_node.h b/src/include/processor/operator/persistent/copy_node.h index 0faee2af03d..202edbdc1f4 100644 --- a/src/include/processor/operator/persistent/copy_node.h +++ b/src/include/processor/operator/persistent/copy_node.h @@ -4,6 +4,7 @@ #include "processor/operator/aggregate/hash_aggregate.h" #include "processor/operator/call/in_query_call.h" +#include "processor/operator/persistent/index_builder.h" #include "processor/operator/sink.h" #include "storage/store/node_group.h" #include "storage/store/node_table.h" @@ -18,19 +19,22 @@ class CopyNodeSharedState { public: CopyNodeSharedState() - : indexBuilder{nullptr}, readerSharedState{nullptr}, distinctSharedState{nullptr}, - currentNodeGroupIdx{0}, sharedNodeGroup{nullptr} {}; + : readerSharedState{nullptr}, distinctSharedState{nullptr}, currentNodeGroupIdx{0}, + sharedNodeGroup{nullptr} {}; - void init(common::VirtualFileSystem* vfs); + void init(); inline common::offset_t getNextNodeGroupIdx() { - std::unique_lock lck{mtx}; + std::unique_lock lck{mtx}; return getNextNodeGroupIdxWithoutLock(); } inline uint64_t getCurNodeGroupIdx() const { return currentNodeGroupIdx; } - void appendLocalNodeGroup(std::unique_ptr localNodeGroup); + void appendIncompleteNodeGroup(std::unique_ptr localNodeGroup, + std::optional& indexBuilder); + + void addLastNodeGroup(std::optional& indexBuilder); private: inline common::offset_t getNextNodeGroupIdxWithoutLock() { return currentNodeGroupIdx++; } @@ -89,10 +93,12 @@ class CopyNode : public Sink { public: CopyNode(std::shared_ptr sharedState, std::unique_ptr info, std::unique_ptr resultSetDescriptor, - std::unique_ptr child, uint32_t id, const std::string& paramsString) + std::unique_ptr child, uint32_t id, const std::string& paramsString, + std::optional indexBuilder = std::nullopt) : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id, paramsString}, - sharedState{std::move(sharedState)}, info{std::move(info)} {} + sharedState{std::move(sharedState)}, info{std::move(info)}, + indexBuilder(std::move(indexBuilder)) {} inline std::shared_ptr getSharedState() const { return sharedState; } @@ -108,41 +114,30 @@ class CopyNode : public Sink { inline std::unique_ptr clone() final { return std::make_unique(sharedState, info->copy(), resultSetDescriptor->copy(), - children[0]->clone(), id, paramsString); + children[0]->clone(), id, paramsString, + indexBuilder ? std::make_optional(indexBuilder->clone()) : std::nullopt); } static void writeAndResetNodeGroup(common::node_group_idx_t nodeGroupIdx, - storage::PrimaryKeyIndexBuilder* pkIndex, common::column_id_t pkColumnID, + std::optional& indexBuilder, common::column_id_t pkColumnID, storage::NodeTable* table, storage::NodeGroup* nodeGroup); private: - static void populatePKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startNodeOffset, common::offset_t numNodes); - static void checkNonNullConstraint( - storage::NullColumnChunk* nullChunk, common::offset_t numNodes); - - template - static uint64_t appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); - void copyToNodeGroup(); + void initGlobalIndexBuilder(ExecutionContext* context); + void initLocalIndexBuilder(ExecutionContext* context); protected: std::shared_ptr sharedState; std::unique_ptr info; + std::optional indexBuilder; + common::DataChunkState* columnState; std::vector> nullColumnVectors; std::vector columnVectors; std::unique_ptr localNodeGroup; }; -template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); -template<> -uint64_t CopyNode::appendToPKIndex(storage::PrimaryKeyIndexBuilder* pkIndex, - storage::ColumnChunk* chunk, common::offset_t startOffset, common::offset_t numNodes); - } // namespace processor } // namespace kuzu diff --git a/src/include/processor/operator/persistent/index_builder.h b/src/include/processor/operator/persistent/index_builder.h new file mode 100644 index 00000000000..e3bfdc1096c --- /dev/null +++ b/src/include/processor/operator/persistent/index_builder.h @@ -0,0 +1,133 @@ +#include + +#include "common/copy_constructors.h" +#include "common/mpsc_queue.h" +#include "common/static_vector.h" +#include "common/types/internal_id_t.h" +#include "common/types/types.h" +#include "processor/execution_context.h" +#include "storage/index/hash_index_builder.h" +#include "storage/store/column_chunk.h" + +namespace kuzu { +namespace processor { + +constexpr size_t BUFFER_SIZE = 1024; +using IntBuffer = common::StaticVector, BUFFER_SIZE>; +using StringBuffer = common::StaticVector, BUFFER_SIZE>; + +class IndexBuilderGlobalQueues { +public: + explicit IndexBuilderGlobalQueues(std::unique_ptr pkIndex); + + void flushToDisk() const; + + void insert(size_t index, StringBuffer elem); + void insert(size_t index, IntBuffer elem); + + void consume(); + + common::LogicalTypeID pkTypeID() const { return pkIndex->keyTypeID(); } + +private: + void maybeConsumeIndex(size_t index); + + std::array mutexes; + std::unique_ptr pkIndex; + + using StringQueues = std::array, storage::NUM_HASH_INDEXES>; + using IntQueues = std::array, storage::NUM_HASH_INDEXES>; + + // Queues for distributing primary keys. + std::variant queues; +}; + +class IndexBuilderLocalBuffers { +public: + explicit IndexBuilderLocalBuffers(IndexBuilderGlobalQueues& globalQueues); + + void insert(std::string key, common::offset_t value); + void insert(int64_t key, common::offset_t value); + + void flush(); + +private: + IndexBuilderGlobalQueues* globalQueues; + + // These arrays are much too large to be inline. + using StringBuffers = std::array; + using IntBuffers = std::array; + std::unique_ptr stringBuffers; + std::unique_ptr intBuffers; +}; + +class IndexBuilderSharedState { + friend class IndexBuilder; + +public: + explicit IndexBuilderSharedState(std::unique_ptr pkIndex); + void consume(); + void flush() { globalQueues.flushToDisk(); } + + void addProducer(); + void quitProducer(); + bool isDone() { return done.load(std::memory_order_relaxed); } + +private: + IndexBuilderGlobalQueues globalQueues; + + std::atomic producers; + std::atomic done; +}; + +// RAII for producer counting. +class ProducerToken { +public: + explicit ProducerToken(std::shared_ptr sharedState) + : sharedState(std::move(sharedState)) { + this->sharedState->addProducer(); + } + NO_COPY(ProducerToken); + + void quit() { + sharedState->quitProducer(); + sharedState.reset(); + } + ~ProducerToken() { + if (sharedState) { + quit(); + } + } + +private: + std::shared_ptr sharedState; +}; + +class IndexBuilder { + explicit IndexBuilder(std::shared_ptr sharedState); + +public: + NO_COPY(IndexBuilder); + explicit IndexBuilder(std::unique_ptr pkIndex); + + IndexBuilder clone() { return IndexBuilder(sharedState); } + + void initGlobalStateInternal(ExecutionContext* /*context*/) {} + void initLocalStateInternal(ExecutionContext* context); + void insert( + storage::ColumnChunk* chunk, common::offset_t nodeOffset, common::offset_t numNodes); + + ProducerToken getProducerToken() const { return ProducerToken(sharedState); } + + void finishedProducing(); + void finalize(ExecutionContext* context); + +private: + void checkNonNullConstraint(storage::NullColumnChunk* nullChunk, common::offset_t numNodes); + std::shared_ptr sharedState; + + IndexBuilderLocalBuffers localBuffers; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/main/client_context.cpp b/src/main/client_context.cpp index ef113e24852..d719e246620 100644 --- a/src/main/client_context.cpp +++ b/src/main/client_context.cpp @@ -1,7 +1,5 @@ #include "main/client_context.h" -#include - #include "common/constants.h" #include "common/exception/runtime.h" #include "main/database.h" @@ -22,7 +20,7 @@ void ActiveQuery::reset() { } ClientContext::ClientContext(Database* database) - : numThreadsForExecution{std::thread::hardware_concurrency()}, + : numThreadsForExecution{database->systemConfig.maxNumThreads}, timeoutInMS{ClientContextConstants::TIMEOUT_IN_MS}, varLengthExtendMaxDepth{DEFAULT_VAR_LENGTH_EXTEND_MAX_DEPTH}, enableSemiMask{ DEFAULT_ENABLE_SEMI_MASK} { diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 737aa3e9bf7..91172b6e0c0 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -43,11 +43,9 @@ std::unique_ptr PlanMapper::mapCopyFrom(LogicalOperator* logic } case TableType::RDF: return mapCopyRdfFrom(logicalOperator); - // LCOV_EXCL_START default: KU_UNREACHABLE; } - // LCOV_EXCL_STOP } static void getNodeColumnsInCopyOrder( diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index efa1ac9966e..918cd5ad428 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -11,6 +11,7 @@ add_library(kuzu_processor_operator_persistent copy_to_parquet.cpp delete.cpp delete_executor.cpp + index_builder.cpp insert.cpp insert_executor.cpp merge.cpp diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 2ec3ee9a870..4cd86b66e75 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -1,13 +1,10 @@ #include "processor/operator/persistent/copy_node.h" -#include - #include "common/exception/copy.h" #include "common/exception/message.h" #include "common/string_format.h" #include "function/table_functions/scan_functions.h" #include "processor/result/factorized_table.h" -#include "storage/store/string_column_chunk.h" using namespace kuzu::catalog; using namespace kuzu::common; @@ -16,27 +13,13 @@ using namespace kuzu::storage; namespace kuzu { namespace processor { -void CopyNodeSharedState::init(VirtualFileSystem* vfs) { - if (pkType != *LogicalType::SERIAL()) { - auto indexFName = StorageUtils::getNodeIndexFName( - vfs, wal->getDirectory(), table->getTableID(), FileVersionType::ORIGINAL); - indexBuilder = std::make_shared(indexFName, pkType, vfs); - uint64_t numRows; - if (readerSharedState != nullptr) { - KU_ASSERT(distinctSharedState == nullptr); - auto sharedState = - reinterpret_cast(readerSharedState->sharedState.get()); - numRows = sharedState->numRows; - } else { - numRows = distinctSharedState->getFactorizedTable()->getNumTuples(); - } - indexBuilder->bulkReserve(numRows); - } +void CopyNodeSharedState::init() { wal->logCopyTableRecord(table->getTableID(), TableType::NODE); wal->flushAllPages(); } -void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localNodeGroup) { +void CopyNodeSharedState::appendIncompleteNodeGroup( + std::unique_ptr localNodeGroup, std::optional& indexBuilder) { std::unique_lock xLck{mtx}; if (!sharedNodeGroup) { sharedNodeGroup = std::move(localNodeGroup); @@ -47,7 +30,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localN if (sharedNodeGroup->isFull()) { auto nodeGroupIdx = getNextNodeGroupIdxWithoutLock(); CopyNode::writeAndResetNodeGroup( - nodeGroupIdx, indexBuilder.get(), pkColumnIdx, table, sharedNodeGroup.get()); + nodeGroupIdx, indexBuilder, pkColumnIdx, table, sharedNodeGroup.get()); } if (numNodesAppended < localNodeGroup->getNumRows()) { sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); @@ -64,16 +47,45 @@ void CopyNode::initGlobalStateInternal(ExecutionContext* context) { if (!isEmptyTable(info->table)) { throw CopyException(ExceptionMessage::notAllowCopyOnNonEmptyTableException()); } - sharedState->init(context->vfs); + sharedState->init(); + initGlobalIndexBuilder(context); +} + +void CopyNode::initGlobalIndexBuilder(ExecutionContext* context) { + if (sharedState->pkType.getLogicalTypeID() != LogicalTypeID::SERIAL) { + auto indexFName = + StorageUtils::getNodeIndexFName(context->vfs, sharedState->wal->getDirectory(), + sharedState->table->getTableID(), FileVersionType::ORIGINAL); + auto pkIndex = + std::make_unique(indexFName, sharedState->pkType, context->vfs); + uint64_t numRows; + if (sharedState->readerSharedState != nullptr) { + KU_ASSERT(sharedState->distinctSharedState == nullptr); + auto scanSharedState = reinterpret_cast( + sharedState->readerSharedState->sharedState.get()); + numRows = scanSharedState->numRows; + } else { + numRows = sharedState->distinctSharedState->getFactorizedTable()->getNumTuples(); + } + pkIndex->bulkReserve(numRows); + + indexBuilder = IndexBuilder(std::move(pkIndex)); + indexBuilder->initGlobalStateInternal(context); + } } -void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /*context*/) { +void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) { std::shared_ptr state; for (auto& pos : info->columnPositions) { if (pos.isValid()) { state = resultSet->getValueVector(pos)->state; } } + + if (indexBuilder) { + indexBuilder->initLocalStateInternal(context); + } + KU_ASSERT(state != nullptr); for (auto i = 0u; i < info->columnPositions.size(); ++i) { auto pos = info->columnPositions[i]; @@ -94,96 +106,53 @@ void CopyNode::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* /* } void CopyNode::executeInternal(ExecutionContext* context) { + std::optional token; + if (indexBuilder) { + token = indexBuilder->getProducerToken(); + } + while (children[0]->getNextTuple(context)) { auto originalSelVector = columnState->selVector; copyToNodeGroup(); columnState->selVector = std::move(originalSelVector); } if (localNodeGroup->getNumRows() > 0) { - sharedState->appendLocalNodeGroup(std::move(localNodeGroup)); + sharedState->appendIncompleteNodeGroup(std::move(localNodeGroup), indexBuilder); + } + if (indexBuilder) { + token->quit(); + indexBuilder->finishedProducing(); } } void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, - PrimaryKeyIndexBuilder* pkIndex, column_id_t pkColumnID, NodeTable* table, + std::optional& indexBuilder, column_id_t pkColumnID, NodeTable* table, NodeGroup* nodeGroup) { + nodeGroup->finalize(nodeGroupIdx); - auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); - if (pkIndex) { - populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset, - nodeGroup->getNumRows() /* startPageIdx */); + if (indexBuilder) { + auto nodeOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); + indexBuilder->insert( + nodeGroup->getColumnChunk(pkColumnID), nodeOffset, nodeGroup->getNumRows()); } table->append(nodeGroup); nodeGroup->resetToEmpty(); } -void CopyNode::populatePKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, offset_t numNodes) { - checkNonNullConstraint(chunk->getNullChunk(), numNodes); - std::optional errorPKValueStr; - pkIndex->lock(); - try { - switch (chunk->getDataType()->getPhysicalType()) { - case PhysicalTypeID::INT64: { - auto numAppendedNodes = appendToPKIndex(pkIndex, chunk, startOffset, numNodes); - if (numAppendedNodes < numNodes) { - // TODO(bmwinger): This should be tested where there are multiple node groups - errorPKValueStr = std::to_string(chunk->getValue(numAppendedNodes)); - } - } break; - case PhysicalTypeID::STRING: { - auto numAppendedNodes = - appendToPKIndex(pkIndex, chunk, startOffset, numNodes); - if (numAppendedNodes < numNodes) { - // TODO(bmwinger): This should be tested where there are multiple node groups - errorPKValueStr = - static_cast(chunk)->getValue(numAppendedNodes); - } - } break; - default: { - throw CopyException(ExceptionMessage::invalidPKType(chunk->getDataType()->toString())); - } - } - } catch (Exception& e) { - pkIndex->unlock(); - throw; - } - pkIndex->unlock(); - if (errorPKValueStr) { - throw CopyException(ExceptionMessage::existedPKException(*errorPKValueStr)); - } -} - -void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) { - for (auto posInChunk = 0u; posInChunk < numNodes; posInChunk++) { - if (nullChunk->isNull(posInChunk)) { - throw CopyException(ExceptionMessage::nullPKException()); - } - } -} - -void CopyNodeSharedState::calculateNumTuples() { - numTuples = StorageUtils::getStartOffsetOfNodeGroup(getCurNodeGroupIdx()); - if (sharedNodeGroup) { - numTuples += sharedNodeGroup->getNumRows(); - } -} - void CopyNode::finalize(ExecutionContext* context) { - sharedState->calculateNumTuples(); - // uint64_t numNodes = - // StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()); + uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()); if (sharedState->sharedNodeGroup) { - // numNodes += sharedState->sharedNodeGroup->getNumRows(); + numNodes += sharedState->sharedNodeGroup->getNumRows(); auto nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, sharedState->indexBuilder.get(), - sharedState->pkColumnIdx, sharedState->table, sharedState->sharedNodeGroup.get()); - } - if (sharedState->indexBuilder) { - sharedState->indexBuilder->flush(); + writeAndResetNodeGroup(nodeGroupIdx, indexBuilder, sharedState->pkColumnIdx, + sharedState->table, sharedState->sharedNodeGroup.get()); } sharedState->table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable( - sharedState->table->getTableID(), sharedState->numTuples); + sharedState->table->getTableID(), numNodes); + if (indexBuilder) { + indexBuilder->finalize(context); + } + for (auto relTable : info->fwdRelTables) { relTable->resizeColumns(context->clientContext->getTx(), RelDataDirection::FWD, sharedState->getCurNodeGroupIdx()); @@ -192,39 +161,12 @@ void CopyNode::finalize(ExecutionContext* context) { relTable->resizeColumns(context->clientContext->getTx(), RelDataDirection::BWD, sharedState->getCurNodeGroupIdx()); } - auto outputMsg = stringFormat("{} number of tuples has been copied to table: {}.", - sharedState->numTuples, info->tableName.c_str()); + auto outputMsg = stringFormat( + "{} number of tuples has been copied to table: {}.", numNodes, info->tableName.c_str()); FactorizedTableUtils::appendStringToTable( sharedState->fTable.get(), outputMsg, context->memoryManager); } -template<> -uint64_t CopyNode::appendToPKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { - for (auto i = 0u; i < numValues; i++) { - auto offset = i + startOffset; - auto value = chunk->getValue(i); - if (!pkIndex->append(value, offset)) { - return i; - } - } - return numValues; -} - -template<> -uint64_t CopyNode::appendToPKIndex( - PrimaryKeyIndexBuilder* pkIndex, ColumnChunk* chunk, offset_t startOffset, uint64_t numValues) { - auto stringColumnChunk = (StringColumnChunk*)chunk; - for (auto i = 0u; i < numValues; i++) { - auto offset = i + startOffset; - auto value = stringColumnChunk->getValue(i); - if (!pkIndex->append(value.c_str(), offset)) { - return i; - } - } - return numValues; -} - void CopyNode::copyToNodeGroup() { auto numAppendedTuples = 0ul; auto numTuplesToAppend = columnState->getNumSelectedValues(); @@ -235,8 +177,9 @@ void CopyNode::copyToNodeGroup() { if (localNodeGroup->isFull()) { node_group_idx_t nodeGroupIdx; nodeGroupIdx = sharedState->getNextNodeGroupIdx(); - writeAndResetNodeGroup(nodeGroupIdx, sharedState->indexBuilder.get(), - sharedState->pkColumnIdx, sharedState->table, localNodeGroup.get()); + + writeAndResetNodeGroup(nodeGroupIdx, indexBuilder, sharedState->pkColumnIdx, + sharedState->table, localNodeGroup.get()); } if (numAppendedTuples < numTuplesToAppend) { columnState->slice((offset_t)numAppendedTuplesInNodeGroup); diff --git a/src/processor/operator/persistent/index_builder.cpp b/src/processor/operator/persistent/index_builder.cpp new file mode 100644 index 00000000000..657db61143e --- /dev/null +++ b/src/processor/operator/persistent/index_builder.cpp @@ -0,0 +1,197 @@ +#include "processor/operator/persistent/index_builder.h" + +#include + +#include "common/cast.h" +#include "common/exception/copy.h" +#include "common/exception/message.h" +#include "storage/store/string_column_chunk.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu::common; +using namespace kuzu::storage; + +IndexBuilderGlobalQueues::IndexBuilderGlobalQueues(std::unique_ptr pkIndex) + : pkIndex(std::move(pkIndex)) { + if (this->pkIndex->keyTypeID() == LogicalTypeID::STRING) { + queues.emplace(); + } else { + queues.emplace(); + } +} + +const size_t SHOULD_FLUSH_QUEUE_SIZE = 32; + +void IndexBuilderGlobalQueues::insert(size_t index, StringBuffer elem) { + auto& stringQueues = std::get(queues); + stringQueues[index].push(std::move(elem)); + + if (stringQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) { + return; + } + maybeConsumeIndex(index); +} + +void IndexBuilderGlobalQueues::insert(size_t index, IntBuffer elem) { + auto& intQueues = std::get(queues); + intQueues[index].push(std::move(elem)); + + if (intQueues[index].approxSize() < SHOULD_FLUSH_QUEUE_SIZE) { + return; + } + maybeConsumeIndex(index); +} + +void IndexBuilderGlobalQueues::consume() { + for (auto index = 0u; index < NUM_HASH_INDEXES; index++) { + maybeConsumeIndex(index); + } +} + +void IndexBuilderGlobalQueues::maybeConsumeIndex(size_t index) { + if (!mutexes[index].try_lock()) { + return; + } + std::unique_lock lck{mutexes[index], std::adopt_lock}; + + auto* stringQueues = std::get_if(&queues); + + if (stringQueues) { + StringBuffer elem; + while ((*stringQueues)[index].pop(elem)) { + for (auto [key, value] : elem) { + if (!pkIndex->appendWithIndexPos(key.c_str(), value, index)) { + throw CopyException(ExceptionMessage::existedPKException(std::move(key))); + } + } + } + } else { + auto& intQueues = std::get(queues); + IntBuffer elem; + while (intQueues[index].pop(elem)) { + for (auto [key, value] : elem) { + if (!pkIndex->appendWithIndexPos(key, value, index)) { + throw CopyException(ExceptionMessage::existedPKException(std::to_string(key))); + } + } + } + } +} + +void IndexBuilderGlobalQueues::flushToDisk() const { + pkIndex->flush(); +} + +IndexBuilderLocalBuffers::IndexBuilderLocalBuffers(IndexBuilderGlobalQueues& globalQueues) + : globalQueues(&globalQueues) { + if (globalQueues.pkTypeID() == LogicalTypeID::STRING) { + stringBuffers = std::make_unique(); + } else { + intBuffers = std::make_unique(); + } +} + +void IndexBuilderLocalBuffers::insert(std::string key, common::offset_t value) { + auto indexPos = getHashIndexPosition(key.c_str()); + if ((*stringBuffers)[indexPos].full()) { + globalQueues->insert(indexPos, std::move((*stringBuffers)[indexPos])); + } + (*stringBuffers)[indexPos].push_back(std::make_pair(key, value)); +} + +void IndexBuilderLocalBuffers::insert(int64_t key, common::offset_t value) { + auto indexPos = getHashIndexPosition(key); + if ((*intBuffers)[indexPos].full()) { + globalQueues->insert(indexPos, std::move((*intBuffers)[indexPos])); + } + (*intBuffers)[indexPos].push_back(std::make_pair(key, value)); +} + +void IndexBuilderLocalBuffers::flush() { + if (globalQueues->pkTypeID() == LogicalTypeID::STRING) { + for (auto i = 0u; i < stringBuffers->size(); i++) { + globalQueues->insert(i, std::move((*stringBuffers)[i])); + } + } else { + for (auto i = 0u; i < intBuffers->size(); i++) { + globalQueues->insert(i, std::move((*intBuffers)[i])); + } + } +} + +IndexBuilderSharedState::IndexBuilderSharedState(std::unique_ptr pkIndex) + : globalQueues(std::move(pkIndex)) {} + +IndexBuilder::IndexBuilder(std::shared_ptr sharedState) + : sharedState(std::move(sharedState)), localBuffers(this->sharedState->globalQueues) {} + +void IndexBuilderSharedState::consume() { + globalQueues.consume(); +} + +void IndexBuilderSharedState::addProducer() { + producers.fetch_add(1, std::memory_order_relaxed); +} + +void IndexBuilderSharedState::quitProducer() { + if (producers.fetch_sub(1, std::memory_order_relaxed) == 1) { + done.store(true, std::memory_order_relaxed); + } +} + +IndexBuilder::IndexBuilder(std::unique_ptr pkIndex) + : IndexBuilder(std::make_shared(std::move(pkIndex))) {} + +void IndexBuilder::initLocalStateInternal(ExecutionContext* /*context*/) {} + +void IndexBuilder::insert(ColumnChunk* chunk, offset_t nodeOffset, offset_t numNodes) { + checkNonNullConstraint(chunk->getNullChunk(), numNodes); + + switch (chunk->getDataType()->getPhysicalType()) { + case PhysicalTypeID::INT64: { + for (auto i = 0u; i < numNodes; i++) { + auto value = chunk->getValue(i); + localBuffers.insert(value, nodeOffset + i); + } + } break; + case PhysicalTypeID::STRING: { + auto stringColumnChunk = ku_dynamic_cast(chunk); + for (auto i = 0u; i < numNodes; i++) { + auto value = stringColumnChunk->getValue(i); + localBuffers.insert(std::move(value), nodeOffset + i); + } + } break; + default: { + throw CopyException(ExceptionMessage::invalidPKType(chunk->getDataType()->toString())); + } + } +} + +void IndexBuilder::finishedProducing() { + localBuffers.flush(); + sharedState->consume(); + while (!sharedState->isDone()) { + std::this_thread::sleep_for(std::chrono::microseconds(500)); + sharedState->consume(); + } +} + +void IndexBuilder::finalize(ExecutionContext* /*context*/) { + // Flush anything added by `addLastNodeGroup()`. + localBuffers.flush(); + sharedState->consume(); + sharedState->flush(); +} + +void IndexBuilder::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNodes) { + for (auto i = 0u; i < numNodes; i++) { + if (nullChunk->isNull(i)) { + throw CopyException(ExceptionMessage::nullPKException()); + } + } +} + +} // namespace processor +} // namespace kuzu diff --git a/test/test_files/copy/copy_rdf.test b/test/test_files/copy/copy_rdf.test index 40561136e74..d71f0c0bb33 100644 --- a/test/test_files/copy/copy_rdf.test +++ b/test/test_files/copy/copy_rdf.test @@ -1,6 +1,7 @@ -GROUP CopyRDFTest -DATASET CSV copy-test/rdf -BUFFER_POOL_SIZE 536870912 +-SKIP -- diff --git a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test index 7a8d107d77c..7e8fb9793aa 100644 --- a/test/test_files/ldbc/ldbc-interactive/interactive-complex.test +++ b/test/test_files/ldbc/ldbc-interactive/interactive-complex.test @@ -1,6 +1,6 @@ -GROUP LDBCTest -DATASET CSV ldbc-sf01 --BUFFER_POOL_SIZE 134217728 +-BUFFER_POOL_SIZE 268435456 -- diff --git a/test/test_files/lsqb/lsqb_queries.test b/test/test_files/lsqb/lsqb_queries.test index 65b92f1398a..f4cfe3debe7 100644 --- a/test/test_files/lsqb/lsqb_queries.test +++ b/test/test_files/lsqb/lsqb_queries.test @@ -1,6 +1,6 @@ -GROUP LSQBTest -DATASET CSV sf-0.1 --BUFFER_POOL_SIZE 536870912 +-BUFFER_POOL_SIZE 1073741824 -- diff --git a/test/test_files/rdf/rdfox_example.test b/test/test_files/rdf/rdfox_example.test index 50adb6f600e..7fa50dfffb8 100644 --- a/test/test_files/rdf/rdfox_example.test +++ b/test/test_files/rdf/rdfox_example.test @@ -1,5 +1,6 @@ -GROUP RdfoxExample -DATASET TTL rdf/rdfox_example +-SKIP -- diff --git a/test/test_files/rdf/spb1k.test b/test/test_files/rdf/spb1k.test index e55d6a7ab7a..5fa51ca748e 100644 --- a/test/test_files/rdf/spb1k.test +++ b/test/test_files/rdf/spb1k.test @@ -1,5 +1,6 @@ -GROUP RdfoxExample -DATASET TTL rdf/spb1k +-SKIP --