Skip to content

Commit

Permalink
Merge pull request #1834 from kuzudb/clean-copy
Browse files Browse the repository at this point in the history
Clean up duplicated morsel and sharedState between copy node and rel
  • Loading branch information
ray6080 committed Jul 19, 2023
2 parents 5931a4a + 09e5211 commit d85691c
Show file tree
Hide file tree
Showing 20 changed files with 392 additions and 491 deletions.
8 changes: 2 additions & 6 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ namespace common {
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}

std::string CopyDescription::getFileTypeName(FileType fileType) {
Expand Down
2 changes: 1 addition & 1 deletion src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class CopyNode : public Sink {
id, paramsString},
sharedState{std::move(sharedState)}, copyNodeDataInfo{std::move(copyNodeDataInfo)},
copyDesc{copyDesc}, table{table}, relsStore{relsStore}, catalog{catalog}, wal{wal},
rowIdxVector{nullptr} {
rowIdxVector{nullptr}, filePathVector{nullptr} {
auto tableSchema = catalog->getReadOnlyVersion()->getNodeTableSchema(table->getTableID());
copyStates.resize(tableSchema->getNumProperties());
for (auto i = 0u; i < tableSchema->getNumProperties(); i++) {
Expand Down
38 changes: 5 additions & 33 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,46 +5,18 @@
namespace kuzu {
namespace processor {

// For CSV file, we need to read in streaming mode, so we need to read one batch at a time.
class ReadCSVMorsel : public ReadFileMorsel {
public:
ReadCSVMorsel(common::row_idx_t rowIdx, std::string filePath, common::row_idx_t rowIdxInFile,
std::shared_ptr<arrow::RecordBatch> recordBatch)
: ReadFileMorsel{rowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
std::move(filePath), rowIdxInFile},
recordBatch{std::move(recordBatch)} {}

std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(common::CSVReaderConfig csvReaderConfig, std::vector<std::string> filePaths,
catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema}, csvReaderConfig{csvReaderConfig} {
}

private:
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;

private:
common::CSVReaderConfig csvReaderConfig;
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class ReadCSV : public ReadFile {
public:
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) override {
auto csvMorsel = reinterpret_cast<ReadCSVMorsel*>(morsel.get());
std::unique_ptr<storage::ReadFileMorsel> morsel) override {
auto csvMorsel = reinterpret_cast<storage::ReadCSVMorsel*>(morsel.get());
return csvMorsel->recordBatch;
}

Expand Down
55 changes: 5 additions & 50 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
@@ -1,61 +1,16 @@
#pragma once

#include "processor/operator/physical_operator.h"
#include "storage/copier/table_copy_utils.h"
#include "storage/copier/read_file_state.h"

namespace kuzu {
namespace processor {

class ReadFileMorsel {
public:
ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
: rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
rowIdxInFile{rowIdxInFile} {};

virtual ~ReadFileMorsel() = default;

public:
// When reading from multiple files, rowIdx is accumulated.
common::row_idx_t rowIdx;
common::block_idx_t blockIdx;
common::row_idx_t numRows;
std::string filePath;
// Row idx in the current file. Equal to `rowIdx` when reading from only a single file.
common::row_idx_t rowIdxInFile;
};

class ReadFileSharedState {
public:
explicit ReadFileSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: currRowIdx{0}, curBlockIdx{0}, filePaths{std::move(filePaths)}, curFileIdx{0},
tableSchema{tableSchema}, numRows{0}, currRowIdxInCurrFile{1} {}

virtual ~ReadFileSharedState() = default;

virtual void countNumRows() = 0;

virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
common::row_idx_t numRows;
catalog::TableSchema* tableSchema;

protected:
std::mutex mtx;
common::row_idx_t currRowIdx;
std::unordered_map<std::string, storage::FileBlockInfo> fileBlockInfos;
common::block_idx_t curBlockIdx;
std::vector<std::string> filePaths;
common::vector_idx_t curFileIdx;
common::row_idx_t currRowIdxInCurrFile;
};

class ReadFile : public PhysicalOperator {
public:
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
Expand All @@ -70,12 +25,12 @@ class ReadFile : public PhysicalOperator {
inline bool isSource() const override { return true; }

virtual std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<ReadFileMorsel> morsel) = 0;
std::unique_ptr<storage::ReadFileMorsel> morsel) = 0;

bool getNextTuplesInternal(ExecutionContext* context) override;

protected:
std::shared_ptr<ReadFileSharedState> sharedState;
std::shared_ptr<storage::ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
Expand Down
29 changes: 3 additions & 26 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,18 @@
namespace kuzu {
namespace processor {

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
: ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
columnIdx{curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadNPYSharedState : public ReadFileSharedState {
public:
ReadNPYSharedState(catalog::NodeTableSchema* tableSchema, std::vector<std::string> filePaths)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
void countNumRows() final;
};

class ReadNPY : public ReadFile {
public:
ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, const DataPos& columnIdxPos,
std::shared_ptr<ReadFileSharedState> sharedState, uint32_t id,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) final;
std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

Expand Down
20 changes: 5 additions & 15 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,17 @@
namespace kuzu {
namespace processor {

class ReadParquetSharedState : public ReadFileSharedState {
public:
explicit ReadParquetSharedState(
std::vector<std::string> filePaths, catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), tableSchema} {}

private:
void countNumRows() override;

std::unique_ptr<ReadFileMorsel> getMorsel() override;
};

class ReadParquet : public ReadFile {
public:
ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, std::shared_ptr<ReadFileSharedState> sharedState,
uint32_t id, const std::string& paramsString)
std::vector<DataPos> arrowColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(std::unique_ptr<ReadFileMorsel> morsel) override;
std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(
Expand Down
116 changes: 116 additions & 0 deletions src/include/storage/copier/read_file_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#pragma once

#include "storage/copier/table_copy_utils.h"

namespace kuzu {
namespace storage {

class ReadFileMorsel {
public:
ReadFileMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx,
common::row_idx_t numRows, std::string filePath, common::row_idx_t rowIdxInFile)
: rowIdx{rowIdx}, blockIdx{blockIdx}, numRows{numRows}, filePath{std::move(filePath)},
rowIdxInFile{rowIdxInFile} {};
virtual ~ReadFileMorsel() = default;

public:
// When reading from multiple files, rowIdx is accumulated.
common::row_idx_t rowIdx;
common::block_idx_t blockIdx;
common::row_idx_t numRows;
std::string filePath;
// Row idx in the current file. Equal to `rowIdx` when reading from only a single file.
common::row_idx_t rowIdxInFile;
};

// For CSV file, we need to read in streaming mode, so we need to read one batch at a time.
class ReadCSVMorsel : public ReadFileMorsel {
public:
ReadCSVMorsel(common::offset_t startRowIdx, std::string filePath,
common::row_idx_t rowIdxInFile, std::shared_ptr<arrow::RecordBatch> recordBatch)
: ReadFileMorsel{startRowIdx, common::INVALID_BLOCK_IDX, common::INVALID_ROW_IDX,
std::move(filePath), rowIdxInFile},
recordBatch{std::move(recordBatch)} {}

std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
: ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
columnIdx{curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadFileSharedState {
public:
ReadFileSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema)
: numRows{0}, tableSchema{tableSchema}, filePaths{std::move(filePaths)},
csvReaderConfig{csvReaderConfig}, currRowIdx{0}, currBlockIdx{0}, currFileIdx{0},
currRowIdxInCurrFile{1} {};
virtual ~ReadFileSharedState() = default;

virtual void countNumRows() = 0;
virtual std::unique_ptr<ReadFileMorsel> getMorsel() = 0;

public:
std::mutex mtx;
common::row_idx_t numRows;
catalog::TableSchema* tableSchema;
std::vector<std::string> filePaths;
common::CSVReaderConfig csvReaderConfig;
std::unordered_map<std::string, FileBlockInfo> fileBlockInfos;
common::row_idx_t currRowIdx;
common::block_idx_t currBlockIdx;
common::vector_idx_t currFileIdx;
common::row_idx_t currRowIdxInCurrFile;
};

// For CSV file, we need to read in streaming mode, so we need to keep the reader in the shared
// state.
class ReadCSVSharedState : public ReadFileSharedState {
public:
ReadCSVSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {};

void countNumRows() final;
std::unique_ptr<ReadFileMorsel> getMorsel() final;

private:
std::shared_ptr<arrow::csv::StreamingReader> reader;
};

class ReadParquetSharedState : public ReadFileSharedState {
public:
explicit ReadParquetSharedState(std::vector<std::string> filePaths,
common::CSVReaderConfig csvReaderConfig, catalog::TableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}

private:
void countNumRows() override;

std::unique_ptr<storage::ReadFileMorsel> getMorsel() override;
};

class ReadNPYSharedState : public ReadFileSharedState {
public:
ReadNPYSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
catalog::NodeTableSchema* tableSchema)
: ReadFileSharedState{std::move(filePaths), csvReaderConfig, tableSchema} {}

std::unique_ptr<storage::ReadFileMorsel> getMorsel() final;

private:
void countNumRows() final;
};

} // namespace storage
} // namespace kuzu
Loading

0 comments on commit d85691c

Please sign in to comment.