Skip to content

Commit

Permalink
Correct the setting of numValues in CopyRel and fix writing of null structs
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Nov 17, 2023
1 parent 46c23f3 commit 64a6c4b
Show file tree
Hide file tree
Showing 10 changed files with 102 additions and 65 deletions.
13 changes: 9 additions & 4 deletions src/include/processor/operator/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@ struct PartitioningBuffer {
void merge(std::unique_ptr<PartitioningBuffer> localPartitioningStates);
};

// NOTE: Currently, Partitioner is tightly coupled with CopyRel. We should generalize it later when
// necessary. Here, each partition is essentially a node group.
struct PartitionerSharedState {
std::mutex mtx;
std::vector<common::partition_idx_t> numPartitions;
std::vector<common::offset_t> maxNodeOffsets; // max node offset in each direction.
std::vector<common::partition_idx_t> numPartitions; // num of partitions in each direction.
std::vector<std::unique_ptr<PartitioningBuffer>> partitioningBuffers;
common::partition_idx_t nextPartitionIdx = 0;

void initialize();
common::partition_idx_t getNextPartition(common::vector_idx_t partitioningIdx);
void resetState();
void merge(std::vector<std::unique_ptr<PartitioningBuffer>> localPartitioningStates);
Expand Down Expand Up @@ -76,10 +80,11 @@ class Partitioner : public Sink {

std::unique_ptr<PhysicalOperator> clone() final;

private:
void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers);
static void initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions);

private:
// TODO: For now, CopyRel will guarantee all data are inside one data chunk. Should be
// generalized to resultSet later if needed.
void copyDataToPartitions(common::partition_idx_t partitioningIdx,
Expand Down
17 changes: 11 additions & 6 deletions src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@ class NodeGroup {
virtual ~NodeGroup() = default;

inline uint64_t getNodeGroupIdx() const { return nodeGroupIdx; }
inline common::offset_t getNumNodes() const { return numNodes; }
inline common::row_idx_t getNumRows() const { return numRows; }
inline ColumnChunk* getColumnChunk(common::column_id_t columnID) {
KU_ASSERT(columnID < chunks.size());
return chunks[columnID].get();
}
inline bool isFull() const { return numNodes == common::StorageConstants::NODE_GROUP_SIZE; }
inline bool isFull() const { return numRows == common::StorageConstants::NODE_GROUP_SIZE; }

void resetToEmpty();
void setChunkToAllNull(common::vector_idx_t chunkIdx);
void setAllNull();
void setNumValues(common::offset_t numValues);
void resizeChunks(uint64_t newSize);

uint64_t append(const std::vector<common::ValueVector*>& columnVectors,
Expand All @@ -37,7 +38,7 @@ class NodeGroup {

private:
uint64_t nodeGroupIdx;
common::offset_t numNodes;
common::row_idx_t numRows;
std::vector<std::unique_ptr<ColumnChunk>> chunks;
};

Expand All @@ -59,9 +60,13 @@ class CSRNodeGroup : public NodeGroup {
};

// Factory that builds a node group matching the requested column data format.
struct NodeGroupFactory {
// Returns a plain NodeGroup when `dataFormat` is REGULAR and a CSRNodeGroup otherwise.
// `capacity` is forwarded only to the regular NodeGroup; the CSRNodeGroup is constructed
// without it.
static std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
static inline std::unique_ptr<NodeGroup> createNodeGroup(common::ColumnDataFormat dataFormat,
const std::vector<std::unique_ptr<common::LogicalType>>& columnTypes,
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE);
bool enableCompression, uint64_t capacity = common::StorageConstants::NODE_GROUP_SIZE) {
return dataFormat == common::ColumnDataFormat::REGULAR ?
std::make_unique<NodeGroup>(columnTypes, enableCompression, capacity) :
std::make_unique<CSRNodeGroup>(columnTypes, enableCompression);
}
};

} // namespace storage
Expand Down
1 change: 1 addition & 0 deletions src/include/storage/store/struct_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class StructColumn final : public Column {
}
void write(common::offset_t nodeOffset, common::ValueVector* vectorToWriteFrom,
uint32_t posInVectorToWriteFrom) override;
void setNull(common::offset_t nodeOffset) override;

void prepareCommitForChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, LocalVectorCollection* localColumnChunk,
Expand Down
18 changes: 9 additions & 9 deletions src/processor/map/map_copy_from.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,6 @@ std::unique_ptr<PhysicalOperator> PlanMapper::createCopyRel(
auto outFSchema = copyFrom->getSchema();
auto tableSchema = dynamic_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto partitioningIdx = direction == RelDataDirection::FWD ? 0 : 1;
auto maxBoundNodeOffset = storageManager.getNodesStatisticsAndDeletedIDs()->getMaxNodeOffset(
transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(direction));
// TODO(Guodong/Xiyang): Consider moving this to Partitioner::initGlobalStateInternal.
auto numPartitions = (maxBoundNodeOffset + StorageConstants::NODE_GROUP_SIZE) /
StorageConstants::NODE_GROUP_SIZE;
partitionerSharedState->numPartitions[partitioningIdx] = numPartitions;
auto dataFormat = tableSchema->isSingleMultiplicityInDirection(direction) ?
ColumnDataFormat::REGULAR :
ColumnDataFormat::CSR;
Expand All @@ -190,10 +183,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyRelFrom(
auto tableSchema = reinterpret_cast<RelTableSchema*>(copyFromInfo->tableSchema);
auto prevOperator = mapOperator(copyFrom->getChild(0).get());
KU_ASSERT(prevOperator->getOperatorType() == PhysicalOperatorType::PARTITIONER);
auto nodesStats = storageManager.getNodesStatisticsAndDeletedIDs();
auto partitionerSharedState = dynamic_cast<Partitioner*>(prevOperator.get())->getSharedState();
partitionerSharedState->numPartitions.resize(2);
std::vector<std::unique_ptr<LogicalType>> columnTypes;
partitionerSharedState->maxNodeOffsets.resize(2);
partitionerSharedState->maxNodeOffsets[0] =
nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(RelDataDirection::FWD));
partitionerSharedState->maxNodeOffsets[1] =
nodesStats->getMaxNodeOffset(transaction::Transaction::getDummyReadOnlyTrx().get(),
tableSchema->getBoundTableID(RelDataDirection::BWD));
// TODO(Xiyang): Move binding of column types to binder.
std::vector<std::unique_ptr<LogicalType>> columnTypes;
columnTypes.push_back(LogicalType::INTERNAL_ID()); // ADJ COLUMN.
for (auto& property : tableSchema->properties) {
columnTypes.push_back(property->getDataType()->copy());
Expand Down
26 changes: 19 additions & 7 deletions src/processor/operator/partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ std::vector<std::unique_ptr<PartitioningInfo>> PartitioningInfo::copy(
return result;
}

// Derives the number of partitions for each direction from `maxNodeOffsets`, then sets up the
// shared partitioning buffers with those counts. `maxNodeOffsets` must already hold exactly two
// entries when this is called.
void PartitionerSharedState::initialize() {
    KU_ASSERT(maxNodeOffsets.size() == 2);
    const auto numDirections = maxNodeOffsets.size();
    numPartitions.resize(numDirections);
    for (auto direction = 0u; direction < numDirections; ++direction) {
        const auto maxOffset = maxNodeOffsets[direction];
        // Same as maxOffset / NODE_GROUP_SIZE + 1: the number of node-group-sized partitions
        // needed to cover offsets [0, maxOffset].
        const auto shiftedOffset = maxOffset + StorageConstants::NODE_GROUP_SIZE;
        numPartitions[direction] = shiftedOffset / StorageConstants::NODE_GROUP_SIZE;
    }
    Partitioner::initializePartitioningStates(partitioningBuffers, numPartitions);
}

partition_idx_t PartitionerSharedState::getNextPartition(vector_idx_t partitioningIdx) {
std::unique_lock xLck{mtx};
if (nextPartitionIdx >= numPartitions[partitioningIdx]) {
Expand Down Expand Up @@ -71,12 +81,12 @@ Partitioner::Partitioner(std::unique_ptr<ResultSetDescriptor> resultSetDescripto
}

// Global-state initialization hook: prepares the partitioning state held in `sharedState`.
// The execution context parameter is unused.
void Partitioner::initGlobalStateInternal(ExecutionContext* /*context*/) {
initializePartitioningStates(sharedState->partitioningBuffers);
sharedState->initialize();
}

// Local-state initialization hook: creates a fresh PartitionerLocalState and sets up its
// partitioning buffers. Neither the result set nor the execution context parameter is used.
void Partitioner::initLocalStateInternal(ResultSet* /*resultSet*/, ExecutionContext* /*context*/) {
localState = std::make_unique<PartitionerLocalState>();
initializePartitioningStates(localState->partitioningBuffers);
initializePartitioningStates(localState->partitioningBuffers, sharedState->numPartitions);
}

static void constructDataChunk(DataChunk* dataChunk, const std::vector<DataPos>& columnPositions,
Expand Down Expand Up @@ -112,11 +122,11 @@ void Partitioner::executeInternal(ExecutionContext* context) {
}

void Partitioner::initializePartitioningStates(
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers) {
KU_ASSERT(infos.size() == sharedState->numPartitions.size());
partitioningBuffers.resize(infos.size());
for (auto partitioningIdx = 0u; partitioningIdx < infos.size(); partitioningIdx++) {
auto numPartition = sharedState->numPartitions[partitioningIdx];
std::vector<std::unique_ptr<PartitioningBuffer>>& partitioningBuffers,
std::vector<common::partition_idx_t> numPartitions) {
partitioningBuffers.resize(numPartitions.size());
for (auto partitioningIdx = 0u; partitioningIdx < numPartitions.size(); partitioningIdx++) {
auto numPartition = numPartitions[partitioningIdx];
auto partitioningBuffer = std::make_unique<PartitioningBuffer>();
partitioningBuffer->partitions.reserve(numPartition);
for (auto i = 0u; i < numPartition; i++) {
Expand All @@ -131,6 +141,8 @@ void Partitioner::copyDataToPartitions(
for (auto i = 0u; i < chunkToCopyFrom->state->selVector->selectedSize; i++) {
auto posToCopyFrom = chunkToCopyFrom->state->selVector->selectedPositions[i];
auto partitionIdx = partitionIdxes->getValue<partition_idx_t>(posToCopyFrom);
KU_ASSERT(
partitionIdx < localState->getPartitioningBuffer(partitioningIdx)->partitions.size());
auto partition =
localState->getPartitioningBuffer(partitioningIdx)->partitions[partitionIdx].get();
if (partition->empty() ||
Expand Down
8 changes: 4 additions & 4 deletions src/processor/operator/persistent/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr<NodeGroup> localN
CopyNode::writeAndResetNodeGroup(
nodeGroupIdx, pkIndex.get(), pkColumnIdx, table, sharedNodeGroup.get());
}
if (numNodesAppended < localNodeGroup->getNumNodes()) {
if (numNodesAppended < localNodeGroup->getNumRows()) {
sharedNodeGroup->append(localNodeGroup.get(), numNodesAppended);
}
}
Expand Down Expand Up @@ -92,7 +92,7 @@ void CopyNode::executeInternal(ExecutionContext* context) {
copyToNodeGroup();
columnState->selVector = std::move(originalSelVector);
}
if (localNodeGroup->getNumNodes() > 0) {
if (localNodeGroup->getNumRows() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}
Expand All @@ -104,7 +104,7 @@ void CopyNode::writeAndResetNodeGroup(node_group_idx_t nodeGroupIdx,
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(nodeGroupIdx);
if (pkIndex) {
populatePKIndex(pkIndex, nodeGroup->getColumnChunk(pkColumnID), startOffset,
nodeGroup->getNumNodes() /* startPageIdx */);
nodeGroup->getNumRows() /* startPageIdx */);
}
table->append(nodeGroup);
nodeGroup->resetToEmpty();
Expand Down Expand Up @@ -158,7 +158,7 @@ void CopyNode::checkNonNullConstraint(NullColumnChunk* nullChunk, offset_t numNo
void CopyNode::finalize(ExecutionContext* context) {
uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx());
if (sharedState->sharedNodeGroup) {
numNodes += sharedState->sharedNodeGroup->getNumNodes();
numNodes += sharedState->sharedNodeGroup->getNumRows();
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeAndResetNodeGroup(nodeGroupIdx, sharedState->pkIndex.get(), sharedState->pkColumnIdx,
sharedState->table, sharedState->sharedNodeGroup.get());
Expand Down
6 changes: 3 additions & 3 deletions src/processor/operator/persistent/copy_rdf_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ void CopyRdfResource::executeInternal(ExecutionContext* context) {
copyToNodeGroup(vector);
columnState->selVector = std::move(originalSelVector);
}
if (localNodeGroup->getNumNodes() > 0) {
if (localNodeGroup->getNumRows() > 0) {
sharedState->appendLocalNodeGroup(std::move(localNodeGroup));
}
}

void CopyRdfResource::finalize(ExecutionContext* context) {
uint64_t numNodes = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx());
if (sharedState->sharedNodeGroup) {
numNodes += sharedState->sharedNodeGroup->getNumNodes();
numNodes += sharedState->sharedNodeGroup->getNumRows();
auto nodeGroupIdx = sharedState->getNextNodeGroupIdx();
writeNodeGroup(nodeGroupIdx, sharedState->table, sharedState->sharedNodeGroup.get());
}
Expand All @@ -76,7 +76,7 @@ void CopyRdfResource::insertIndex(ValueVector* vector) {
common::sel_t nextPos = 0;
common::offset_t result;
auto offset = StorageUtils::getStartOffsetOfNodeGroup(sharedState->getCurNodeGroupIdx()) +
localNodeGroup->getNumNodes();
localNodeGroup->getNumRows();
for (auto i = 0u; i < vector->state->getNumSelectedValues(); i++) {
auto uriStr = vector->getValue<common::ku_string_t>(i).getAsString();
if (!sharedState->pkIndex->lookup(uriStr.c_str(), result)) {
Expand Down
30 changes: 17 additions & 13 deletions src/processor/operator/persistent/copy_rel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,28 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) {
.get();
auto startOffset = StorageUtils::getStartOffsetOfNodeGroup(localState->currentPartition);
auto offsetVectorIdx = info->dataDirection == RelDataDirection::FWD ? 0 : 1;
row_idx_t numRows = 0;
row_idx_t numRels = 0;
for (auto& dataChunk : *partitioningBuffer) {
auto offsetVector = dataChunk->getValueVector(offsetVectorIdx).get();
setOffsetToWithinNodeGroup(offsetVector, startOffset);
numRows += offsetVector->state->selVector->selectedSize;
numRels += offsetVector->state->selVector->selectedSize;
}
ColumnChunk* csrOffsetChunk = nullptr;
// Calculate num of source nodes in this node group.
// This will be used to set the num of values of the node group.
auto numNodes = std::min(StorageConstants::NODE_GROUP_SIZE,
partitionerSharedState->maxNodeOffsets[info->partitioningIdx] - startOffset + 1);
if (info->dataFormat == ColumnDataFormat::CSR) {
auto csrNodeGroup = static_cast<CSRNodeGroup*>(localState->nodeGroup.get());
csrOffsetChunk = csrNodeGroup->getCSROffsetChunk();
csrOffsetChunk->setNumValues(StorageConstants::NODE_GROUP_SIZE);
// CSR offset chunk should be aligned with num of source nodes in this node group.
csrOffsetChunk->setNumValues(numNodes);
populateCSROffsets(csrOffsetChunk, partitioningBuffer, offsetVectorIdx);
// Resize csr data column chunks.
localState->nodeGroup->resizeChunks(numRows);
localState->nodeGroup->resizeChunks(numRels);
} else {
// Set adj column chunk to all null.
localState->nodeGroup->setChunkToAllNull(0 /* chunkIdx */);
localState->nodeGroup->setAllNull();
localState->nodeGroup->getColumnChunk(0)->setNumValues(numNodes);
}
for (auto& dataChunk : *partitioningBuffer) {
if (info->dataFormat == ColumnDataFormat::CSR) {
Expand All @@ -84,7 +89,7 @@ void CopyRel::executeInternal(ExecutionContext* /*context*/) {
localState->nodeGroup->finalize(localState->currentPartition);
// Flush node group to table.
sharedState->table->append(localState->nodeGroup.get(), info->dataDirection);
sharedState->incrementNumRows(localState->nodeGroup->getNumNodes());
sharedState->incrementNumRows(localState->nodeGroup->getNumRows());
localState->nodeGroup->resetToEmpty();
}
}
Expand All @@ -111,22 +116,21 @@ void CopyRel::populateCSROffsets(
}
}

// TODO(Guodong): Can we guarantee vector is not filtered and get rid of access to selVector?
// Rewrites the offsets stored in `vector` in place by subtracting `startOffset` from each one,
// iterating over `selectedSize` entries of the vector's selection state.
// Asserts that the vector's physical type is INT64 and that its selection vector is unfiltered.
void CopyRel::setOffsetToWithinNodeGroup(ValueVector* vector, offset_t startOffset) {
KU_ASSERT(vector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(vector->state->selVector->isUnfiltered());
auto offsets = (offset_t*)vector->getData();
for (auto i = 0u; i < vector->state->selVector->selectedSize; i++) {
auto pos = vector->state->selVector->selectedPositions[i];
offsets[pos] -= startOffset;
offsets[i] -= startOffset;
}
}

// Replaces each node offset held in `offsetVector` with the current value of
// `csrOffsets[nodeOffset]`, then increments that `csrOffsets` entry so the next occurrence of
// the same node offset receives the following value. Both `offsetVector` and `csrOffsets` are
// modified in place.
// Asserts that the vector's physical type is INT64 and that its selection vector is unfiltered.
void CopyRel::setOffsetFromCSROffsets(ValueVector* offsetVector, offset_t* csrOffsets) {
KU_ASSERT(offsetVector->dataType.getPhysicalType() == PhysicalTypeID::INT64);
KU_ASSERT(offsetVector->state->selVector->isUnfiltered());
for (auto i = 0u; i < offsetVector->state->selVector->selectedSize; i++) {
auto pos = offsetVector->state->selVector->selectedPositions[i];
auto nodeOffset = offsetVector->getValue<offset_t>(pos);
offsetVector->setValue(pos, csrOffsets[nodeOffset]);
auto nodeOffset = offsetVector->getValue<offset_t>(i);
offsetVector->setValue(i, csrOffsets[nodeOffset]);
csrOffsets[nodeOffset]++;
}
}
Expand Down
Loading

0 comments on commit 64a6c4b

Please sign in to comment.