Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
acquamarin committed May 31, 2023
1 parent 64630f7 commit 0bb6add
Show file tree
Hide file tree
Showing 19 changed files with 163 additions and 127 deletions.
2 changes: 1 addition & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
tableSchema->tableName));
}
}
return make_unique<BoundCopy>(
return std::make_unique<BoundCopy>(
CopyDescription(boundFilePaths, csvReaderConfig, actualFileType), tableID, tableName);
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/vector/auxiliary_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ std::unique_ptr<AuxiliaryBuffer> AuxiliaryBufferFactory::getAuxiliaryBuffer(
return std::make_unique<ListAuxiliaryBuffer>(
*VarListType::getChildType(&type), memoryManager);
case LogicalTypeID::ARROW_DATA:
return std::make_unique<ArrowDataAuxiliaryBuffer>();
return std::make_unique<ArrowColumnAuxiliaryBuffer>();
default:
return nullptr;
}
Expand Down
15 changes: 15 additions & 0 deletions src/common/vector/value_vector.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "common/vector/value_vector.h"

#include "arrow/array.h"
#include "common/vector/auxiliary_buffer.h"

namespace kuzu {
Expand Down Expand Up @@ -105,6 +106,20 @@ void ValueVector::initializeValueBuffer() {
}
}

void ArrowColumnVector::setArrowData(kuzu::common::ValueVector* vector,
kuzu::common::offset_t startOffset, std::shared_ptr<arrow::Array> data) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
arrowColumnBuffer->startOffset = startOffset;
arrowColumnBuffer->column = std::move(data);
}

offset_t ArrowColumnVector::getEndOffset(kuzu::common::ValueVector* vector) {
auto arrowColumnBuffer =
reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get());
return arrowColumnBuffer->startOffset + arrowColumnBuffer->column->length() - 1;
}

template void ValueVector::setValue<nodeID_t>(uint32_t pos, nodeID_t val);
template void ValueVector::setValue<bool>(uint32_t pos, bool val);
template void ValueVector::setValue<int64_t>(uint32_t pos, int64_t val);
Expand Down
13 changes: 2 additions & 11 deletions src/include/binder/copy/bound_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,19 @@ class BoundCopy : public BoundStatement {
common::CopyDescription copyDescription, common::table_id_t tableID, std::string tableName)
: BoundStatement{common::StatementType::COPY,
BoundStatementResult::createSingleStringColumnResult()},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{
std::move(tableName)} {
arrowDataExpression = std::make_shared<VariableExpression>(
common::LogicalType{common::LogicalTypeID::ARROW_DATA}, "ArrowData" /* uniqueName */,
"ArrowData" /* variableName */);
}
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::string getTableName() const { return tableName; }

inline std::shared_ptr<Expression> getArrowDataExpression() const {
return arrowDataExpression;
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
std::string tableName;
std::shared_ptr<Expression> arrowDataExpression;
};

} // namespace binder
Expand Down
17 changes: 4 additions & 13 deletions src/include/common/vector/auxiliary_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,12 @@ class StructAuxiliaryBuffer : public AuxiliaryBuffer {
std::vector<std::shared_ptr<ValueVector>> childrenVectors;
};

struct ArrowData {
common::offset_t startOffset;
std::shared_ptr<arrow::RecordBatch> recordBatch;

ArrowData() = default;

ArrowData(common::offset_t startOffset, std::shared_ptr<arrow::RecordBatch> recordBatch)
: startOffset{startOffset}, recordBatch{std::move(recordBatch)} {}
};

class ArrowDataAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ArrowDataVector;
class ArrowColumnAuxiliaryBuffer : public AuxiliaryBuffer {
friend class ArrowColumnVector;

private:
ArrowData arrowData;
common::offset_t startOffset;
std::shared_ptr<arrow::Array> column;
};

// ListVector layout:
Expand Down
20 changes: 12 additions & 8 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ValueVector {
friend class ListAuxiliaryBuffer;
friend class StructVector;
friend class StringVector;
friend class ArrowDataVector;
friend class ArrowColumnVector;

public:
explicit ValueVector(LogicalType dataType, storage::MemoryManager* memoryManager = nullptr);
Expand Down Expand Up @@ -156,17 +156,21 @@ class StructVector {
}
};

class ArrowDataVector {
class ArrowColumnVector {
public:
static inline void setArrowData(ValueVector* vector, ArrowData arrowData) {
reinterpret_cast<ArrowDataAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->arrowData =
std::move(arrowData);
static inline offset_t getStartOffset(ValueVector* vector) {
return reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->startOffset;
}

static inline ArrowData& getArrowData(ValueVector* vector) {
return reinterpret_cast<ArrowDataAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())
->arrowData;
static inline std::shared_ptr<arrow::Array> getColumn(ValueVector* vector) {
return reinterpret_cast<ArrowColumnAuxiliaryBuffer*>(vector->auxiliaryBuffer.get())->column;
}

static void setArrowData(
ValueVector* vector, offset_t startOffset, std::shared_ptr<arrow::Array> data);

static offset_t getEndOffset(ValueVector* vector);
};

class NodeIDVector {
Expand Down
39 changes: 14 additions & 25 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,52 +11,41 @@ class LogicalCopy : public LogicalOperator {

public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, std::shared_ptr<binder::Expression> arrowDataExpression,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
arrowDataExpression{std::move(arrowDataExpression)}, outputExpression{
std::move(outputExpression)} {}

inline void computeFactorizedSchema() override {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(arrowDataExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}
inline void computeFlatSchema() override {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(arrowDataExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}
arrowColumnExpressions{std::move(arrowColumnExpressions)}, outputExpression{std::move(
outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, arrowDataExpression, outputExpression);
}

inline std::shared_ptr<binder::Expression> getArrowDataExpression() const {
return arrowDataExpression;
inline std::vector<std::shared_ptr<binder::Expression>> getArrowColumnExpressions() const {
return arrowColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}

void computeFactorizedSchema() override;
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, arrowColumnExpressions, outputExpression);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
std::shared_ptr<binder::Expression> arrowDataExpression;
binder::expression_vector arrowColumnExpressions;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
3 changes: 2 additions & 1 deletion src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class Planner {

static std::unique_ptr<LogicalPlan> planRenameProperty(const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopy(const BoundStatement& statement);
static std::unique_ptr<LogicalPlan> planCopy(
const catalog::Catalog& catalog, const BoundStatement& statement);
};

} // namespace planner
Expand Down
34 changes: 19 additions & 15 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,53 +36,58 @@ class CopyNodeSharedState {
struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos arrowDataPos)
: copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
arrowDataPos{std::move(arrowDataPos)} {}
std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, arrowColumnPoses{std::move(arrowColumnPoses)} {}

common::CopyDescription copyDesc;
storage::NodeTable* table;
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
DataPos arrowDataPos;
std::vector<DataPos> arrowColumnPoses;
std::vector<common::ValueVector*> arrowColumnVectors;
};

class CopyNode : public Sink {
public:
CopyNode(CopyNodeLocalState localState, std::shared_ptr<CopyNodeSharedState> sharedState,
CopyNode(std::unique_ptr<CopyNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child),
id, paramsString},
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
arrowDataVector = resultSet->getValueVector(localState.arrowDataPos).get();
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

inline void initGlobalStateInternal(ExecutionContext* context) override {
if (!isCopyAllowed()) {
throw common::CopyException("COPY commands can only be executed once on a table.");
}
auto nodeTableSchema = localState.catalog->getReadOnlyVersion()->getNodeTableSchema(
localState.table->getTableID());
sharedState->initialize(nodeTableSchema, localState.wal->getDirectory());
auto nodeTableSchema = localState->catalog->getReadOnlyVersion()->getNodeTableSchema(
localState->table->getTableID());
sharedState->initialize(nodeTableSchema, localState->wal->getDirectory());
}

void executeInternal(ExecutionContext* context) override;

void finalize(ExecutionContext* context) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNode>(localState, sharedState, resultSetDescriptor->copy(),
children[0]->clone(), id, paramsString);
return std::make_unique<CopyNode>(std::make_unique<CopyNodeLocalState>(*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}

private:
inline bool isCopyAllowed() {
auto nodesStatistics = localState.table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(localState.table->getTableID())
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
return nodesStatistics->getNodeStatisticsAndDeletedIDs(localState->table->getTableID())
->getNumTuples() == 0;
}

Expand All @@ -100,9 +105,8 @@ class CopyNode : public Sink {
common::offset_t startOffset, uint64_t numValues);

private:
CopyNodeLocalState localState;
std::unique_ptr<CopyNodeLocalState> localState;
std::shared_ptr<CopyNodeSharedState> sharedState;
common::ValueVector* arrowDataVector;
};

} // namespace processor
Expand Down
10 changes: 5 additions & 5 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ class ReadCSVSharedState : public ReadFileSharedState {

class ReadCSV : public ReadFile {
public:
ReadCSV(DataPos fileRecordPos, std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{std::move(fileRecordPos), std::move(sharedState), PhysicalOperatorType::READ_CSV,
id, paramsString} {}
ReadCSV(std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
: ReadFile{std::move(arrowColumnPoses), std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) override {
Expand All @@ -48,7 +48,7 @@ class ReadCSV : public ReadFile {
}

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(fileRecordPos, sharedState, id, paramsString);
return std::make_unique<ReadCSV>(arrowColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
23 changes: 13 additions & 10 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ namespace processor {

class ReadFileMorsel {
public:
static constexpr common::block_idx_t BLOCK_IDX_INVALID = UINT64_MAX;

ReadFileMorsel(common::offset_t nodeOffset, common::block_idx_t blockIdx, uint64_t numNodes,
std::string filePath)
: nodeOffset{nodeOffset}, blockIdx{blockIdx}, numNodes{numNodes}, filePath{std::move(
filePath)} {};

virtual ~ReadFileMorsel() = default;

static constexpr common::block_idx_t BLOCK_IDX_INVALID = UINT64_MAX;

public:
common::offset_t nodeOffset;
common::block_idx_t blockIdx;
Expand All @@ -40,23 +40,26 @@ class ReadFileSharedState {
uint64_t numRows;

protected:
std::mutex mtx;
common::offset_t nodeOffset;
std::unordered_map<std::string, storage::TableCopyExecutor::FileBlockInfo> fileBlockInfos;
common::block_idx_t curBlockIdx;
std::vector<std::string> filePaths;
common::vector_idx_t curFileIdx;
std::mutex mtx;
};

class ReadFile : public PhysicalOperator {
public:
ReadFile(DataPos fileRecordPos, std::shared_ptr<ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, fileRecordPos{std::move(fileRecordPos)},
sharedState{std::move(sharedState)} {}
ReadFile(std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<ReadFileSharedState> sharedState, PhysicalOperatorType operatorType,
uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString},
arrowColumnPoses{std::move(arrowColumnPoses)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
fileRecordVector = resultSet->getValueVector(fileRecordPos).get();
for (auto& arrowColumnPos : arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
Expand All @@ -72,8 +75,8 @@ class ReadFile : public PhysicalOperator {

protected:
std::shared_ptr<ReadFileSharedState> sharedState;
DataPos fileRecordPos;
common::ValueVector* fileRecordVector;
std::vector<DataPos> arrowColumnPoses;
std::vector<common::ValueVector*> arrowColumnVectors;
};

} // namespace processor
Expand Down
Loading

0 comments on commit 0bb6add

Please sign in to comment.