Skip to content

Commit

Permalink
remove rowIdx and filePath dataPos
Browse files (browse the repository at this point in the history)
  • Loading branch information
ray6080 committed Jul 25, 2023
1 parent 1d01d75 commit 864923b
Show file tree
Hide file tree
Showing 15 changed files with 27 additions and 91 deletions.
22 changes: 4 additions & 18 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@ class LogicalCopy : public LogicalOperator {
public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector dataColumnExpressions,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move(
rowIdxExpression)},
filePathExpression{std::move(filePathExpression)}, outputExpression{
std::move(outputExpression)} {}
dataColumnExpressions{std::move(dataColumnExpressions)}, outputExpression{std::move(
outputExpression)} {}

// Returns the string shown when this operator is printed: a copy of tableName.
inline std::string getExpressionsForPrinting() const override { return tableName; }

Expand All @@ -32,14 +28,6 @@ class LogicalCopy : public LogicalOperator {
return dataColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
return rowIdxExpression;
}

inline std::shared_ptr<binder::Expression> getFilePathExpression() const {
return filePathExpression;
}

// Returns the output expression held by this operator. The shared_ptr is returned by value,
// so the caller receives its own handle that shares ownership of the same Expression.
inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -48,8 +36,8 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, dataColumnExpressions,
rowIdxExpression, filePathExpression, outputExpression);
return make_unique<LogicalCopy>(
copyDescription, tableID, tableName, dataColumnExpressions, outputExpression);
}

private:
Expand All @@ -58,8 +46,6 @@ class LogicalCopy : public LogicalOperator {
// Used for printing only.
std::string tableName;
binder::expression_vector dataColumnExpressions;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
6 changes: 0 additions & 6 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ class CopyNodeSharedState {
};

struct CopyNodeInfo {
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> dataColumnPoses;
common::CopyDescription copyDesc;
storage::NodeTable* table;
Expand All @@ -62,8 +60,6 @@ class CopyNode : public Sink {
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString);

inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
rowIdxVector = resultSet->getValueVector(copyNodeInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
Expand Down Expand Up @@ -105,8 +101,6 @@ class CopyNode : public Sink {
private:
std::shared_ptr<CopyNodeSharedState> sharedState;
CopyNodeInfo copyNodeInfo;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> dataColumnVectors;
std::unique_ptr<storage::NodeGroup> localNodeGroup;
};
Expand Down
10 changes: 4 additions & 6 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ namespace processor {

class ReadCSV : public ReadFile {
public:
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> dataColumnPoses,
ReadCSV(std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}
: ReadFile{std::move(dataColumnPoses), std::move(sharedState),
PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override {
Expand All @@ -21,8 +20,7 @@ class ReadCSV : public ReadFile {
}

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
return std::make_unique<ReadCSV>(dataColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
14 changes: 3 additions & 11 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@ namespace processor {

class ReadFile : public PhysicalOperator {
public:
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> dataColumnPoses,
ReadFile(std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, dataColumnPoses{std::move(dataColumnPoses)},
sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
: PhysicalOperator{operatorType, id, paramsString},
dataColumnPoses{std::move(dataColumnPoses)}, sharedState{std::move(sharedState)} {}

inline void initGlobalStateInternal(kuzu::processor::ExecutionContext* context) override {
sharedState->countNumRows();
Expand All @@ -32,11 +28,7 @@ class ReadFile : public PhysicalOperator {

protected:
std::shared_ptr<storage::ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> dataColumnPoses;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
};

} // namespace processor
Expand Down
10 changes: 4 additions & 6 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,19 @@ namespace processor {

class ReadNPY : public ReadFile {
public:
ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> dataColumnPoses,
ReadNPY(std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString} {
: ReadFile{std::move(dataColumnPoses), std::move(sharedState),
PhysicalOperatorType::READ_NPY, id, paramsString} {
reader = std::make_unique<storage::NpyMultiFileReader>(this->sharedState->filePaths);
}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<ReadNPY>(
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
return std::make_unique<ReadNPY>(dataColumnPoses, sharedState, id, paramsString);
}

private:
Expand Down
10 changes: 4 additions & 6 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ namespace processor {

class ReadParquet : public ReadFile {
public:
ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> dataColumnPoses,
ReadParquet(std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {}
: ReadFile{std::move(dataColumnPoses), std::move(sharedState),
PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
return std::make_unique<ReadParquet>(dataColumnPoses, sharedState, id, paramsString);
}

private:
Expand Down
1 change: 0 additions & 1 deletion src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class PageState {
}
}
}
// CAS
// Tries to move stateAndVersion from oldStateAndVersion to the value returned by
// updateStateWithSameVersion(oldStateAndVersion, LOCKED) in a single compare-and-exchange.
// Returns the result of compare_exchange_strong, i.e. whether the swap took place.
// oldStateAndVersion is taken by value, so whatever compare_exchange_strong writes back into it
// on failure is not visible to the caller.
// NOTE(review): stateAndVersion is presumably a std::atomic<uint64_t> -- its declaration is not
// visible in this hunk; confirm.
inline bool tryLock(uint64_t oldStateAndVersion) {
return stateAndVersion.compare_exchange_strong(
oldStateAndVersion, updateStateWithSameVersion(oldStateAndVersion, LOCKED));
1 change: 0 additions & 1 deletion src/include/storage/store/node_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class NodeGroup {
common::offset_t numNodes;
std::unordered_map<common::property_id_t, std::unique_ptr<ColumnChunk>> chunks;
catalog::TableSchema* schema;
common::CopyDescription* copyDescription;
};

} // namespace storage
Expand Down
4 changes: 0 additions & 4 deletions src/planner/operator/logical_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ void LogicalCopy::computeFactorizedSchema() {
createEmptySchema();
auto groupPos = schema->createGroup();
schema->insertToGroupAndScope(dataColumnExpressions, groupPos);
schema->insertToGroupAndScope(rowIdxExpression, groupPos);
schema->insertToGroupAndScope(filePathExpression, groupPos);
schema->insertToGroupAndScope(outputExpression, groupPos);
schema->setGroupAsSingleState(groupPos);
}
Expand All @@ -17,8 +15,6 @@ void LogicalCopy::computeFlatSchema() {
createEmptySchema();
schema->createGroup();
schema->insertToGroupAndScope(dataColumnExpressions, 0);
schema->insertToGroupAndScope(rowIdxExpression, 0);
schema->insertToGroupAndScope(filePathExpression, 0);
schema->insertToGroupAndScope(outputExpression, 0);
}

Expand Down
4 changes: 0 additions & 4 deletions src/planner/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,6 @@ std::unique_ptr<LogicalPlan> Planner::planCopy(
}
auto copy = make_shared<LogicalCopy>(copyClause.getCopyDescription(), copyClause.getTableID(),
copyClause.getTableName(), std::move(arrowColumnExpressions),
std::make_shared<VariableExpression>(
common::LogicalType{common::LogicalTypeID::INT64}, "rowIdx", "rowIdx"),
std::make_shared<VariableExpression>(
common::LogicalType{common::LogicalTypeID::STRING}, "filePath", "filePath"),
copyClause.getStatementResult()->getSingleExpressionToCollect());
plan->setLastOperator(std::move(copy));
return plan;
Expand Down
17 changes: 6 additions & 11 deletions src/processor/mapper/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,31 +42,28 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
for (auto& dataColumnExpr : dataColumnExpressions) {
dataColumnPoses.emplace_back(outSchema->getExpressionPos(*dataColumnExpr));
}
auto rowIdxVectorPos = DataPos(outSchema->getExpressionPos(*copy->getRowIdxExpression()));
auto filePathVectorPos = DataPos(outSchema->getExpressionPos(*copy->getFilePathExpression()));
auto nodeTableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(copy->getTableID());
switch (copy->getCopyDescription().fileType) {
case (common::CopyDescription::FileType::CSV): {
readFileSharedState =
std::make_shared<ReadCSVSharedState>(copy->getCopyDescription().filePaths,
*copy->getCopyDescription().csvReaderConfig, nodeTableSchema);
readFile = std::make_unique<ReadCSV>(rowIdxVectorPos, filePathVectorPos, dataColumnPoses,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
readFile = std::make_unique<ReadCSV>(dataColumnPoses, readFileSharedState, getOperatorID(),
copy->getExpressionsForPrinting());
} break;
case (common::CopyDescription::FileType::PARQUET): {
readFileSharedState =
std::make_shared<ReadParquetSharedState>(copy->getCopyDescription().filePaths,
*copy->getCopyDescription().csvReaderConfig, nodeTableSchema);
readFile =
std::make_unique<ReadParquet>(rowIdxVectorPos, filePathVectorPos, dataColumnPoses,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
readFile = std::make_unique<ReadParquet>(dataColumnPoses, readFileSharedState,
getOperatorID(), copy->getExpressionsForPrinting());
} break;
case (common::CopyDescription::FileType::NPY): {
readFileSharedState =
std::make_shared<ReadNPYSharedState>(copy->getCopyDescription().filePaths,
*copy->getCopyDescription().csvReaderConfig, nodeTableSchema);
readFile = std::make_unique<ReadNPY>(rowIdxVectorPos, filePathVectorPos, dataColumnPoses,
readFileSharedState, getOperatorID(), copy->getExpressionsForPrinting());
readFile = std::make_unique<ReadNPY>(dataColumnPoses, readFileSharedState, getOperatorID(),
copy->getExpressionsForPrinting());
} break;
default:
throw common::NotImplementedException("PlanMapper::mapLogicalCopyNodeToPhysical");
Expand All @@ -76,8 +73,6 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
storageManager.getNodesStore().getNodeTable(copy->getTableID()), copy->getCopyDescription(),
memoryManager);
CopyNodeInfo copyNodeDataInfo{
rowIdxVectorPos,
filePathVectorPos,
dataColumnPoses,
copy->getCopyDescription(),
storageManager.getNodesStore().getNodeTable(copy->getTableID()),
Expand Down
4 changes: 1 addition & 3 deletions src/processor/operator/copy/copy_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ CopyNode::CopyNode(std::shared_ptr<CopyNodeSharedState> sharedState, CopyNodeInf
std::unique_ptr<PhysicalOperator> child, uint32_t id, const std::string& paramsString)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_NODE, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)},
rowIdxVector{nullptr}, filePathVector{nullptr} {}
sharedState{std::move(sharedState)}, copyNodeInfo{std::move(copyNodeInfo)} {}

void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr<NodeGroup> localNodeGroup) {
std::unique_lock xLck{mtx};
Expand All @@ -67,7 +66,6 @@ void CopyNodeSharedState::appendLocalNodeGroup(std::unique_ptr<NodeGroup> localN
CopyNode::appendNodeGroupToTableAndPopulateIndex(
table, sharedNodeGroup.get(), pkIndex.get(), pkColumnID);
}
// append node group to table.
if (numNodesAppended < localNodeGroup->getNumNodes()) {
sharedNodeGroup->appendNodeGroup(localNodeGroup.get(), numNodesAppended);
}
Expand Down
12 changes: 0 additions & 12 deletions src/processor/operator/copy/read_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,11 @@
namespace kuzu {
namespace processor {

void ReadFile::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
rowIdxVector = resultSet->getValueVector(rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(filePathVectorPos).get();
}

bool ReadFile::getNextTuplesInternal(kuzu::processor::ExecutionContext* context) {
auto morsel = sharedState->getMorsel();
if (morsel == nullptr) {
return false;
}
rowIdxVector->setValue(
rowIdxVector->state->selVector->selectedPositions[0], morsel->rowIdxInFile);
rowIdxVector->setValue(
rowIdxVector->state->selVector->selectedPositions[1], morsel->rowIdxInFile);
filePathVector->resetAuxiliaryBuffer();
filePathVector->setValue(
rowIdxVector->state->selVector->selectedPositions[0], morsel->filePath);
auto recordBatch = readTuples(std::move(morsel));
for (auto i = 0u; i < dataColumnPoses.size(); i++) {
common::ArrowColumnVector::setArrowColumn(
Expand Down
1 change: 0 additions & 1 deletion src/processor/operator/copy/read_npy.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "processor/operator/copy/read_npy.h"

#include "common/constants.h"
#include "storage/in_mem_storage_structure/in_mem_column_chunk.h"

using namespace kuzu::storage;

Expand Down
2 changes: 1 addition & 1 deletion src/storage/store/node_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace kuzu {
namespace storage {

NodeGroup::NodeGroup(TableSchema* schema, CopyDescription* copyDescription)
: nodeGroupIdx{UINT64_MAX}, numNodes{0}, schema{schema}, copyDescription{copyDescription} {
: nodeGroupIdx{UINT64_MAX}, numNodes{0}, schema{schema} {
for (auto& property : schema->properties) {
chunks[property.propertyID] =
ColumnChunkFactory::createColumnChunk(property.dataType, copyDescription);
Expand Down

0 comments on commit 864923b

Please sign in to comment.