Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor copy node info #1844

Merged
merged 1 commit into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 20 additions & 34 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,36 +33,27 @@ class CopyNodeSharedState {
bool hasLoggedWAL;
};

struct CopyNodeDataInfo {
struct CopyNodeInfo {
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> dataColumnPoses;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
};

class CopyNode : public Sink {
public:
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeDataInfo copyNodeDataInfo,
const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeInfo copyNodeInfo,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr}, filePathVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
auto& property = tableSchema->properties[i];
copyStates[i] = std::make_unique<storage::PropertyCopyState>(property.dataType);
}
}
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString);

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) {
rowIdxVector = resultSet->getValueVector(copyNodeInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}
Expand All @@ -71,18 +62,18 @@ class CopyNode : public Sink {
if (!isCopyAllowed()) {
throw common::CopyException("COPY commands can only be executed once on a table.");
}
auto nodeTableSchema =
catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
sharedState->initialize(nodeTableSchema, wal->getDirectory());
auto nodeTableSchema = copyNodeInfo.catalog->getReadOnlyVersion()->getNodeTableSchema(
copyNodeInfo.table->getTableID());
sharedState->initialize(nodeTableSchema, copyNodeInfo.wal->getDirectory());
}

void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(sharedState, copyNodeDataInfo, copyDesc, table, relsStore,
catalog, wal, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
return std::make_unique<CopyNode>(sharedState, copyNodeInfo, resultSetDescriptor->copy(),
children[0]->clone(), id, paramsString);
}

protected:
Expand All @@ -97,9 +88,9 @@ class CopyNode : public Sink {
std::pair<std::string, common::row_idx_t> getFilePathAndRowIdxInFile();

private:
inline bool isCopyAllowed() {
auto nodesStatistics = table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(table->getTableID())
inline bool isCopyAllowed() const {
auto nodesStatistics = copyNodeInfo.table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(copyNodeInfo.table->getTableID())
->getNumTuples() == 0;
}

Expand All @@ -116,12 +107,7 @@ class CopyNode : public Sink {

protected:
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeDataInfo copyNodeDataInfo;
common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
CopyNodeInfo copyNodeInfo;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> dataColumnVectors;
Expand Down
13 changes: 10 additions & 3 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,17 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(Logic
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(readFileSharedState->numRows, memoryManager);
std::unique_ptr<CopyNode> copyNode;
CopyNodeDataInfo copyNodeDataInfo{rowIdxVectorPos, filePathVectorPos, dataColumnPoses};
CopyNodeInfo copyNodeDataInfo{
rowIdxVectorPos,
filePathVectorPos,
dataColumnPoses,
copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(),
catalog,
storageManager.getWAL(),
};
copyNode = std::make_unique<CopyNode>(copyNodeSharedState, copyNodeDataInfo,
copy->getCopyDescription(), storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(),
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
auto outputExpressions = binder::expression_vector{copy->getOutputExpression()};
Expand Down
37 changes: 28 additions & 9 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ void CopyNodeSharedState::initializeColumns(
}
}

// Constructs the COPY_NODE sink operator.
//
// The operator keeps the shared state and the bundled CopyNodeInfo, starts with null
// row-index / file-path vectors, and allocates one PropertyCopyState for every property
// of the node table schema looked up through the catalog in copyNodeInfo.
CopyNode::CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeInfo copyNodeInfo,
    std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
    std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
    : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id,
          paramsString},
      sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)},
      rowIdxVector{nullptr}, filePathVector{nullptr} {
    // The constructor parameter shadows the member and has already been moved from,
    // so the member must be reached explicitly through `this`.
    auto& info = this->copyNodeInfo;
    auto nodeSchema =
        info.catalog->getReadOnlyVersion()->getNodeTableSchema(info.table->getTableID());
    copyStates.resize(nodeSchema->getNumProperties());
    // One copy state per property, keyed by the property's position in the schema.
    for (auto propertyIdx = 0u; propertyIdx < nodeSchema->getNumProperties(); ++propertyIdx) {
        copyStates[propertyIdx] = std::make_unique<storage::PropertyCopyState>(
            nodeSchema->properties[propertyIdx].dataType);
    }
}

std::pair<row_idx_t, row_idx_t> CopyNode::getStartAndEndRowIdx(common::vector_idx_t columnIdx) {
auto startRowIdx =
rowIdxVector->getValue<int64_t>(rowIdxVector->state->selVector->selectedPositions[0]);
Expand All @@ -73,8 +89,8 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
auto [startRowIdx, endRowIdx] = getStartAndEndRowIdx(0 /* columnIdx */);
auto [filePath, startRowIdxInFile] = getFilePathAndRowIdxInFile();
for (auto i = 0u; i < sharedState->columns.size(); i++) {
auto columnChunk =
sharedState->columns[i]->createInMemColumnChunk(startRowIdx, endRowIdx, &copyDesc);
auto columnChunk = sharedState->columns[i]->createInMemColumnChunk(
startRowIdx, endRowIdx, &copyNodeInfo.copyDesc);
columnChunk->copyArrowArray(
*ArrowColumnVector::getArrowColumn(dataColumnVectors[i]), copyStates[i].get());
columnChunks.push_back(std::move(columnChunk));
Expand All @@ -85,20 +101,23 @@ void CopyNode::executeInternal(kuzu::processor::ExecutionContext* context) {
}

void CopyNode::finalize(kuzu::processor::ExecutionContext* context) {
auto tableID = table->getTableID();
auto tableID = copyNodeInfo.table->getTableID();
if (sharedState->pkIndex) {
sharedState->pkIndex->flush();
}
for (auto& column : sharedState->columns) {
column->saveToFile();
}
for (auto& relTableSchema : catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
relsStore->getRelTable(relTableSchema->tableID)
for (auto& relTableSchema :
copyNodeInfo.catalog->getAllRelTableSchemasContainBoundTable(tableID)) {
copyNodeInfo.relsStore->getRelTable(relTableSchema->tableID)
->batchInitEmptyRelsForNewNodes(relTableSchema, sharedState->numRows);
}
table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(tableID, sharedState->numRows);
copyNodeInfo.table->getNodeStatisticsAndDeletedIDs()->setNumTuplesForTable(
tableID, sharedState->numRows);
auto outputMsg = StringUtils::string_format("{} number of tuples has been copied to table: {}.",
sharedState->numRows, catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
sharedState->numRows,
copyNodeInfo.catalog->getReadOnlyVersion()->getTableName(tableID).c_str());
FactorizedTableUtils::appendStringToTable(
sharedState->table.get(), outputMsg, context->memoryManager);
}
Expand Down Expand Up @@ -197,8 +216,8 @@ void CopyNode::populatePKIndex(InMemColumnChunk* chunk, InMemOverflowFile* overf
void CopyNode::logCopyWALRecord() {
std::unique_lock xLck{sharedState->mtx};
if (!sharedState->hasLoggedWAL) {
wal->logCopyNodeRecord(table->getTableID());
wal->flushAllPages();
copyNodeInfo.wal->logCopyNodeRecord(copyNodeInfo.table->getTableID());
copyNodeInfo.wal->flushAllPages();
sharedState->hasLoggedWAL = true;
}
}
Expand Down
Loading