diff --git a/src/include/processor/operator/partitioner.h b/src/include/processor/operator/partitioner.h index e305d05de0..7dbfd75c5e 100644 --- a/src/include/processor/operator/partitioner.h +++ b/src/include/processor/operator/partitioner.h @@ -22,12 +22,16 @@ struct PartitioningBuffer { void merge(std::unique_ptr localPartitioningStates); }; +// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when +// necessary. Here, each partition is essentially a node group. struct PartitionerSharedState { std::mutex mtx; - std::vector numPartitions; + std::vector maxNodeOffsets; // max node offset in each direction. + std::vector numPartitions; // num of partitions in each direction. std::vector> partitioningBuffers; common::partition_idx_t nextPartitionIdx = 0; + void initialize(); common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx); void resetState(); void merge(std::vector> localPartitioningStates); @@ -76,10 +80,11 @@ class Partitioner : public Sink { std::unique_ptr clone() final; -private: - void initializePartitioningStates( - std::vector>& partitioningBuffers); + static void initializePartitioningStates( + std::vector>& partitioningBuffers, + std::vector numPartitions); +private: // TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be // generalized to resultSet later if needed. 
void copyDataToPartitions(common::partition_idx_t partitioningIdx, diff --git a/src/include/storage/store/node_group.h b/src/include/storage/store/node_group.h index 1c04a61234..c9ce64ee10 100644 --- a/src/include/storage/store/node_group.h +++ b/src/include/storage/store/node_group.h @@ -17,15 +17,16 @@ class NodeGroup { virtual ~NodeGroup() = default; inline uint64_t getNodeGroupIdx() const { return nodeGroupIdx; } - inline common::offset_t getNumNodes() const { return numNodes; } + inline common::row_idx_t getNumRows() const { return numRows; } inline ColumnChunk* getColumnChunk(common::column_id_t columnID) { KU_ASSERT(columnID < chunks.size()); return chunks[columnID].get(); } - inline bool isFull() const { return numNodes == common::StorageConstants::NODE_GROUP_SIZE; } + inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; } void resetToEmpty(); - void setChunkToAllNull(common::vector_idx_t chunkIdx); + void setAllNull(); + void setNumValues(common::offset_t numValues); void resizeChunks(uint64_t newSize); uint64_t append(const std::vector& columnVectors, @@ -37,7 +38,7 @@ class NodeGroup { private: uint64_t nodeGroupIdx; - common::offset_t numNodes; + common::row_idx_t numRows; std::vector> chunks; }; @@ -59,9 +60,13 @@ class CSRNodeGroup : public NodeGroup { }; struct NodeGroupFactory { - static std::unique_ptr createNodeGroup(common::ColumnDataFormat dataFormat, + static inline std::unique_ptr createNodeGroup(common::ColumnDataFormat dataFormat, const std::vector>& columnTypes, - bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE); + bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) { + return dataFormat == common::ColumnDataFormat::REGULAR ? 
+ std::make_unique(columnTypes, enableCompression, capacity) : + std::make_unique(columnTypes, enableCompression); + } }; } // namespace storage diff --git a/src/include/storage/store/struct_column.h b/src/include/storage/store/struct_column.h index 89a955b93e..f84c309199 100644 --- a/src/include/storage/store/struct_column.h +++ b/src/include/storage/store/struct_column.h @@ -29,6 +29,7 @@ class StructColumn final : public Column { } void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) override; + void setNull(common::offset_t nodeOffset) override; void prepareCommitForChunk(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk, diff --git a/src/processor/map/map_copy_from.cpp b/src/processor/map/map_copy_from.cpp index 04e6e73776..02d2ba6352 100644 --- a/src/processor/map/map_copy_from.cpp +++ b/src/processor/map/map_copy_from.cpp @@ -165,13 +165,6 @@ std::unique_ptr PlanMapper::createCopyRel( auto outFSchema = copyFrom->getSchema(); auto tableSchema = dynamic_cast(copyFromInfo->tableSchema); auto partitioningIdx = direction == RelDataDirection::FWD ? 0 : 1; - auto maxBoundNodeOffset = storageManager.getNodesStatisticsAndDeletedIDs()->getMaxNodeOffset( - transaction::Transaction::getDummyReadOnlyTrx().get(), - tableSchema->getBoundTableID(direction)); - // TODO(Guodong/Xiyang): Consider moving this to Partitioner::initGlobalStateInternal. - auto numPartitions = (maxBoundNodeOffset + StorageConstants::NODE_GROUP_SIZE) / - StorageConstants::NODE_GROUP_SIZE; - partitionerSharedState->numPartitions[partitioningIdx] = numPartitions; auto dataFormat = tableSchema->isSingleMultiplicityInDirection(direction) ? 
ColumnDataFormat::REGULAR : ColumnDataFormat::CSR; @@ -190,10 +183,17 @@ std::unique_ptr PlanMapper::mapCopyRelFrom( auto tableSchema = reinterpret_cast(copyFromInfo->tableSchema); auto prevOperator = mapOperator(copyFrom->getChild(0).get()); KU_ASSERT(prevOperator->getOperatorType() == PhysicalOperatorType::PARTITIONER); + auto nodesStats = storageManager.getNodesStatisticsAndDeletedIDs(); auto partitionerSharedState = dynamic_cast(prevOperator.get())->getSharedState(); - partitionerSharedState->numPartitions.resize(2); - std::vector> columnTypes; + partitionerSharedState->maxNodeOffsets.resize(2); + partitionerSharedState->maxNodeOffsets[0] = + nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(), + tableSchema->getBoundTableID(RelDataDirection::FWD)); + partitionerSharedState->maxNodeOffsets[1] = + nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(), + tableSchema->getBoundTableID(RelDataDirection::BWD)); // TODO(Xiyang): Move binding of column types to binder. + std::vector> columnTypes; columnTypes.push_back(LogicalType::INTERNAL_ID()); // ADJ COLUMN. 
for (auto& property : tableSchema->properties) { columnTypes.push_back(property->getDataType()->copy()); diff --git a/src/processor/operator/partitioner.cpp b/src/processor/operator/partitioner.cpp index 382ebd7a54..758985ef8e 100644 --- a/src/processor/operator/partitioner.cpp +++ b/src/processor/operator/partitioner.cpp @@ -27,6 +27,16 @@ std::vector> PartitioningInfo::copy( return result; } +void PartitionerSharedState::initialize() { + KU_ASSERT(maxNodeOffsets.size() == 2); + numPartitions.resize(maxNodeOffsets.size()); + for (auto i = 0u; i < maxNodeOffsets.size(); i++) { + numPartitions[i] = (maxNodeOffsets[i] + StorageConstants::NODE_GROUP_SIZE) / + StorageConstants::NODE_GROUP_SIZE; + } + Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions); +} + partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) { std::unique_lock xLck{mtx}; if (nextPartitionIdx >= numPartitions[partitioningIdx]) { @@ -71,12 +81,12 @@ Partitioner::Partitioner(std::unique_ptr resultSetDescripto } void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) { - initializePartitioningStates(sharedState->partitioningBuffers); + sharedState->initialize(); } void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) { localState = std::make_unique(); - initializePartitioningStates(localState->partitioningBuffers); + initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions); } static void constructDataChunk(DataChunk* dataChunk, const std::vector& columnPositions, @@ -112,11 +122,11 @@ void Partitioner::executeInternal(ExecutionContext* context) { } void Partitioner::initializePartitioningStates( - std::vector>& partitioningBuffers) { - KU_ASSERT(infos.size() == sharedState->numPartitions.size()); - partitioningBuffers.resize(infos.size()); - for (auto partitioningIdx = 0u; partitioningIdx < infos.size(); partitioningIdx++) { - auto numPartition = 
sharedState->numPartitions[partitioningIdx]; + std::vector>& partitioningBuffers, + std::vector numPartitions) { + partitioningBuffers.resize(numPartitions.size()); + for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) { + auto numPartition = numPartitions[partitioningIdx]; auto partitioningBuffer = std::make_unique(); partitioningBuffer->partitions.reserve(numPartition); for (auto i = 0u; i < numPartition; i++) { @@ -131,6 +141,8 @@ void Partitioner::copyDataToPartitions( for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) { auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i]; auto partitionIdx = partitionIdxes->getValue(posToCopyFrom); + KU_ASSERT( + partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size()); auto partition = localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get(); if (partition->empty() || diff --git a/src/processor/operator/persistent/copy_node.cpp b/src/processor/operator/persistent/copy_node.cpp index 0fa6b936ab..ed2ac41052 100644 --- a/src/processor/operator/persistent/copy_node.cpp +++ b/src/processor/operator/persistent/copy_node.cpp @@ -42,7 +42,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr localN CopyNode::writeAndResetNodeGroup( nodeGroupIdx, pkIndex.get(), pkColumnIdx, table, sharedNodeGroup.get()); } - if (numNodesAppended < localNodeGroup->getNumNodes()) { + if (numNodesAppended < localNodeGroup->getNumRows()) { sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended); } } @@ -92,7 +92,7 @@ void CopyNode::executeInternal(ExecutionContext* context) { copyToNodeGroup(); columnState->selVector = std::move(originalSelVector); } - if (localNodeGroup->getNumNodes() > 0) { + if (localNodeGroup->getNumRows() > 0) { sharedState->appendLocalNodeGroup(std::move(localNodeGroup)); } } @@ -104,7 +104,7 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx, auto 
startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx); if (pkIndex) { populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset, - nodeGroup->getNumNodes() /* startPageIdx */); + nodeGroup->getNumRows() /* startPageIdx */); } table->append(nodeGroup); nodeGroup->resetToEmpty(); @@ -158,7 +158,7 @@ void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNo void CopyNode::finalize(ExecutionContext* context) { uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()); if (sharedState->sharedNodeGroup) { - numNodes += sharedState->sharedNodeGroup->getNumNodes(); + numNodes += sharedState->sharedNodeGroup->getNumRows(); auto nodeGroupIdx = sharedState->getNextNodeGroupIdx(); writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnIdx, sharedState->table, sharedState->sharedNodeGroup.get()); diff --git a/src/processor/operator/persistent/copy_rdf_resource.cpp b/src/processor/operator/persistent/copy_rdf_resource.cpp index 13665025b6..a50a3a9d23 100644 --- a/src/processor/operator/persistent/copy_rdf_resource.cpp +++ b/src/processor/operator/persistent/copy_rdf_resource.cpp @@ -47,7 +47,7 @@ void CopyRdfResource::executeInternal(ExecutionContext* context) { copyToNodeGroup(vector); columnState->selVector = std::move(originalSelVector); } - if (localNodeGroup->getNumNodes() > 0) { + if (localNodeGroup->getNumRows() > 0) { sharedState->appendLocalNodeGroup(std::move(localNodeGroup)); } } @@ -55,7 +55,7 @@ void CopyRdfResource::executeInternal(ExecutionContext* context) { void CopyRdfResource::finalize(ExecutionContext* context) { uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()); if (sharedState->sharedNodeGroup) { - numNodes += sharedState->sharedNodeGroup->getNumNodes(); + numNodes += sharedState->sharedNodeGroup->getNumRows(); auto nodeGroupIdx = sharedState->getNextNodeGroupIdx(); 
writeNodeGroup(nodeGroupIdx, sharedState->table, sharedState->sharedNodeGroup.get()); } @@ -76,7 +76,7 @@ void CopyRdfResource::insertIndex(ValueVector* vector) { common::sel_t nextPos = 0; common::offset_t result; auto offset = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) + - localNodeGroup->getNumNodes(); + localNodeGroup->getNumRows(); for (auto i = 0u; i < vector->state->getNumSelectedValues(); i++) { auto uriStr = vector->getValue(i).getAsString(); if (!sharedState->pkIndex->lookup(uriStr.c_str(), result)) { diff --git a/src/processor/operator/persistent/copy_rel.cpp b/src/processor/operator/persistent/copy_rel.cpp index 099c2f2c1a..7de0037806 100644 --- a/src/processor/operator/persistent/copy_rel.cpp +++ b/src/processor/operator/persistent/copy_rel.cpp @@ -55,23 +55,28 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) { .get(); auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(localState->currentPartition); auto offsetVectorIdx = info->dataDirection == RelDataDirection::FWD ? 0 : 1; - row_idx_t numRows = 0; + row_idx_t numRels = 0; for (auto& dataChunk : *partitioningBuffer) { auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get(); setOffsetToWithinNodeGroup(offsetVector, startOffset); - numRows += offsetVector->state->selVector->selectedSize; + numRels += offsetVector->state->selVector->selectedSize; } ColumnChunk* csrOffsetChunk = nullptr; + // Calculate num of source nodes in this node group. + // This will be used to set the num of values of the node group. 
+ auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE, + partitionerSharedState->maxNodeOffsets[info->partitioningIdx] - startOffset + 1); if (info->dataFormat == ColumnDataFormat::CSR) { auto csrNodeGroup = static_cast(localState->nodeGroup.get()); csrOffsetChunk = csrNodeGroup->getCSROffsetChunk(); - csrOffsetChunk->setNumValues(StorageConstants::NODE_GROUP_SIZE); + // CSR offset chunk should be aligned with num of source nodes in this node group. + csrOffsetChunk->setNumValues(numNodes); populateCSROffsets(csrOffsetChunk, partitioningBuffer, offsetVectorIdx); // Resize csr data column chunks. - localState->nodeGroup->resizeChunks(numRows); + localState->nodeGroup->resizeChunks(numRels); } else { - // Set adj column chunk to all null. - localState->nodeGroup->setChunkToAllNull(0 /* chunkIdx */); + localState->nodeGroup->setAllNull(); + localState->nodeGroup->getColumnChunk(0)->setNumValues(numNodes); } for (auto& dataChunk : *partitioningBuffer) { if (info->dataFormat == ColumnDataFormat::CSR) { @@ -84,7 +89,7 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) { localState->nodeGroup->finalize(localState->currentPartition); // Flush node group to table. sharedState->table->append(localState->nodeGroup.get(), info->dataDirection); - sharedState->incrementNumRows(localState->nodeGroup->getNumNodes()); + sharedState->incrementNumRows(localState->nodeGroup->getNumRows()); localState->nodeGroup->resetToEmpty(); } } @@ -111,22 +116,21 @@ void CopyRel::populateCSROffsets( } } -// TODO(Guodong): Can we guarantee vector is not filtered and get rid of access to selVector? 
void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) { KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64); + KU_ASSERT(vector->state->selVector->isUnfiltered()); auto offsets = (offset_t*)vector->getData(); for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) { - auto pos = vector->state->selVector->selectedPositions[i]; - offsets[pos] -= startOffset; + offsets[i] -= startOffset; } } void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, offset_t* csrOffsets) { KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64); + KU_ASSERT(offsetVector->state->selVector->isUnfiltered()); for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) { - auto pos = offsetVector->state->selVector->selectedPositions[i]; - auto nodeOffset = offsetVector->getValue(pos); - offsetVector->setValue(pos, csrOffsets[nodeOffset]); + auto nodeOffset = offsetVector->getValue(i); + offsetVector->setValue(i, csrOffsets[nodeOffset]); csrOffsets[nodeOffset]++; } } diff --git a/src/storage/store/node_group.cpp b/src/storage/store/node_group.cpp index b9dfd3fb10..5614a24162 100644 --- a/src/storage/store/node_group.cpp +++ b/src/storage/store/node_group.cpp @@ -11,7 +11,7 @@ namespace storage { NodeGroup::NodeGroup(const std::vector>& columnTypes, bool enableCompression, uint64_t capacity) - : nodeGroupIdx{UINT64_MAX}, numNodes{0} { + : nodeGroupIdx{UINT64_MAX}, numRows{0} { chunks.reserve(columnTypes.size()); for (auto& type : columnTypes) { chunks.push_back( @@ -20,7 +20,7 @@ NodeGroup::NodeGroup(const std::vector>& co } NodeGroup::NodeGroup(const std::vector>& columns, bool enableCompression) - : nodeGroupIdx{UINT64_MAX}, numNodes{0} { + : nodeGroupIdx{UINT64_MAX}, numRows{0} { chunks.reserve(columns.size()); for (auto columnID = 0u; columnID < columns.size(); columnID++) { chunks.push_back(ColumnChunkFactory::createColumnChunk( @@ -29,16 +29,23 @@ NodeGroup::NodeGroup(const std::vector>& 
columns, bool e } void NodeGroup::resetToEmpty() { - numNodes = 0; + numRows = 0; nodeGroupIdx = UINT64_MAX; for (auto& chunk : chunks) { chunk->resetToEmpty(); } } -void NodeGroup::setChunkToAllNull(common::vector_idx_t chunkIdx) { - KU_ASSERT(chunkIdx < chunks.size()); - chunks[chunkIdx]->getNullChunk()->resetToAllNull(); +void NodeGroup::setAllNull() { + for (auto& chunk : chunks) { + chunk->getNullChunk()->resetToAllNull(); + } +} + +void NodeGroup::setNumValues(common::offset_t numValues) { + for (auto& chunk : chunks) { + chunk->setNumValues(numValues); + } } void NodeGroup::resizeChunks(uint64_t newSize) { @@ -50,7 +57,7 @@ void NodeGroup::resizeChunks(uint64_t newSize) { uint64_t NodeGroup::append(const std::vector& columnVectors, DataChunkState* columnState, uint64_t numValuesToAppend) { auto numValuesToAppendInChunk = - std::min(numValuesToAppend, StorageConstants::NODE_GROUP_SIZE - numNodes); + std::min(numValuesToAppend, StorageConstants::NODE_GROUP_SIZE - numRows); auto serialSkip = 0u; auto originalSize = columnState->selVector->selectedSize; columnState->selVector->selectedSize = numValuesToAppendInChunk; @@ -64,18 +71,18 @@ uint64_t NodeGroup::append(const std::vector& columnVectors, chunk->append(columnVectors[i - serialSkip]); } columnState->selVector->selectedSize = originalSize; - numNodes += numValuesToAppendInChunk; + numRows += numValuesToAppendInChunk; return numValuesToAppendInChunk; } offset_t NodeGroup::append(NodeGroup* other, offset_t offsetInOtherNodeGroup) { KU_ASSERT(other->chunks.size() == chunks.size()); auto numNodesToAppend = std::min( - other->numNodes - offsetInOtherNodeGroup, StorageConstants::NODE_GROUP_SIZE - numNodes); + other->numRows - offsetInOtherNodeGroup, StorageConstants::NODE_GROUP_SIZE - numRows); for (auto i = 0u; i < chunks.size(); i++) { chunks[i]->append(other->chunks[i].get(), offsetInOtherNodeGroup, numNodesToAppend); } - numNodes += numNodesToAppend; + numRows += numNodesToAppend; return numNodesToAppend; } 
@@ -91,7 +98,7 @@ void NodeGroup::write(DataChunk* dataChunk, vector_idx_t offsetVectorIdx) { KU_ASSERT(vectorIdx < dataChunk->getNumValueVectors()); chunks[chunkIdx++]->write(dataChunk->getValueVector(vectorIdx++).get(), offsetVector); } - numNodes += offsetVector->state->selVector->selectedSize; + numRows += offsetVector->state->selVector->selectedSize; } void NodeGroup::finalize(uint64_t nodeGroupIdx_) { @@ -101,13 +108,5 @@ void NodeGroup::finalize(uint64_t nodeGroupIdx_) { } } -std::unique_ptr NodeGroupFactory::createNodeGroup(common::ColumnDataFormat dataFormat, - const std::vector>& columnTypes, bool enableCompression, - uint64_t capacity) { - return dataFormat == ColumnDataFormat::REGULAR ? - std::make_unique(columnTypes, enableCompression, capacity) : - std::make_unique(columnTypes, enableCompression); -} - } // namespace storage } // namespace kuzu diff --git a/src/storage/store/struct_column.cpp b/src/storage/store/struct_column.cpp index 47ac9f1c98..97021d3bd4 100644 --- a/src/storage/store/struct_column.cpp +++ b/src/storage/store/struct_column.cpp @@ -73,6 +73,10 @@ void StructColumn::lookupInternal( void StructColumn::write( offset_t nodeOffset, ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom) { KU_ASSERT(vectorToWriteFrom->dataType.getPhysicalType() == PhysicalTypeID::STRUCT); + if (vectorToWriteFrom->isNull(posInVectorToWriteFrom)) { + setNull(nodeOffset); + return; + } nullColumn->write(nodeOffset, vectorToWriteFrom, posInVectorToWriteFrom); KU_ASSERT(childColumns.size() == StructVector::getFieldVectors(vectorToWriteFrom).size()); for (auto i = 0u; i < childColumns.size(); i++) { @@ -81,6 +85,13 @@ void StructColumn::write( } } +void StructColumn::setNull(common::offset_t nodeOffset) { + nullColumn->setNull(nodeOffset); + for (const auto& childColumn : childColumns) { + childColumn->setNull(nodeOffset); + } +} + void StructColumn::append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) { Column::append(columnChunk, 
nodeGroupIdx); KU_ASSERT(columnChunk->getDataType()->getPhysicalType() == PhysicalTypeID::STRUCT);