Skip to content

Commit

Permalink
rework copy npy to integrate with query processor pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
aziz-mu authored and ray6080 committed Jul 8, 2023
1 parent 711ae67 commit 13a4f59
Show file tree
Hide file tree
Showing 29 changed files with 484 additions and 323 deletions.
1 change: 1 addition & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ template void ValueVector::setValue<date_t>(uint32_t pos, date_t val);
template void ValueVector::setValue<timestamp_t>(uint32_t pos, timestamp_t val);
template void ValueVector::setValue<interval_t>(uint32_t pos, interval_t val);
template void ValueVector::setValue<list_entry_t>(uint32_t pos, list_entry_t val);
template void ValueVector::setValue<vector_idx_t>(uint32_t pos, vector_idx_t val);

template<>
void ValueVector::setValue(uint32_t pos, ku_string_t val) {
Expand Down
2 changes: 2 additions & 0 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <mutex>

#include "client_context.h"
#include "database.h"
#include "prepared_statement.h"
Expand Down
19 changes: 13 additions & 6 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ class LogicalCopy : public LogicalOperator {
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
offsetExpression{std::move(offsetExpression)}, outputExpression{
std::move(outputExpression)} {}
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowColumnExpressions{std::move(arrowColumnExpressions)}, offsetExpression{std::move(
offsetExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -34,6 +36,10 @@ class LogicalCopy : public LogicalOperator {
return offsetExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
return columnIdxExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -43,7 +49,7 @@ class LogicalCopy : public LogicalOperator {

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, outputExpression);
offsetExpression, columnIdxExpression, outputExpression);
}

private:
Expand All @@ -53,6 +59,7 @@ class LogicalCopy : public LogicalOperator {
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "catalog/catalog_structs.h"
#include "logical_ddl.h"

namespace kuzu {
Expand Down
24 changes: 15 additions & 9 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ class CopyNodeSharedState {
};

struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
CopyNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, offsetVectorPos{std::move(offsetVectorPos)}, arrowColumnPoses{
std::move(arrowColumnPoses)} {}
const DataPos& offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
offsetVectorPos{offsetVectorPos}, offsetVector{nullptr}, arrowColumnPoses{std::move(
arrowColumnPoses)} {}

std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
common::vector_idx_t columnIdx);

common::CopyDescription copyDesc;
storage::NodeTable* table;
Expand Down Expand Up @@ -88,6 +91,12 @@ class CopyNode : public Sink {
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

protected:
void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

void logCopyWALRecord();

private:
inline bool isCopyAllowed() {
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
Expand All @@ -105,10 +114,7 @@ class CopyNode : public Sink {
throw common::CopyException("appendToPKIndex1 not implemented");
}

void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

private:
protected:
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
};
Expand Down
40 changes: 0 additions & 40 deletions src/include/processor/operator/copy/copy_npy.h

This file was deleted.

58 changes: 58 additions & 0 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "processor/operator/copy/copy_node.h"

namespace kuzu {
namespace processor {

struct CopyNPYNodeLocalState : public CopyNodeLocalState {
public:
CopyNPYNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
const DataPos& offsetVectorPos, const DataPos& columnIdxPos,
std::vector<DataPos> arrowColumnPoses)
: CopyNodeLocalState{copyDesc, table, relsStore, catalog, wal, offsetVectorPos,
std::move(arrowColumnPoses)},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::unique_ptr<CopyNPYNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(localState), std::move(sharedState), std::move(resultSetDescriptor),
std::move(child), id, paramsString} {}

void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
auto npyLocalState = (CopyNPYNodeLocalState*)localState.get();
npyLocalState->offsetVector =
resultSet->getValueVector(npyLocalState->offsetVectorPos).get();
npyLocalState->columnIdxVector =
resultSet->getValueVector(npyLocalState->columnIdxPos).get();
for (auto& arrowColumnPos : npyLocalState->arrowColumnPoses) {
npyLocalState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset,
common::vector_idx_t columnToCopy);

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNPYNode>(
std::make_unique<CopyNPYNodeLocalState>((CopyNPYNodeLocalState&)*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}
};

} // namespace processor
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace processor {
Expand Down
61 changes: 61 additions & 0 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include "processor/operator/copy/read_file.h"
#include "storage/copier/npy_reader.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"

namespace kuzu {
namespace processor {

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
std::string filePath, common::vector_idx_t curFileIdx)
: ReadFileMorsel{nodeOffset, blockIdx, numNodes, std::move(filePath)}, columnIdx{
curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadNPYSharedState : public ReadFileSharedState {
public:
ReadNPYSharedState(catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
void countNumLines() override;
};

class ReadNPY : public ReadFile {
public:
ReadNPY(std::vector<DataPos> arrowColumnPoses, const DataPos& offsetVectorPos,
const DataPos& columnIdxPos, std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), offsetVectorPos, std::move(sharedState),
PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<ReadNPY>(
arrowColumnPoses, offsetVectorPos, columnIdxPos, sharedState, id, paramsString);
}

private:
std::unique_ptr<storage::NpyReader> reader;
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

} // namespace processor
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ enum class PhysicalOperatorType : uint8_t {
COPY_REL,
COPY_NPY,
READ_CSV,
READ_NPY,
READ_PARQUET,
CREATE_NODE,
CREATE_NODE_TABLE,
Expand Down
5 changes: 2 additions & 3 deletions src/include/processor/physical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ class PhysicalPlan {
explicit PhysicalPlan(std::unique_ptr<PhysicalOperator> lastOperator)
: lastOperator{std::move(lastOperator)} {}

inline bool isCopyRelOrNPY() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL ||
lastOperator->getOperatorType() == PhysicalOperatorType::COPY_NPY;
inline bool isCopyRel() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL;
}

public:
Expand Down
42 changes: 0 additions & 42 deletions src/include/storage/copier/node_copy_executor.h

This file was deleted.

2 changes: 2 additions & 0 deletions src/planner/operator/logical_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ void LogicalCopy::computeFactorizedSchema() {
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, groupPos);
schema->insertToGroupAndScope(offsetExpression, groupPos);
schema->insertToGroupAndScope(columnIdxExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}
Expand All @@ -17,6 +18,7 @@ void LogicalCopy::computeFlatSchema() {
schema->createGroup();
schema->insertToGroupAndScope(arrowColumnExpressions, 0);
schema->insertToGroupAndScope(offsetExpression, 0);
schema->insertToGroupAndScope(columnIdxExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}

Expand Down
Loading

0 comments on commit 13a4f59

Please sign in to comment.