Skip to content

Commit

Permalink
Merge pull request #1590 from kuzudb/copy-refactor
Browse files Browse the repository at this point in the history
Refactor copy node
  • Loading branch information
acquamarin committed Jun 6, 2023
2 parents 3d9ed2e + e524ab5 commit 4c5e1be
Show file tree
Hide file tree
Showing 39 changed files with 903 additions and 116 deletions.
2 changes: 1 addition & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
tableSchema->tableName));
}
}
return make_unique<BoundCopy>(
return std::make_unique<BoundCopy>(
CopyDescription(boundFilePaths, csvReaderConfig, actualFileType), tableID, tableName);
}

Expand Down
3 changes: 3 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ void LogicalType::setPhysicalType() {
case LogicalTypeID::STRUCT: {
physicalType = PhysicalTypeID::STRUCT;
} break;
case LogicalTypeID::ARROW_COLUMN: {
physicalType = PhysicalTypeID::ARROW_COLUMN;
} break;
default:
throw NotImplementedException{"LogicalType::setPhysicalType()."};
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ std::unique_ptr<AuxiliaryBuffer> AuxiliaryBufferFactory::getAuxiliaryBuffer(
case LogicalTypeID::VAR_LIST:
return std::make_unique<ListAuxiliaryBuffer>(
*VarListType::getChildType(&type), memoryManager);
case LogicalTypeID::ARROW_COLUMN:
return std::make_unique<ArrowColumnAuxiliaryBuffer>();
default:
return nullptr;
}
Expand Down
10 changes: 10 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ uint32_t ValueVector::getDataTypeSize(const LogicalType& type) {
case LogicalTypeID::VAR_LIST: {
return sizeof(list_entry_t);
}
case LogicalTypeID::ARROW_COLUMN: {
return 0;
}
default: {
return LogicalTypeUtils::getFixedTypeSize(type.getPhysicalType());
}
Expand All @@ -102,6 +105,13 @@ void ValueVector::initializeValueBuffer() {
}
}

// Stores `column` as the Arrow array held by `vector`'s auxiliary buffer, replacing whatever
// array was stored there before.
// The auxiliary buffer is reinterpreted as an ArrowColumnAuxiliaryBuffer with no runtime check;
// presumably `vector` must have been created with the ARROW_COLUMN type -- TODO confirm.
void ArrowColumnVector::setArrowColumn(
    kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column) {
    auto* rawBuffer = vector->auxiliaryBuffer.get();
    // `column` was taken by value, so moving it here transfers the caller's reference.
    reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(rawBuffer)->column = std::move(column);
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ KUZU_API enum class LogicalTypeID : uint8_t {

INTERNAL_ID = 40,

ARROW_COLUMN = 41,

// variable size types
STRING = 50,
VAR_LIST = 52,
Expand All @@ -95,6 +97,7 @@ enum class PhysicalTypeID : uint8_t {
FLOAT = 6,
INTERVAL = 7,
INTERNAL_ID = 9,
ARROW_COLUMN = 10,

// Variable size types.
STRING = 20,
Expand Down
8 changes: 8 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "arrow/array.h"
#include "common/in_mem_overflow_buffer.h"

namespace kuzu {
Expand Down Expand Up @@ -43,6 +44,13 @@ class StructAuxiliaryBuffer : public AuxiliaryBuffer {
std::vector<std::shared_ptr<ValueVector>> childrenVectors;
};

// Auxiliary buffer that holds a shared handle to a single Arrow array.
// The handle is private; ArrowColumnVector is a friend and is the accessor for it.
class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
    // Members of a `class` are private by default, so no explicit access specifier is needed.
    // Default-constructed, i.e. empty, until a column is assigned.
    std::shared_ptr<arrow::Array> column;

    friend class ArrowColumnVector;
};

// ListVector layout:
// To store a list value in the valueVector, we could use two separate vectors.
// 1. A vector(called offset vector) for the list offsets and length(called list_entry_t): This
Expand Down
11 changes: 11 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ValueVector {
friend class ListAuxiliaryBuffer;
friend class StructVector;
friend class StringVector;
friend class ArrowColumnVector;

public:
explicit ValueVector(LogicalType dataType, storage::MemoryManager* memoryManager = nullptr);
Expand Down Expand Up @@ -154,6 +155,16 @@ class StructVector {
}
};

// Static helpers for reading and writing the Arrow array kept in a ValueVector's auxiliary
// buffer. Both helpers reinterpret the auxiliary buffer as an ArrowColumnAuxiliaryBuffer without
// a runtime check; presumably only ARROW_COLUMN vectors may be passed -- TODO confirm.
class ArrowColumnVector {
public:
    // Returns a copy of the shared pointer stored in `vector`'s auxiliary buffer.
    static std::shared_ptr<arrow::Array> getArrowColumn(ValueVector* vector) {
        auto* arrowBuffer =
            reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
        return arrowBuffer->column;
    }

    // Stores `column` in `vector`'s auxiliary buffer. Only declared here.
    static void setArrowColumn(
        kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column);
};

class NodeIDVector {
public:
// If there is still non-null values after discarding, return true. Otherwise, return false.
Expand Down
35 changes: 28 additions & 7 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,49 @@ class LogicalCopy : public LogicalOperator {

public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline void computeFactorizedSchema() override { createEmptySchema(); }
inline void computeFlatSchema() override { createEmptySchema(); }
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
offsetExpression{std::move(offsetExpression)}, outputExpression{
std::move(outputExpression)} {}

// Printing representation of this operator: just the table name.
std::string getExpressionsForPrinting() const override {
    return tableName;
}

// Returns a copy of the copy description this operator was constructed with.
common::CopyDescription getCopyDescription() const {
    return copyDescription;
}

// Returns the ID of the table this operator was constructed with.
common::table_id_t getTableID() const {
    return tableID;
}

// Returns a copy of the arrow column expressions passed to the constructor.
// NOTE(review): the member is declared as binder::expression_vector while the return type is
// spelled out in full; presumably the two are the same type -- confirm.
std::vector<std::shared_ptr<binder::Expression>> getArrowColumnExpressions() const {
    return arrowColumnExpressions;
}

// Returns the offset expression passed to the constructor (shared ownership, may be copied).
std::shared_ptr<binder::Expression> getOffsetExpression() const {
    return offsetExpression;
}

// Returns the output expression passed to the constructor (shared ownership, may be copied).
std::shared_ptr<binder::Expression> getOutputExpression() const {
    return outputExpression;
}

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName);
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, outputExpression);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

} // namespace planner
Expand Down
3 changes: 2 additions & 1 deletion src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Planner {

static std::unique_ptr<LogicalPlan> planRenameProperty(const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopy(const BoundStatement& statement);
static std::unique_ptr<LogicalPlan> planCopy(
const catalog::Catalog& catalog, const BoundStatement& statement);
};

} // namespace planner
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "binder/expression/node_expression.h"
#include "common/statement_type.h"
#include "planner/logical_plan/logical_operator/logical_copy.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/mapper/expression_mapper.h"
#include "processor/operator/result_collector.h"
Expand All @@ -23,8 +24,8 @@ class PlanMapper {
: storageManager{storageManager}, memoryManager{memoryManager},
expressionMapper{}, catalog{catalog}, physicalOperatorID{0} {}

std::unique_ptr<PhysicalPlan> mapLogicalPlanToPhysical(planner::LogicalPlan* logicalPlan,
const binder::expression_vector& expressionsToCollect, common::StatementType statementType);
std::unique_ptr<PhysicalPlan> mapLogicalPlanToPhysical(
planner::LogicalPlan* logicalPlan, const binder::expression_vector& expressionsToCollect);

private:
std::unique_ptr<PhysicalOperator> mapLogicalOperatorToPhysical(
Expand Down Expand Up @@ -95,6 +96,8 @@ class PlanMapper {
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyNodeToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalCopyRelToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalDropTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRenameTableToPhysical(
Expand Down
117 changes: 97 additions & 20 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
@@ -1,39 +1,116 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"
#include "processor/operator/sink.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"

namespace kuzu {
namespace processor {

class CopyNode : public Copy {
class CopyNodeSharedState {
public:
CopyNode(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NODE, catalog, copyDescription, table->getTableID(), wal,
CopyNodeSharedState(uint64_t& numRows, storage::MemoryManager* memoryManager);

// Sets up the shared state for `nodeTableSchema`: first the primary key, then the columns.
// Both steps receive the same schema and `directory`.
void initialize(catalog::NodeTableSchema* nodeTableSchema, const std::string& directory) {
    initializePrimaryKey(nodeTableSchema, directory);
    initializeColumns(nodeTableSchema, directory);
}

private:
void initializePrimaryKey(
catalog::NodeTableSchema* nodeTableSchema, const std::string& directory);

void initializeColumns(catalog::NodeTableSchema* nodeTableSchema, const std::string& directory);

public:
common::column_id_t pkColumnID;
std::vector<std::unique_ptr<storage::InMemColumn>> columns;
std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex;
uint64_t& numRows;
std::mutex mtx;
std::shared_ptr<FactorizedTable> table;
};

// State owned by a single CopyNode operator instance. CopyNode::clone() copy-constructs this
// struct, so every member must be safe to copy at any point after construction.
struct CopyNodeLocalState {
    // Stores the given handles and positions. The resolved vectors (`offsetVector`,
    // `arrowColumnVectors`) are NOT set here; CopyNode::initLocalStateInternal fills them in
    // from the result set.
    CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
        storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
        DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
        : copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
          wal{wal}, offsetVectorPos{std::move(offsetVectorPos)},
          arrowColumnPoses{std::move(arrowColumnPoses)} {}

    common::CopyDescription copyDesc;
    // Non-owning handles supplied by the creator of this state.
    storage::NodeTable* table;
    storage::RelsStore* relsStore;
    catalog::Catalog* catalog;
    storage::WAL* wal;
    // Position of the offset vector in the result set.
    DataPos offsetVectorPos;
    // Resolved from `offsetVectorPos` during local-state initialization. Explicitly null until
    // then: the struct is copied when the operator is cloned, and copying an uninitialized
    // pointer would read an indeterminate value.
    common::ValueVector* offsetVector = nullptr;
    // Positions of the arrow column vectors in the result set.
    std::vector<DataPos> arrowColumnPoses;
    // Resolved from `arrowColumnPoses` during local-state initialization; empty until then.
    std::vector<common::ValueVector*> arrowColumnVectors;
};

class CopyNode : public Sink {
public:
CopyNode(std::unique_ptr<CopyNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}

// Resolves the positions stored in the local state against `resultSet`: the offset vector is
// looked up once, and one raw vector pointer is appended per arrow column position.
// `context` is not used here.
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
    auto& resolvedColumns = localState->arrowColumnVectors;
    localState->offsetVector = resultSet->getValueVector(localState->offsetVectorPos).get();
    for (auto& columnPos : localState->arrowColumnPoses) {
        auto* columnVector = resultSet->getValueVector(columnPos).get();
        resolvedColumns.push_back(columnVector);
    }
}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
// Checks that copying into the table is allowed, then initializes the shared state from the
// table's schema and the WAL directory.
// Throws common::CopyException when isCopyAllowed() returns false. `context` is not used here.
void initGlobalStateInternal(ExecutionContext* context) override {
    const bool copyRejected = !isCopyAllowed();
    if (copyRejected) {
        throw common::CopyException("COPY commands can only be executed once on a table.");
    }
    // Look the schema up in the read-only catalog version, keyed by the target table's ID.
    auto nodeTableSchema = localState->catalog->getReadOnlyVersion()->getNodeTableSchema(
        localState->table->getTableID());
    const auto& walDirectory = localState->wal->getDirectory();
    sharedState->initialize(nodeTableSchema, walDirectory);
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

// Creates a new CopyNode with its own copy of the local state, result set descriptor and first
// child, while sharing the same `sharedState` object with this operator.
std::unique_ptr<PhysicalOperator> clone() override {
    auto localStateCopy = std::make_unique<CopyNodeLocalState>(*localState);
    auto descriptorCopy = resultSetDescriptor->copy();
    auto childCopy = children[0]->clone();
    return std::make_unique<CopyNode>(std::move(localStateCopy), sharedState,
        std::move(descriptorCopy), std::move(childCopy), id, paramsString);
}

private:
inline bool isCopyAllowed() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
// Copy is allowed only while the statistics for the target table report zero tuples.
bool isCopyAllowed() {
    auto* targetTable = localState->table;
    auto nodesStatistics = targetTable->getNodeStatisticsAndDeletedIDs();
    auto tableStatistics =
        nodesStatistics->getNodeStatisticsAndDeletedIDs(targetTable->getTableID());
    return tableStatistics->getNumTuples() == 0;
}

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);

template<typename T, typename... Args>
void appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex1 not implemented");
}

void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
};

} // namespace processor
Expand Down
40 changes: 40 additions & 0 deletions src/include/processor/operator/copy/copy_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"

namespace kuzu {
namespace processor {

class CopyNPY : public Copy {
public:
CopyNPY(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NPY, catalog, copyDescription, table->getTableID(), wal,
id, paramsString},
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNPY>(
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

private:
inline bool isCopyAllowed() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
}

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 4c5e1be

Please sign in to comment.