Skip to content

Commit

Permalink
refactor copy node info
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 21, 2023
1 parent fac6192 commit 391e0a7
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 46 deletions.
54 changes: 20 additions & 34 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,27 @@ class CopyNodeSharedState {
bool hasLoggedWAL;
};

struct CopyNodeDataInfo {
// Bundles what a CopyNode operator needs: the positions of its input vectors in the result set,
// the copy description, and raw pointers to the table/store/catalog/WAL objects it works with.
// NOTE: PlanMapper brace-initializes this struct positionally, so the member order matters.
struct CopyNodeInfo {
// Position of the row-index vector; resolved in CopyNode::initLocalStateInternal.
DataPos rowIdxVectorPos;
// Position of the file-path vector; resolved in CopyNode::initLocalStateInternal.
DataPos filePathVectorPos;
// Positions of the data column vectors; each is resolved and appended to dataColumnVectors.
std::vector<DataPos> dataColumnPoses;
// Copy description; its address is passed to createInMemColumnChunk during execution.
common::CopyDescription copyDesc;
// Target node table; supplies the table ID and the node statistics.
storage::NodeTable* table;
// Used in finalize to reach the rel tables whose schemas are bound to the target node table.
storage::RelsStore* relsStore;
// Used to look up the node table schema, bound rel table schemas, and the table name.
catalog::Catalog* catalog;
// Write-ahead log: receives the copy-node record and supplies the directory for shared state.
storage::WAL* wal;
};

class CopyNode : public Sink {
public:
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeInfo copyNodeInfo,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr}, filePathVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
auto& property = tableSchema->properties[i];
copyStates[i] = std::make_unique<storage::PropertyCopyState>(property.dataType);
}
}
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString);

// Resolves this operator's input vectors from `resultSet`: the row-index vector, the file-path
// vector, and one vector per entry of dataColumnPoses (appended to dataColumnVectors in order).
// Only the raw pointers obtained via .get() are kept. `context` is not used here.
inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) {
rowIdxVector = resultSet->getValueVector(copyNodeInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}
Expand All @@ -71,18 +62,18 @@ class CopyNode : public Sink {
if (!isCopyAllowed()) {
throw common::CopyException("COPY commands can only be executed once on a table.");
}
auto nodeTableSchema =
catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
sharedState->initialize(nodeTableSchema, wal->getDirectory());
auto nodeTableSchema = copyNodeInfo.catalog->getReadOnlyVersion()->getNodeTableSchema(
copyNodeInfo.table->getTableID());
sharedState->initialize(nodeTableSchema, copyNodeInfo.wal->getDirectory());
}

void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

// Builds a new CopyNode that reuses the same sharedState pointer, id and paramsString, and is
// given a copy of the result-set descriptor plus a clone of children[0].
inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(sharedState, copyNodeDataInfo, copyDesc, table, relsStore,
catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
return std::make_unique<CopyNode>(sharedState, copyNodeInfo, resultSetDescriptor->copy(),
children[0]->clone(), id, paramsString);
}

protected:
Expand All @@ -97,9 +88,9 @@ class CopyNode : public Sink {
std::pair<std::string, common::row_idx_t> getFilePathAndRowIdxInFile();

private:
// Returns true only when the node statistics for the target table report zero tuples.
// The caller throws a CopyException when this returns false.
inline bool isCopyAllowed() {
auto nodesStatistics = table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(table->getTableID())
inline bool isCopyAllowed() const {
auto nodesStatistics = copyNodeInfo.table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(copyNodeInfo.table->getTableID())
->getNumTuples() == 0;
}

Expand All @@ -116,12 +107,7 @@ class CopyNode : public Sink {

protected:
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeDataInfo copyNodeDataInfo;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
CopyNodeInfo copyNodeInfo;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> dataColumnVectors;
Expand Down
13 changes: 10 additions & 3 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(readFileSharedState->numRows, memoryManager);
std::unique_ptr<CopyNode> copyNode;
CopyNodeDataInfo copyNodeDataInfo{rowIdxVectorPos, filePathVectorPos, dataColumnPoses};
CopyNodeInfo copyNodeDataInfo{
rowIdxVectorPos,
filePathVectorPos,
dataColumnPoses,
copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(),
catalog,
storageManager.getWAL(),
};
copyNode = std::make_unique<CopyNode>(copyNodeSharedState, copyNodeDataInfo,
copy->getCopyDescription(), storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
auto outputExpressions = binder::expression_vector{copy->getOutputExpression()};
Expand Down
37 changes: 28 additions & 9 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ void CopyNodeSharedState::initializeColumns(
}
}

// Constructs the COPY_NODE sink and prepares one PropertyCopyState per property of the target
// node table's schema, which is looked up from the catalog's read-only version.
CopyNode::CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeInfo copyNodeInfo,
    std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
    std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
    : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id,
          paramsString},
      sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)},
      rowIdxVector{nullptr}, filePathVector{nullptr} {
    // The parameter of the same name has been moved from above, so read through the member.
    const auto& info = this->copyNodeInfo;
    auto nodeSchema =
        info.catalog->getReadOnlyVersion()->getNodeTableSchema(info.table->getTableID());
    const auto numProperties = nodeSchema->getNumProperties();
    copyStates.resize(numProperties);
    for (auto propertyIdx = 0u; propertyIdx < numProperties; ++propertyIdx) {
        copyStates[propertyIdx] = std::make_unique<storage::PropertyCopyState>(
            nodeSchema->properties[propertyIdx].dataType);
    }
}

std::pair<row_idx_t, row_idx_t> CopyNode::getStartAndEndRowIdx(common::vector_idx_t columnIdx) {
auto startRowIdx =
rowIdxVector->getValue<int64_t>(rowIdxVector->state->selVector->selectedPositions[0]);
Expand All @@ -73,8 +89,8 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
auto [startRowIdx, endRowIdx] = getStartAndEndRowIdx(0 /* columnIdx */);
auto [filePath, startRowIdxInFile] = getFilePathAndRowIdxInFile();
for (auto i = 0u; i < sharedState->columns.size(); i++) {
auto columnChunk =
sharedState->columns[i]->createInMemColumnChunk(startRowIdx, endRowIdx, &copyDesc);
auto columnChunk = sharedState->columns[i]->createInMemColumnChunk(
startRowIdx, endRowIdx, &copyNodeInfo.copyDesc);
columnChunk->copyArrowArray(
*ArrowColumnVector::getArrowColumn(dataColumnVectors[i]), copyStates[i].get());
columnChunks.push_back(std::move(columnChunk));
Expand All @@ -85,20 +101,23 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
}

// Finishes the copy after execution: flushes the primary-key index if one exists, saves every
// column to file, initializes empty rels for the new nodes in each rel table whose schema the
// catalog reports as containing this table as a bound table, records the tuple count in the
// node statistics, and appends a summary message to the shared output table.
void CopyNode::finalize(kuzu::processor::ExecutionContext* context) {
auto tableID = table->getTableID();
auto tableID = copyNodeInfo.table->getTableID();
// pkIndex may be null, so it is only flushed when present.
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
}
for (auto& column : sharedState->columns) {
column->saveToFile();
}
for (auto& relTableSchema : catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
relsStore->getRelTable(relTableSchema->tableID)
for (auto& relTableSchema :
copyNodeInfo.catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
copyNodeInfo.relsStore->getRelTable(relTableSchema->tableID)
->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows);
}
table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(tableID, sharedState->numRows);
copyNodeInfo.table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(
tableID, sharedState->numRows);
// Summary message written to the output table as the result of the COPY.
auto outputMsg = StringUtils::string_format("{} number of tuples has been copied to table: {}.",
sharedState->numRows, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
sharedState->numRows,
copyNodeInfo.catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
FactorizedTableUtils::appendStringToTable(
sharedState->table.get(), outputMsg, context->memoryManager);
}
Expand Down Expand Up @@ -197,8 +216,8 @@ void CopyNode::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf
// Logs the copy-node WAL record for the target table and flushes all WAL pages, at most once
// per shared state: the hasLoggedWAL flag is checked and set while holding sharedState->mtx.
void CopyNode::logCopyWALRecord() {
std::unique_lock xLck{sharedState->mtx};
if (!sharedState->hasLoggedWAL) {
wal->logCopyNodeRecord(table->getTableID());
wal->flushAllPages();
copyNodeInfo.wal->logCopyNodeRecord(copyNodeInfo.table->getTableID());
copyNodeInfo.wal->flushAllPages();
sharedState->hasLoggedWAL = true;
}
}
Expand Down

0 comments on commit 391e0a7

Please sign in to comment.