Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correcting the set of numValues for column chunk in CopyRel and fix writing null struct entry #2445

Merged
merged 1 commit into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ struct PartitioningBuffer {
void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when
// necessary. Here, each partition is essentially a node group.
struct PartitionerSharedState {
std::mutex mtx;
std::vector<common::partition_idx_t> numPartitions;
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
std::vector<common::partition_idx_t> numPartitions; // num of partitions in each direction.
std::vector<std::unique_ptr<PartitioningBuffer>> partitioningBuffers;
common::partition_idx_t nextPartitionIdx = 0;

void initialize();
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);
Expand Down Expand Up @@ -76,10 +80,11 @@ class Partitioner : public Sink {

std::unique_ptr<PhysicalOperator> clone() final;

private:
void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers);
static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions);

private:
// TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(common::partition_idx_t partitioningIdx,
Expand Down
17 changes: 11 additions & 6 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ class NodeGroup {
virtual ~NodeGroup() = default;

inline uint64_t getNodeGroupIdx() const { return nodeGroupIdx; }
inline common::offset_t getNumNodes() const { return numNodes; }
inline common::row_idx_t getNumRows() const { return numRows; }
inline ColumnChunk* getColumnChunk(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return chunks[columnID].get();
}
inline bool isFull() const { return numNodes == common::StorageConstants::NODE_GROUP_SIZE; }
inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; }

void resetToEmpty();
void setChunkToAllNull(common::vector_idx_t chunkIdx);
void setAllNull();
void setNumValues(common::offset_t numValues);
void resizeChunks(uint64_t newSize);

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
Expand All @@ -37,7 +38,7 @@ class NodeGroup {

private:
uint64_t nodeGroupIdx;
common::offset_t numNodes;
common::row_idx_t numRows;
std::vector<std::unique_ptr<ColumnChunk>> chunks;
};

Expand All @@ -59,9 +60,13 @@ class CSRNodeGroup : public NodeGroup {
};

struct NodeGroupFactory {
static std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
static inline std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) {
return dataFormat == common::ColumnDataFormat::REGULAR ?
std::make_unique<NodeGroup>(columnTypes, enableCompression, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression);
}
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class StructColumn final : public Column {
}
void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) override;
void setNull(common::offset_t nodeOffset) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
Expand Down
18 changes: 9 additions & 9 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRel(
auto outFSchema = copyFrom->getSchema();
auto tableSchema = dynamic_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto partitioningIdx = direction == RelDataDirection::FWD ? 0 : 1;
auto maxBoundNodeOffset = storageManager.getNodesStatisticsAndDeletedIDs()->getMaxNodeOffset(
transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(direction));
// TODO(Guodong/Xiyang): Consider moving this to Partitioner::initGlobalStateInternal.
auto numPartitions = (maxBoundNodeOffset + StorageConstants::NODE_GROUP_SIZE) /
StorageConstants::NODE_GROUP_SIZE;
partitionerSharedState->numPartitions[partitioningIdx] = numPartitions;
auto dataFormat = tableSchema->isSingleMultiplicityInDirection(direction) ?
ColumnDataFormat::REGULAR :
ColumnDataFormat::CSR;
Expand All @@ -190,10 +183,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRelFrom(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto prevOperator = mapOperator(copyFrom->getChild(0).get());
KU_ASSERT(prevOperator->getOperatorType() == PhysicalOperatorType::PARTITIONER);
auto nodesStats = storageManager.getNodesStatisticsAndDeletedIDs();
auto partitionerSharedState = dynamic_cast<Partitioner*>(prevOperator.get())->getSharedState();
partitionerSharedState->numPartitions.resize(2);
std::vector<std::unique_ptr<LogicalType>> columnTypes;
partitionerSharedState->maxNodeOffsets.resize(2);
partitionerSharedState->maxNodeOffsets[0] =
nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(RelDataDirection::FWD));
partitionerSharedState->maxNodeOffsets[1] =
nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(RelDataDirection::BWD));
// TODO(Xiyang): Move binding of column types to binder.
std::vector<std::unique_ptr<LogicalType>> columnTypes;
columnTypes.push_back(LogicalType::INTERNAL_ID()); // ADJ COLUMN.
for (auto& property : tableSchema->properties) {
columnTypes.push_back(property->getDataType()->copy());
Expand Down
26 changes: 19 additions & 7 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ std::vector<std::unique_ptr<PartitioningInfo>> PartitioningInfo::copy(
return result;
}

void PartitionerSharedState::initialize() {
KU_ASSERT(maxNodeOffsets.size() == 2);
numPartitions.resize(maxNodeOffsets.size());
for (auto i = 0u; i < maxNodeOffsets.size(); i++) {
numPartitions[i] = (maxNodeOffsets[i] + StorageConstants::NODE_GROUP_SIZE) /
StorageConstants::NODE_GROUP_SIZE;
}
Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
std::unique_lock xLck{mtx};
if (nextPartitionIdx >= numPartitions[partitioningIdx]) {
Expand Down Expand Up @@ -71,12 +81,12 @@ Partitioner::Partitioner(std::unique_ptr<ResultSetDescriptor> resultSetDescripto
}

void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
initializePartitioningStates(sharedState->partitioningBuffers);
sharedState->initialize();
}

void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers);
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand Down Expand Up @@ -112,11 +122,11 @@ void Partitioner::executeInternal(ExecutionContext* context) {
}

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers) {
KU_ASSERT(infos.size() == sharedState->numPartitions.size());
partitioningBuffers.resize(infos.size());
for (auto partitioningIdx = 0u; partitioningIdx < infos.size(); partitioningIdx++) {
auto numPartition = sharedState->numPartitions[partitioningIdx];
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
Expand All @@ -131,6 +141,8 @@ void Partitioner::copyDataToPartitions(
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
if (partition->empty() ||
Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr<NodeGroup> localN
CopyNode::writeAndResetNodeGroup(
nodeGroupIdx, pkIndex.get(), pkColumnIdx, table, sharedNodeGroup.get());
}
if (numNodesAppended < localNodeGroup->getNumNodes()) {
if (numNodesAppended < localNodeGroup->getNumRows()) {
sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended);
}
}
Expand Down Expand Up @@ -92,7 +92,7 @@ void CopyNode::executeInternal(ExecutionContext* context) {
copyToNodeGroup();
columnState->selVector = std::move(originalSelVector);
}
if (localNodeGroup->getNumNodes() > 0) {
if (localNodeGroup->getNumRows() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}
Expand All @@ -104,7 +104,7 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
if (pkIndex) {
populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset,
nodeGroup->getNumNodes() /* startPageIdx */);
nodeGroup->getNumRows() /* startPageIdx */);
}
table->append(nodeGroup);
nodeGroup->resetToEmpty();
Expand Down Expand Up @@ -158,7 +158,7 @@ void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNo
void CopyNode::finalize(ExecutionContext* context) {
uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx());
if (sharedState->sharedNodeGroup) {
numNodes += sharedState->sharedNodeGroup->getNumNodes();
numNodes += sharedState->sharedNodeGroup->getNumRows();
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnIdx,
sharedState->table, sharedState->sharedNodeGroup.get());
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/persistent/copy_rdf_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ void CopyRdfResource::executeInternal(ExecutionContext* context) {
copyToNodeGroup(vector);
columnState->selVector = std::move(originalSelVector);
}
if (localNodeGroup->getNumNodes() > 0) {
if (localNodeGroup->getNumRows() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}

void CopyRdfResource::finalize(ExecutionContext* context) {
uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx());
if (sharedState->sharedNodeGroup) {
numNodes += sharedState->sharedNodeGroup->getNumNodes();
numNodes += sharedState->sharedNodeGroup->getNumRows();
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeNodeGroup(nodeGroupIdx, sharedState->table, sharedState->sharedNodeGroup.get());
}
Expand All @@ -76,7 +76,7 @@ void CopyRdfResource::insertIndex(ValueVector* vector) {
common::sel_t nextPos = 0;
common::offset_t result;
auto offset = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) +
localNodeGroup->getNumNodes();
localNodeGroup->getNumRows();
for (auto i = 0u; i < vector->state->getNumSelectedValues(); i++) {
auto uriStr = vector->getValue<common::ku_string_t>(i).getAsString();
if (!sharedState->pkIndex->lookup(uriStr.c_str(), result)) {
Expand Down
30 changes: 17 additions & 13 deletions src/processor/operator/persistent/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,28 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) {
.get();
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(localState->currentPartition);
auto offsetVectorIdx = info->dataDirection == RelDataDirection::FWD ? 0 : 1;
row_idx_t numRows = 0;
row_idx_t numRels = 0;
for (auto& dataChunk : *partitioningBuffer) {
auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get();
setOffsetToWithinNodeGroup(offsetVector, startOffset);
numRows += offsetVector->state->selVector->selectedSize;
numRels += offsetVector->state->selVector->selectedSize;
}
ColumnChunk* csrOffsetChunk = nullptr;
// Calculate num of source nodes in this node group.
// This will be used to set the num of values of the node group.
auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE,
partitionerSharedState->maxNodeOffsets[info->partitioningIdx] - startOffset + 1);
if (info->dataFormat == ColumnDataFormat::CSR) {
auto csrNodeGroup = static_cast<CSRNodeGroup*>(localState->nodeGroup.get());
csrOffsetChunk = csrNodeGroup->getCSROffsetChunk();
csrOffsetChunk->setNumValues(StorageConstants::NODE_GROUP_SIZE);
// CSR offset chunk should be aligned with num of source nodes in this node group.
csrOffsetChunk->setNumValues(numNodes);
populateCSROffsets(csrOffsetChunk, partitioningBuffer, offsetVectorIdx);
// Resize csr data column chunks.
localState->nodeGroup->resizeChunks(numRows);
localState->nodeGroup->resizeChunks(numRels);
} else {
// Set adj column chunk to all null.
localState->nodeGroup->setChunkToAllNull(0 /* chunkIdx */);
localState->nodeGroup->setAllNull();
localState->nodeGroup->getColumnChunk(0)->setNumValues(numNodes);
}
for (auto& dataChunk : *partitioningBuffer) {
if (info->dataFormat == ColumnDataFormat::CSR) {
Expand All @@ -84,7 +89,7 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) {
localState->nodeGroup->finalize(localState->currentPartition);
// Flush node group to table.
sharedState->table->append(localState->nodeGroup.get(), info->dataDirection);
sharedState->incrementNumRows(localState->nodeGroup->getNumNodes());
sharedState->incrementNumRows(localState->nodeGroup->getNumRows());
localState->nodeGroup->resetToEmpty();
}
}
Expand All @@ -111,22 +116,21 @@ void CopyRel::populateCSROffsets(
}
}

// TODO(Guodong): Can we guarantee vector is not filtered and get rid of access to selVector?
void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(vector->state->selVector->isUnfiltered());
auto offsets = (offset_t*)vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
offsets[pos] -= startOffset;
offsets[i] -= startOffset;
}
}

void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, offset_t* csrOffsets) {
KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(offsetVector->state->selVector->isUnfiltered());
for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) {
auto pos = offsetVector->state->selVector->selectedPositions[i];
auto nodeOffset = offsetVector->getValue<offset_t>(pos);
offsetVector->setValue(pos, csrOffsets[nodeOffset]);
auto nodeOffset = offsetVector->getValue<offset_t>(i);
offsetVector->setValue(i, csrOffsets[nodeOffset]);
csrOffsets[nodeOffset]++;
}
}
Expand Down
Loading