Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework npy copy to integrate with query processor pipeline #1734

Merged
merged 1 commit into from
Jul 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ template void ValueVector::setValue<date_t>(uint32_t pos, date_t val);
template void ValueVector::setValue<timestamp_t>(uint32_t pos, timestamp_t val);
template void ValueVector::setValue<interval_t>(uint32_t pos, interval_t val);
template void ValueVector::setValue<list_entry_t>(uint32_t pos, list_entry_t val);
template void ValueVector::setValue<vector_idx_t>(uint32_t pos, vector_idx_t val);

template<>
void ValueVector::setValue(uint32_t pos, ku_string_t val) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <mutex>

#include "client_context.h"
#include "database.h"
#include "prepared_statement.h"
Expand Down
19 changes: 13 additions & 6 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ class LogicalCopy : public LogicalOperator {
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
offsetExpression{std::move(offsetExpression)}, outputExpression{
std::move(outputExpression)} {}
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowColumnExpressions{std::move(arrowColumnExpressions)}, offsetExpression{std::move(
offsetExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -34,6 +36,10 @@ class LogicalCopy : public LogicalOperator {
return offsetExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
return columnIdxExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -43,7 +49,7 @@ class LogicalCopy : public LogicalOperator {

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, outputExpression);
offsetExpression, columnIdxExpression, outputExpression);
}

private:
Expand All @@ -53,6 +59,7 @@ class LogicalCopy : public LogicalOperator {
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "catalog/catalog_structs.h"
#include "logical_ddl.h"

namespace kuzu {
Expand Down
24 changes: 15 additions & 9 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ class CopyNodeSharedState {
};

struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
CopyNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, offsetVectorPos{std::move(offsetVectorPos)}, arrowColumnPoses{
std::move(arrowColumnPoses)} {}
const DataPos& offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
offsetVectorPos{offsetVectorPos}, offsetVector{nullptr}, arrowColumnPoses{std::move(
arrowColumnPoses)} {}

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
common::vector_idx_t columnIdx);

common::CopyDescription copyDesc;
storage::NodeTable* table;
Expand Down Expand Up @@ -88,6 +91,12 @@ class CopyNode : public Sink {
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

protected:
void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

void logCopyWALRecord();

private:
inline bool isCopyAllowed() {
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
Expand All @@ -105,10 +114,7 @@ class CopyNode : public Sink {
throw common::CopyException("appendToPKIndex1 not implemented");
}

void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

private:
protected:
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
};
Expand Down
40 changes: 0 additions & 40 deletions src/include/processor/operator/copy/copy_npy.h

This file was deleted.

58 changes: 58 additions & 0 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "processor/operator/copy/copy_node.h"

namespace kuzu {
namespace processor {

struct CopyNPYNodeLocalState : public CopyNodeLocalState {
public:
CopyNPYNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
const DataPos& offsetVectorPos, const DataPos& columnIdxPos,
std::vector<DataPos> arrowColumnPoses)
: CopyNodeLocalState{copyDesc, table, relsStore, catalog, wal, offsetVectorPos,
std::move(arrowColumnPoses)},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::unique_ptr<CopyNPYNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(localState), std::move(sharedState), std::move(resultSetDescriptor),
std::move(child), id, paramsString} {}

void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
auto npyLocalState = (CopyNPYNodeLocalState*)localState.get();
npyLocalState->offsetVector =
resultSet->getValueVector(npyLocalState->offsetVectorPos).get();
npyLocalState->columnIdxVector =
resultSet->getValueVector(npyLocalState->columnIdxPos).get();
for (auto& arrowColumnPos : npyLocalState->arrowColumnPoses) {
npyLocalState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::vector_idx_t columnToCopy);

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<CopyNPYNode>(
std::make_unique<CopyNPYNodeLocalState>((CopyNPYNodeLocalState&)*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}
};

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace processor {
Expand Down
61 changes: 61 additions & 0 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "processor/operator/copy/read_file.h"
#include "storage/copier/npy_reader.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"

namespace kuzu {
namespace processor {

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
std::string filePath, common::vector_idx_t curFileIdx)
: ReadFileMorsel{nodeOffset, blockIdx, numNodes, std::move(filePath)}, columnIdx{
curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadNPYSharedState : public ReadFileSharedState {
public:
ReadNPYSharedState(catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
void countNumLines() final;
};

class ReadNPY : public ReadFile {
public:
ReadNPY(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
const DataPos& columnIdxPos, std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<ReadNPY>(
arrowColumnPoses, offsetVectorPos, columnIdxPos, sharedState, id, paramsString);
}

private:
std::unique_ptr<storage::NpyReader> reader;
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

} // namespace processor
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum class PhysicalOperatorType : uint8_t {
COPY_REL,
COPY_NPY,
READ_CSV,
READ_NPY,
READ_PARQUET,
CREATE_NODE,
CREATE_NODE_TABLE,
Expand Down
5 changes: 2 additions & 3 deletions src/include/processor/physical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ class PhysicalPlan {
explicit PhysicalPlan(std::unique_ptr<PhysicalOperator> lastOperator)
: lastOperator{std::move(lastOperator)} {}

inline bool isCopyRelOrNPY() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL ||
lastOperator->getOperatorType() == PhysicalOperatorType::COPY_NPY;
inline bool isCopyRel() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL;
}

public:
Expand Down
42 changes: 0 additions & 42 deletions src/include/storage/copier/node_copy_executor.h

This file was deleted.

2 changes: 2 additions & 0 deletions src/planner/operator/logical_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void LogicalCopy::computeFactorizedSchema() {
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, groupPos);
schema->insertToGroupAndScope(offsetExpression, groupPos);
schema->insertToGroupAndScope(columnIdxExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}
Expand All @@ -17,6 +18,7 @@ void LogicalCopy::computeFlatSchema() {
schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, 0);
schema->insertToGroupAndScope(offsetExpression, 0);
schema->insertToGroupAndScope(columnIdxExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}

Expand Down
Loading
Loading