Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor copy node #1590

Merged
merged 1 commit into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
tableSchema->tableName));
}
}
return make_unique<BoundCopy>(
return std::make_unique<BoundCopy>(
CopyDescription(boundFilePaths, csvReaderConfig, actualFileType), tableID, tableName);
}

Expand Down
3 changes: 3 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ void LogicalType::setPhysicalType() {
case LogicalTypeID::STRUCT: {
physicalType = PhysicalTypeID::STRUCT;
} break;
case LogicalTypeID::ARROW_COLUMN: {
physicalType = PhysicalTypeID::ARROW_COLUMN;
} break;
default:
throw NotImplementedException{"LogicalType::setPhysicalType()."};
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ std::unique_ptr<AuxiliaryBuffer> AuxiliaryBufferFactory::getAuxiliaryBuffer(
case LogicalTypeID::VAR_LIST:
return std::make_unique<ListAuxiliaryBuffer>(
*VarListType::getChildType(&type), memoryManager);
case LogicalTypeID::ARROW_COLUMN:
return std::make_unique<ArrowColumnAuxiliaryBuffer>();
default:
return nullptr;
}
Expand Down
10 changes: 10 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ uint32_t ValueVector::getDataTypeSize(const LogicalType& type) {
case LogicalTypeID::VAR_LIST: {
return sizeof(list_entry_t);
}
case LogicalTypeID::ARROW_COLUMN: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't think this is a good idea. It doesn't make sense to me to have ARROW column/array as a logical type.

return 0;
}
default: {
return LogicalTypeUtils::getFixedTypeSize(type.getPhysicalType());
}
Expand All @@ -102,6 +105,13 @@ void ValueVector::initializeValueBuffer() {
}
}

void ArrowColumnVector::setArrowColumn(
kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
arrowColumnBuffer->column = std::move(column);
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ KUZU_API enum class LogicalTypeID : uint8_t {

INTERNAL_ID = 40,

ARROW_COLUMN = 41,

// variable size types
STRING = 50,
VAR_LIST = 52,
Expand All @@ -95,6 +97,7 @@ enum class PhysicalTypeID : uint8_t {
FLOAT = 6,
INTERVAL = 7,
INTERNAL_ID = 9,
ARROW_COLUMN = 10,

// Variable size types.
STRING = 20,
Expand Down
8 changes: 8 additions & 0 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "arrow/array.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's think of a way to get rid of this include. I hope we can go through common/arrow. One way is to have a zero-copy conversion in copier from arrow's arrow_array to common's arrow_array.

#include "common/in_mem_overflow_buffer.h"

namespace kuzu {
Expand Down Expand Up @@ -43,6 +44,13 @@ class StructAuxiliaryBuffer : public AuxiliaryBuffer {
std::vector<std::shared_ptr<ValueVector>> childrenVectors;
};

class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the best way to implement the arrow auxiliary is that we store the arrow array in the aux to keep its lifetime, and point to the aux array when possible from ValueVector. Let's discuss this a bit more offline.

friend class ArrowColumnVector;

private:
std::shared_ptr<arrow::Array> column;
};

// ListVector layout:
// To store a list value in the valueVector, we could use two separate vectors.
// 1. A vector(called offset vector) for the list offsets and length(called list_entry_t): This
Expand Down
11 changes: 11 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ValueVector {
friend class ListAuxiliaryBuffer;
friend class StructVector;
friend class StringVector;
friend class ArrowColumnVector;

public:
explicit ValueVector(LogicalType dataType, storage::MemoryManager* memoryManager = nullptr);
Expand Down Expand Up @@ -154,6 +155,16 @@ class StructVector {
}
};

class ArrowColumnVector {
public:
static inline std::shared_ptr<arrow::Array> getArrowColumn(ValueVector* vector) {
return reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->column;
}

static void setArrowColumn(
kuzu::common::ValueVector* vector, std::shared_ptr<arrow::Array> column);
};

class NodeIDVector {
public:
// If there is still non-null values after discarding, return true. Otherwise, return false.
Expand Down
35 changes: 28 additions & 7 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,49 @@ class LogicalCopy : public LogicalOperator {

public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)} {}

inline void computeFactorizedSchema() override { createEmptySchema(); }
inline void computeFlatSchema() override { createEmptySchema(); }
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> offsetExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
offsetExpression{std::move(offsetExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::vector<std::shared_ptr<binder::Expression>> getArrowColumnExpressions() const {
return arrowColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getOffsetExpression() const {
return offsetExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName);
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
offsetExpression, outputExpression);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> offsetExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

} // namespace planner
Expand Down
3 changes: 2 additions & 1 deletion src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Planner {

static std::unique_ptr<LogicalPlan> planRenameProperty(const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopy(const BoundStatement& statement);
static std::unique_ptr<LogicalPlan> planCopy(
const catalog::Catalog& catalog, const BoundStatement& statement);
};

} // namespace planner
Expand Down
7 changes: 5 additions & 2 deletions src/include/processor/mapper/plan_mapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "binder/expression/node_expression.h"
#include "common/statement_type.h"
#include "planner/logical_plan/logical_operator/logical_copy.h"
#include "planner/logical_plan/logical_plan.h"
#include "processor/mapper/expression_mapper.h"
#include "processor/operator/result_collector.h"
Expand All @@ -23,8 +24,8 @@ class PlanMapper {
: storageManager{storageManager}, memoryManager{memoryManager},
expressionMapper{}, catalog{catalog}, physicalOperatorID{0} {}

std::unique_ptr<PhysicalPlan> mapLogicalPlanToPhysical(planner::LogicalPlan* logicalPlan,
const binder::expression_vector& expressionsToCollect, common::StatementType statementType);
std::unique_ptr<PhysicalPlan> mapLogicalPlanToPhysical(
planner::LogicalPlan* logicalPlan, const binder::expression_vector& expressionsToCollect);

private:
std::unique_ptr<PhysicalOperator> mapLogicalOperatorToPhysical(
Expand Down Expand Up @@ -95,6 +96,8 @@ class PlanMapper {
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalCopyNodeToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalCopyRelToPhysical(planner::LogicalCopy* copy);
std::unique_ptr<PhysicalOperator> mapLogicalDropTableToPhysical(
planner::LogicalOperator* logicalOperator);
std::unique_ptr<PhysicalOperator> mapLogicalRenameTableToPhysical(
Expand Down
117 changes: 97 additions & 20 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
@@ -1,39 +1,116 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"
#include "processor/operator/sink.h"
#include "storage/copier/node_copy_executor.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/store/node_table.h"

namespace kuzu {
namespace processor {

class CopyNode : public Copy {
class CopyNodeSharedState {
public:
CopyNode(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NODE, catalog, copyDescription, table->getTableID(), wal,
CopyNodeSharedState(uint64_t& numRows, storage::MemoryManager* memoryManager);

inline void initialize(
catalog::NodeTableSchema* nodeTableSchema, const std::string& directory) {
initializePrimaryKey(nodeTableSchema, directory);
initializeColumns(nodeTableSchema, directory);
};

private:
void initializePrimaryKey(
catalog::NodeTableSchema* nodeTableSchema, const std::string& directory);

void initializeColumns(catalog::NodeTableSchema* nodeTableSchema, const std::string& directory);

public:
common::column_id_t pkColumnID;
std::vector<std::unique_ptr<storage::InMemColumn>> columns;
std::unique_ptr<storage::PrimaryKeyIndexBuilder> pkIndex;
uint64_t& numRows;
std::mutex mtx;
std::shared_ptr<FactorizedTable> table;
};

struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, offsetVectorPos{std::move(offsetVectorPos)}, arrowColumnPoses{
std::move(arrowColumnPoses)} {}

common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
DataPos offsetVectorPos;
common::ValueVector* offsetVector;
std::vector<DataPos> arrowColumnPoses;
std::vector<common::ValueVector*> arrowColumnVectors;
};

class CopyNode : public Sink {
public:
CopyNode(std::unique_ptr<CopyNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
localState->offsetVector = resultSet->getValueVector(localState->offsetVectorPos).get();
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
inline void initGlobalStateInternal(ExecutionContext* context) override {
if (!isCopyAllowed()) {
throw common::CopyException("COPY commands can only be executed once on a table.");
}
auto nodeTableSchema = localState->catalog->getReadOnlyVersion()->getNodeTableSchema(
localState->table->getTableID());
sharedState->initialize(nodeTableSchema, localState->wal->getDirectory());
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;
void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(std::make_unique<CopyNodeLocalState>(*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

private:
inline bool isCopyAllowed() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
inline bool isCopyAllowed() {
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(localState->table->getTableID())
->getNumTuples() == 0;
}

void flushChunksAndPopulatePKIndex(
const std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset);

template<typename T, typename... Args>
void appendToPKIndex(storage::InMemColumnChunk* chunk, common::offset_t startOffset,
uint64_t numValues, Args... args) {
throw common::CopyException("appendToPKIndex1 not implemented");
}

void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
};

} // namespace processor
Expand Down
40 changes: 40 additions & 0 deletions src/include/processor/operator/copy/copy_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "processor/operator/copy/copy.h"
#include "storage/store/nodes_store.h"

namespace kuzu {
namespace processor {

class CopyNPY : public Copy {
acquamarin marked this conversation as resolved.
Show resolved Hide resolved
public:
CopyNPY(catalog::Catalog* catalog, const common::CopyDescription& copyDescription,
storage::NodeTable* table, storage::WAL* wal,
storage::NodesStatisticsAndDeletedIDs* nodesStatistics, storage::RelsStore& relsStore,
uint32_t id, const std::string& paramsString)
: Copy{PhysicalOperatorType::COPY_NPY, catalog, copyDescription, table->getTableID(), wal,
id, paramsString},
table{table}, nodesStatistics{nodesStatistics}, relsStore{relsStore} {}

std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNPY>(
catalog, copyDescription, table, wal, nodesStatistics, relsStore, id, paramsString);
}

protected:
uint64_t executeInternal(
common::TaskScheduler* taskScheduler, ExecutionContext* executionContext) override;

private:
inline bool isCopyAllowed() override {
return nodesStatistics->getNodeStatisticsAndDeletedIDs(tableID)->getNumTuples() == 0;
}

private:
storage::NodeTable* table;
storage::NodesStatisticsAndDeletedIDs* nodesStatistics;
storage::RelsStore& relsStore;
};

} // namespace processor
} // namespace kuzu
Loading
Loading