Skip to content

Commit

Permalink
fix npy bugs; move map copy outside map_ddl
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 8, 2023
1 parent 57dc403 commit c01d26f
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 254 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "catalog/catalog_structs.h"
#include "logical_ddl.h"

namespace kuzu {
Expand Down
55 changes: 10 additions & 45 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ class CopyNodeSharedState {
};

struct CopyNodeLocalState {
CopyNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
CopyNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{std::move(copyDesc)}, table{table}, relsStore{relsStore}, catalog{catalog},
wal{wal}, offsetVectorPos{std::move(offsetVectorPos)}, arrowColumnPoses{
std::move(arrowColumnPoses)} {}
const DataPos& offsetVectorPos, std::vector<DataPos> arrowColumnPoses)
: copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
offsetVectorPos{offsetVectorPos}, offsetVector{nullptr}, arrowColumnPoses{std::move(
arrowColumnPoses)} {}

virtual void initializeValues(ResultSet* resultSet);
std::pair<common::offset_t, common::offset_t> getStartAndEndOffset(
common::column_id_t columnID);

common::CopyDescription copyDesc;
storage::NodeTable* table;
Expand All @@ -65,7 +66,7 @@ class CopyNode : public Sink {
localState{std::move(localState)}, sharedState{std::move(sharedState)} {}

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
localState->initializeValues(resultSet);
localState->offsetVector = resultSet->getValueVector(localState->offsetVectorPos).get();
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
Expand Down Expand Up @@ -94,6 +95,8 @@ class CopyNode : public Sink {
void populatePKIndex(storage::InMemColumnChunk* chunk, storage::InMemOverflowFile* overflowFile,
common::offset_t startOffset, uint64_t numValues);

void logCopyWALRecord();

private:
inline bool isCopyAllowed() {
auto nodesStatistics = localState->table->getNodeStatisticsAndDeletedIDs();
Expand All @@ -116,43 +119,5 @@ class CopyNode : public Sink {
std::shared_ptr<CopyNodeSharedState> sharedState;
};

struct CopyNPYNodeLocalState : public CopyNodeLocalState {
public:
CopyNPYNodeLocalState(common::CopyDescription copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
DataPos offsetVectorPos, DataPos columnIdxPos, std::vector<DataPos> arrowColumnPoses)
: CopyNodeLocalState{std::move(copyDesc), table, relsStore, catalog, wal,
std::move(offsetVectorPos), std::move(arrowColumnPoses)},
columnIdxPos{std::move(columnIdxPos)} {}

void initializeValues(ResultSet* resultSet) override;
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::unique_ptr<CopyNPYNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(localState), sharedState, std::move(resultSetDescriptor),
std::move(child), id, paramsString} {}

void executeInternal(ExecutionContext* context) override;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
localState->initializeValues(resultSet);
for (auto& arrowColumnPos : localState->arrowColumnPoses) {
localState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset, int64_t columnToCopy);
};

} // namespace processor
} // namespace kuzu
57 changes: 57 additions & 0 deletions src/include/processor/operator/copy/copy_npy_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#pragma once

#include "processor/operator/copy/copy_node.h"

namespace kuzu {
namespace processor {

struct CopyNPYNodeLocalState : public CopyNodeLocalState {
public:
CopyNPYNodeLocalState(const common::CopyDescription& copyDesc, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
const DataPos& offsetVectorPos, const DataPos& columnIdxPos,
std::vector<DataPos> arrowColumnPoses)
: CopyNodeLocalState{copyDesc, table, relsStore, catalog, wal, offsetVectorPos,
std::move(arrowColumnPoses)},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
};

class CopyNPYNode : public CopyNode {
public:
CopyNPYNode(std::unique_ptr<CopyNPYNodeLocalState> localState,
std::shared_ptr<CopyNodeSharedState> sharedState,
std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: CopyNode{std::move(localState), std::move(sharedState), std::move(resultSetDescriptor),
std::move(child), id, paramsString} {}

void executeInternal(ExecutionContext* context) final;

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final {
auto npyLocalState = (CopyNPYNodeLocalState*)localState.get();
npyLocalState->offsetVector =
resultSet->getValueVector(npyLocalState->offsetVectorPos).get();
npyLocalState->columnIdxVector =
resultSet->getValueVector(npyLocalState->columnIdxPos).get();
for (auto& arrowColumnPos : npyLocalState->arrowColumnPoses) {
npyLocalState->arrowColumnVectors.push_back(
resultSet->getValueVector(arrowColumnPos).get());
}
}

void flushChunksAndPopulatePKIndexSingleColumn(
std::vector<std::unique_ptr<storage::InMemColumnChunk>>& columnChunks,
common::offset_t startNodeOffset, common::offset_t endNodeOffset, int64_t columnToCopy);

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<CopyNPYNode>(
std::make_unique<CopyNPYNodeLocalState>((CopyNPYNodeLocalState&)*localState),
sharedState, resultSetDescriptor->copy(), children[0]->clone(), id, paramsString);
}
};

} // namespace processor
} // namespace kuzu
5 changes: 2 additions & 3 deletions src/include/processor/physical_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ class PhysicalPlan {
explicit PhysicalPlan(std::unique_ptr<PhysicalOperator> lastOperator)
: lastOperator{std::move(lastOperator)} {}

inline bool isCopyRelOrNPY() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL ||
lastOperator->getOperatorType() == PhysicalOperatorType::COPY_NPY;
inline bool isCopyRel() const {
return lastOperator->getOperatorType() == PhysicalOperatorType::COPY_REL;
}

public:
Expand Down
1 change: 1 addition & 0 deletions src/processor/mapper/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ add_library(kuzu_processor_mapper
map_acc_hash_join.cpp
map_standalone_call.cpp
map_in_query_call.cpp
map_copy.cpp
map_create.cpp
map_cross_product.cpp
map_ddl.cpp
Expand Down
112 changes: 112 additions & 0 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#include "planner/logical_plan/logical_operator/logical_copy.h"
#include "processor/mapper/plan_mapper.h"
#include "processor/operator/copy/copy_node.h"
#include "processor/operator/copy/copy_npy_node.h"
#include "processor/operator/copy/copy_rel.h"
#include "processor/operator/copy/read_csv.h"
#include "processor/operator/copy/read_file.h"
#include "processor/operator/copy/read_npy.h"
#include "processor/operator/copy/read_parquet.h"
#include "processor/operator/table_scan/factorized_table_scan.h"

using namespace kuzu::planner;

namespace kuzu {
namespace processor {

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyToPhysical(
LogicalOperator* logicalOperator) {
auto copy = (LogicalCopy*)logicalOperator;
auto tableName = catalog->getReadOnlyVersion()->getTableName(copy->getTableID());
if (catalog->getReadOnlyVersion()->containNodeTable(tableName)) {
return mapLogicalCopyNodeToPhysical(copy);
} else {
return mapLogicalCopyRelToPhysical(copy);
}
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyNodeToPhysical(LogicalCopy* copy) {
auto fileType = copy->getCopyDescription().fileType;
if (fileType != common::CopyDescription::FileType::CSV &&
fileType != common::CopyDescription::FileType::PARQUET &&
fileType != common::CopyDescription::FileType::NPY) {
throw common::NotImplementedException{"PlanMapper::mapLogicalCopyToPhysical"};

Check warning on line 33 in src/processor/mapper/map_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/mapper/map_copy.cpp#L33

Added line #L33 was not covered by tests
}
std::unique_ptr<ReadFile> readFile;
std::shared_ptr<ReadFileSharedState> readFileSharedState;
auto outSchema = copy->getSchema();
auto arrowColumnExpressions = copy->getArrowColumnExpressions();
std::vector<DataPos> arrowColumnPoses;
arrowColumnPoses.reserve(arrowColumnExpressions.size());
for (auto& arrowColumnPos : arrowColumnExpressions) {
arrowColumnPoses.emplace_back(outSchema->getExpressionPos(*arrowColumnPos));
}
auto offsetExpression = copy->getOffsetExpression();
auto columnIdxExpression = copy->getColumnIdxExpression();
auto offsetVectorPos = DataPos(outSchema->getExpressionPos(*offsetExpression));
auto columnIdxPos = DataPos(outSchema->getExpressionPos(*columnIdxExpression));
auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID());
switch (copy->getCopyDescription().fileType) {
case (common::CopyDescription::FileType::CSV): {
readFileSharedState =
std::make_shared<ReadCSVSharedState>(*copy->getCopyDescription().csvReaderConfig,
copy->getCopyDescription().filePaths, nodeTableSchema);
readFile = std::make_unique<ReadCSV>(arrowColumnPoses, offsetVectorPos, readFileSharedState,
getOperatorID(), copy->getExpressionsForPrinting());
} break;
case (common::CopyDescription::FileType::PARQUET): {
readFileSharedState = std::make_shared<ReadParquetSharedState>(
copy->getCopyDescription().filePaths, nodeTableSchema);
readFile = std::make_unique<ReadParquet>(arrowColumnPoses, offsetVectorPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
} break;
case (common::CopyDescription::FileType::NPY): {
readFileSharedState = std::make_shared<ReadNPYSharedState>(
nodeTableSchema, copy->getCopyDescription().filePaths);
readFile = std::make_unique<ReadNPY>(arrowColumnPoses, offsetVectorPos, columnIdxPos,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
} break;
default:
throw common::NotImplementedException("PlanMapper::mapLogicalCopyNodeToPhysical");

Check warning on line 70 in src/processor/mapper/map_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/mapper/map_copy.cpp#L69-L70

Added lines #L69 - L70 were not covered by tests
}
auto copyNodeSharedState =
std::make_shared<CopyNodeSharedState>(readFileSharedState->numRows, memoryManager);
auto outputExpression = copy->getOutputExpression();
auto outputVectorPos = DataPos(outSchema->getExpressionPos(*outputExpression));
auto ftSharedState = std::make_shared<FTableSharedState>(
copyNodeSharedState->table, common::DEFAULT_VECTOR_CAPACITY);
std::unique_ptr<CopyNode> copyNode;
if (copy->getCopyDescription().fileType == common::CopyDescription::FileType::NPY) {
auto localState = std::make_unique<CopyNPYNodeLocalState>(copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), offsetVectorPos,
columnIdxPos, arrowColumnPoses);
copyNode = std::make_unique<CopyNPYNode>(std::move(localState), copyNodeSharedState,
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
} else {
auto localState = std::make_unique<CopyNodeLocalState>(copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
&storageManager.getRelsStore(), catalog, storageManager.getWAL(), offsetVectorPos,
arrowColumnPoses);
copyNode = std::make_unique<CopyNode>(std::move(localState), copyNodeSharedState,
std::make_unique<ResultSetDescriptor>(copy->getSchema()), std::move(readFile),
getOperatorID(), copy->getExpressionsForPrinting());
}
// We need to create another pipeline to return the copy message to the user.
// The new pipeline only contains a factorizedTableScan and a resultCollector.
return std::make_unique<FactorizedTableScan>(std::vector<DataPos>{outputVectorPos},
std::vector<uint32_t>{0} /* colIndicesToScan */, ftSharedState, std::move(copyNode),
getOperatorID(), copy->getExpressionsForPrinting());
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapLogicalCopyRelToPhysical(LogicalCopy* copy) {
auto relsStatistics = &storageManager.getRelsStore().getRelsStatistics();
auto table = storageManager.getRelsStore().getRelTable(copy->getTableID());
return std::make_unique<CopyRel>(catalog, copy->getCopyDescription(), table,
storageManager.getWAL(), relsStatistics, storageManager.getNodesStore(), getOperatorID(),
copy->getExpressionsForPrinting());
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit c01d26f

Please sign in to comment.