Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up duplicated morsel and sharedState between copy node and rel #1834

Merged
merged 1 commit into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ namespace common {
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}

std::string CopyDescription::getFileTypeName(FileType fileType) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class CopyNode : public Sink {
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr} {
rowIdxVector{nullptr}, filePathVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
Expand Down
38 changes: 5 additions & 33 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,18 @@
namespace kuzu {
namespace processor {

// For CSV file, we need to read in streaming mode, so we need to read one batch at a time.
class ReadCSVMorsel : public ReadFileMorsel {
public:
ReadCSVMorsel(common::row_idx_t rowIdx, std::string filePath, common::row_idx_t rowIdxInFile,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: ReadFileMorsel{rowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
std::move(filePath), rowIdxInFile},
recordBatch{std::move(recordBatch)} {}

std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, std::vector<std::string> filePaths,
catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema}, csvReaderConfig{csvReaderConfig} {
}

private:
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;

private:
common::CSVReaderConfig csvReaderConfig;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class ReadCSV : public ReadFile {
public:
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) override {
auto csvMorsel = reinterpret_cast<ReadCSVMorsel*>(morsel.get());
std::unique_ptr<storage::ReadFileMorsel> morsel) override {
auto csvMorsel = reinterpret_cast<storage::ReadCSVMorsel*>(morsel.get());
return csvMorsel->recordBatch;
}

Expand Down
55 changes: 5 additions & 50 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
@@ -1,61 +1,16 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/copier/table_copy_utils.h"
#include "storage/copier/read_file_state.h"

namespace kuzu {
namespace processor {

class ReadFileMorsel {
public:
ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
: rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
rowIdxInFile{rowIdxInFile} {};

virtual ~ReadFileMorsel() = default;

public:
// When reading from multiple files, rowIdx is accumulated.
common::row_idx_t rowIdx;
common::block_idx_t blockIdx;
common::row_idx_t numRows;
std::string filePath;
// Row idx in the current file. Equal to `rowIdx` when reading from only a single file.
common::row_idx_t rowIdxInFile;
};

class ReadFileSharedState {
public:
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: currRowIdx{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0}, currRowIdxInCurrFile{1} {}

virtual ~ReadFileSharedState() = default;

virtual void countNumRows() = 0;

virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
common::row_idx_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
common::row_idx_t currRowIdx;
std::unordered_map<std::string, storage::FileBlockInfo> fileBlockInfos;
common::block_idx_t curBlockIdx;
std::vector<std::string> filePaths;
common::vector_idx_t curFileIdx;
common::row_idx_t currRowIdxInCurrFile;
};

class ReadFile : public PhysicalOperator {
public:
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
Expand All @@ -70,12 +25,12 @@ class ReadFile : public PhysicalOperator {
inline bool isSource() const override { return true; }

virtual std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) = 0;
std::unique_ptr<storage::ReadFileMorsel> morsel) = 0;

bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
std::shared_ptr<ReadFileSharedState> sharedState;
std::shared_ptr<storage::ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
Expand Down
29 changes: 3 additions & 26 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,18 @@
namespace kuzu {
namespace processor {

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
: ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
columnIdx{curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadNPYSharedState : public ReadFileSharedState {
public:
ReadNPYSharedState(catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
void countNumRows() final;
};

class ReadNPY : public ReadFile {
public:
ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, const DataPos& columnIdxPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) final;
std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
20 changes: 5 additions & 15 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,17 @@
namespace kuzu {
namespace processor {

class ReadParquetSharedState : public ReadFileSharedState {
public:
explicit ReadParquetSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

private:
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;
};

class ReadParquet : public ReadFile {
public:
ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) override;
std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(
Expand Down
116 changes: 116 additions & 0 deletions src/include/storage/copier/read_file_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#pragma once

#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace storage {

// A unit of work handed out while reading input files: it identifies a range of rows by its
// starting row index, block index and row count, together with the file the rows come from.
// The destructor is virtual so that format-specific morsels can be owned through a
// std::unique_ptr<ReadFileMorsel>.
class ReadFileMorsel {
public:
    ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
        common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
        : rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
          rowIdxInFile{rowIdxInFile} {}

    virtual ~ReadFileMorsel() = default;

    // Row index of this morsel. It keeps accumulating across files when several files are read.
    common::row_idx_t rowIdx;
    // Index of the block this morsel refers to.
    common::block_idx_t blockIdx;
    // Number of rows covered by this morsel.
    common::row_idx_t numRows;
    // Path of the file this morsel was taken from.
    std::string filePath;
    // Row index relative to the current file. Same value as `rowIdx` when only a single file is
    // read.
    common::row_idx_t rowIdxInFile;
};

// For CSV file, we need to read in streaming mode, so we need to read one batch at a time.
// A CSV morsel therefore carries the record batch itself. The base morsel's blockIdx and numRows
// are filled with INVALID_BLOCK_IDX and INVALID_ROW_IDX respectively.
class ReadCSVMorsel : public ReadFileMorsel {
public:
    // startRowIdx is forwarded to ReadFileMorsel::rowIdx, so it is typed as row_idx_t to agree
    // with the base class constructor and with ReadNPYMorsel.
    // NOTE(review): assumes offset_t and row_idx_t are interchangeable integer aliases, so
    // existing callers are unaffected - confirm against common/types.
    ReadCSVMorsel(common::row_idx_t startRowIdx, std::string filePath,
        common::row_idx_t rowIdxInFile, std::shared_ptr<arrow::RecordBatch> recordBatch)
        : ReadFileMorsel{startRowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
              std::move(filePath), rowIdxInFile},
          recordBatch{std::move(recordBatch)} {}

    // The batch of rows carried by this morsel.
    std::shared_ptr<arrow::RecordBatch> recordBatch;
};

// Morsel for NPY input. On top of the generic morsel fields it records a column index, which is
// initialised from the index of the file being read (`curFileIdx`).
// NOTE(review): presumably each NPY file supplies exactly one column - confirm against the
// ReadNPYSharedState::getMorsel implementation.
class ReadNPYMorsel : public ReadFileMorsel {
public:
    ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
        std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
        : ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
          columnIdx{curFileIdx} {}

    // Returns the column index given at construction time.
    common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
    common::vector_idx_t columnIdx;
};

// Abstract base for the state shared while reading a list of input files. It stores the inputs
// (file paths, CSV reader config, table schema) and the cursor fields that record how far
// reading has progressed. Concrete subclasses implement row counting and morsel hand-out for one
// file format.
class ReadFileSharedState {
public:
    ReadFileSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
        catalog::TableSchema* tableSchema)
        : tableSchema{tableSchema}, filePaths{std::move(filePaths)},
          csvReaderConfig{csvReaderConfig} {}

    virtual ~ReadFileSharedState() = default;

    // Format-specific row counting.
    // NOTE(review): presumably fills `numRows` and `fileBlockInfos` - confirm in the
    // implementations.
    virtual void countNumRows() = 0;
    // Format-specific hand-out of the next unit of work.
    virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
    // NOTE(review): presumably guards the cursor fields below when morsels are requested
    // concurrently - confirm in the getMorsel implementations.
    std::mutex mtx;
    common::row_idx_t numRows{0};
    catalog::TableSchema* tableSchema;
    std::vector<std::string> filePaths;
    common::CSVReaderConfig csvReaderConfig;
    // Block information, keyed by a string (presumably the file path - TODO confirm).
    std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
    common::row_idx_t currRowIdx{0};
    common::block_idx_t currBlockIdx{0};
    common::vector_idx_t currFileIdx{0};
    // Starts at 1, unlike the other cursors which start at 0.
    common::row_idx_t currRowIdxInCurrFile{1};
};

// Shared state for CSV input. CSV files are read in streaming mode, which is why the streaming
// reader has to be kept inside the shared state.
class ReadCSVSharedState : public ReadFileSharedState {
public:
    ReadCSVSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
        catalog::TableSchema* tableSchema)
        : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}

    // CSV-specific implementations of the base class interface (defined out of line).
    void countNumRows() final;
    std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
    // Streaming CSV reader kept across calls.
    // NOTE(review): presumably the reader of the file currently being consumed - confirm in the
    // implementation.
    std::shared_ptr<arrow::csv::StreamingReader> reader;
};

// Shared state for Parquet input. It adds no data members of its own and only supplies the
// Parquet-specific implementations of the base class interface.
class ReadParquetSharedState : public ReadFileSharedState {
public:
    // The CSV reader config is simply forwarded to the base class, which stores it for every
    // file format.
    explicit ReadParquetSharedState(std::vector<std::string> filePaths,
        common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)
        : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}

private:
    // Both overrides are private in this class; they stay reachable through the public virtual
    // functions declared in ReadFileSharedState.
    std::unique_ptr<storage::ReadFileMorsel> getMorsel() override;
    void countNumRows() override;
};

// Shared state for NPY input. It is constructed from a node table schema, which is handed to the
// base class as a plain TableSchema pointer.
class ReadNPYSharedState : public ReadFileSharedState {
public:
    // The CSV reader config is forwarded to the base class unchanged.
    ReadNPYSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
        catalog::NodeTableSchema* tableSchema)
        : ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}

    // NPY-specific hand-out of the next unit of work (defined out of line).
    std::unique_ptr<storage::ReadFileMorsel> getMorsel() final;

private:
    // NPY-specific row counting (defined out of line). Private here, but still reachable through
    // the public virtual function declared in ReadFileSharedState.
    void countNumRows() final;
};

} // namespace storage
} // namespace kuzu
Loading
Loading