Skip to content

Commit

Permalink
Change npy reader to read multiple files at one time
Browse files Browse the repository at this point in the history
  • Loading branch information
ray6080 committed Jul 21, 2023
1 parent 60b21ca commit d2846de
Show file tree
Hide file tree
Showing 34 changed files with 132 additions and 277 deletions.
10 changes: 5 additions & 5 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyCSV = (Copy&)statement;
auto& copyStatement = (Copy&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
auto tableName = copyStatement.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
Expand Down
5 changes: 0 additions & 5 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,11 @@ namespace kuzu {
namespace catalog {

CatalogContent::CatalogContent() : nextTableID{0} {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
registerBuiltInFunctions();
}

CatalogContent::CatalogContent(const std::string& directory) {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
logger->info("Initializing catalog.");
readFromFile(directory, DBFileType::ORIGINAL);
logger->info("Initializing catalog done.");
registerBuiltInFunctions();
}

Expand Down Expand Up @@ -295,7 +291,6 @@ void CatalogContent::saveToFile(const std::string& directory, DBFileType dbFileT

void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType);
logger->debug("Reading from {}.", catalogPath);
auto fileInfo = FileUtils::openFile(catalogPath, O_RDONLY);
uint64_t offset = 0;
validateMagicBytes(fileInfo.get(), offset);
Expand Down
10 changes: 0 additions & 10 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@
#include "storage/wal/wal.h"
#include "transaction/transaction.h"

namespace spdlog {
class logger;
}

namespace kuzu {
namespace catalog {

class CatalogContent {
friend class Catalog;

public:
// This constructor is only used for mock catalog testing only.
CatalogContent();

explicit CatalogContent(const std::string& directory);

CatalogContent(const CatalogContent& other);

virtual ~CatalogContent() = default;

/**
* Node and Rel table functions.
*/
Expand Down Expand Up @@ -164,7 +157,6 @@ class CatalogContent {
void registerBuiltInFunctions();

private:
std::shared_ptr<spdlog::logger> logger;
std::unordered_map<common::table_id_t, std::unique_ptr<NodeTableSchema>> nodeTableSchemas;
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableSchema>> relTableSchemas;
// These two maps are maintained as caches. They are not serialized to the catalog file, but
Expand All @@ -184,8 +176,6 @@ class Catalog {

explicit Catalog(storage::WAL* wal);

virtual ~Catalog() = default;

// TODO(Guodong): Get rid of these two functions.
inline CatalogContent* getReadOnlyVersion() const { return catalogContentForReadOnlyTrx.get(); }
inline CatalogContent* getWriteVersion() const { return catalogContentForWriteTrx.get(); }
Expand Down
31 changes: 12 additions & 19 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@ class LogicalCopy : public LogicalOperator {

public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::string tableName, binder::expression_vector dataColumnExpressions,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
rowIdxExpression{std::move(rowIdxExpression)}, filePathExpression{std::move(
filePathExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move(
rowIdxExpression)},
filePathExpression{std::move(filePathExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::vector<std::shared_ptr<binder::Expression>> getArrowColumnExpressions() const {
return arrowColumnExpressions;
inline std::vector<std::shared_ptr<binder::Expression>> getDataColumnExpressions() const {
return dataColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
Expand All @@ -42,10 +40,6 @@ class LogicalCopy : public LogicalOperator {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
return columnIdxExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -54,19 +48,18 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
rowIdxExpression, filePathExpression, columnIdxExpression, outputExpression);
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, dataColumnExpressions,
rowIdxExpression, filePathExpression, outputExpression);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
binder::expression_vector dataColumnExpressions;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CopyNodeSharedState {
struct CopyNodeDataInfo {
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
std::vector<DataPos> dataColumnPoses;
};

class CopyNode : public Sink {
Expand All @@ -62,8 +62,8 @@ class CopyNode : public Sink {
inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

Expand Down Expand Up @@ -124,7 +124,7 @@ class CopyNode : public Sink {
storage::WAL* wal;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<common::ValueVector*> dataColumnVectors;
std::vector<std::unique_ptr<storage::PropertyCopyState>> copyStates;
};

Expand Down
44 changes: 0 additions & 44 deletions src/include/processor/operator/copy/copy_npy_node.h

This file was deleted.

6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ namespace processor {
class ReadCSV : public ReadFile {
public:
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
Expand All @@ -22,7 +22,7 @@ class ReadCSV : public ReadFile {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace processor {
class ReadFile : public PhysicalOperator {
public:
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
filePathVectorPos{filePathVectorPos}, dataColumnPoses{std::move(dataColumnPoses)},
sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
Expand All @@ -24,6 +24,7 @@ class ReadFile : public PhysicalOperator {

inline bool isSource() const override { return true; }

protected:
virtual std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) = 0;

Expand All @@ -33,10 +34,10 @@ class ReadFile : public PhysicalOperator {
std::shared_ptr<storage::ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
std::vector<DataPos> dataColumnPoses;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<common::ValueVector*> dataColumnVectors;
};

} // namespace processor
Expand Down
21 changes: 8 additions & 13 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,24 @@ namespace processor {
class ReadNPY : public ReadFile {
public:
ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, const DataPos& columnIdxPos,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString} {
reader = std::make_unique<storage::NpyMultiFileReader>(this->sharedState->filePaths);
}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<ReadNPY>(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses,
columnIdxPos, sharedState, id, paramsString);
return std::make_unique<ReadNPY>(
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}

private:
std::unique_ptr<storage::NpyReader> reader;
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
std::unique_ptr<storage::NpyMultiFileReader> reader;
};

} // namespace processor
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ namespace processor {
class ReadParquet : public ReadFile {
public:
ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}

private:
Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/copier/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,15 @@ class NpyReader {
static inline const std::string defaultFieldName = "NPY_FIELD";
};

class NpyMultiFileReader {
public:
explicit NpyMultiFileReader(const std::vector<std::string>& filePaths);

std::shared_ptr<arrow::RecordBatch> readBlock(common::block_idx_t blockIdx) const;

private:
std::vector<std::unique_ptr<NpyReader>> fileReaders;
};

} // namespace storage
} // namespace kuzu
13 changes: 0 additions & 13 deletions src/include/storage/copier/read_file_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,6 @@ class ReadCSVMorsel : public ReadFileMorsel {
std::shared_ptr<arrow::RecordBatch> recordBatch;
};

class ReadNPYMorsel : public ReadFileMorsel {
public:
ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
: ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
columnIdx{curFileIdx} {}

inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
common::vector_idx_t columnIdx;
};

class ReadFileSharedState {
public:
ReadFileSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
Expand Down
Loading

0 comments on commit d2846de

Please sign in to comment.