Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change npy reader to read multiple files at one time #1842

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyCSV = (Copy&)statement;
auto& copyStatement = (Copy&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
auto tableName = copyStatement.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
auto expectedFileType = copyStatement.getFileType();
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
Expand Down
5 changes: 0 additions & 5 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,11 @@ namespace kuzu {
namespace catalog {

// Default constructor: initializes nextTableID to 0, obtains the CATALOG logger and
// registers the built-in functions. Nothing is read from disk here.
CatalogContent::CatalogContent() : nextTableID{0} {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
registerBuiltInFunctions();
}

// Constructs the catalog content from the catalog file found under `directory`.
// Obtains the CATALOG logger, loads the DBFileType::ORIGINAL catalog file through
// readFromFile() (logging before and after the load), then registers the built-in functions.
// NOTE(review): nextTableID is not initialized in this constructor; presumably
// readFromFile() sets it — confirm.
CatalogContent::CatalogContent(const std::string& directory) {
logger = LoggerUtils::getLogger(LoggerConstants::LoggerEnum::CATALOG);
logger->info("Initializing catalog.");
readFromFile(directory, DBFileType::ORIGINAL);
logger->info("Initializing catalog done.");
registerBuiltInFunctions();
}

Expand Down Expand Up @@ -295,7 +291,6 @@ void CatalogContent::saveToFile(const std::string& directory, DBFileType dbFileT

void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFileType) {
auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType);
logger->debug("Reading from {}.", catalogPath);
auto fileInfo = FileUtils::openFile(catalogPath, O_RDONLY);
uint64_t offset = 0;
validateMagicBytes(fileInfo.get(), offset);
Expand Down
10 changes: 0 additions & 10 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,19 @@
#include "storage/wal/wal.h"
#include "transaction/transaction.h"

namespace spdlog {
class logger;
}

namespace kuzu {
namespace catalog {

class CatalogContent {
friend class Catalog;

public:
// This constructor is used for mock catalog testing only.
CatalogContent();

explicit CatalogContent(const std::string& directory);

CatalogContent(const CatalogContent& other);

virtual ~CatalogContent() = default;

/**
* Node and Rel table functions.
*/
Expand Down Expand Up @@ -164,7 +157,6 @@ class CatalogContent {
void registerBuiltInFunctions();

private:
std::shared_ptr<spdlog::logger> logger;
std::unordered_map<common::table_id_t, std::unique_ptr<NodeTableSchema>> nodeTableSchemas;
std::unordered_map<common::table_id_t, std::unique_ptr<RelTableSchema>> relTableSchemas;
// These two maps are maintained as caches. They are not serialized to the catalog file, but
Expand All @@ -184,8 +176,6 @@ class Catalog {

explicit Catalog(storage::WAL* wal);

virtual ~Catalog() = default;

// TODO(Guodong): Get rid of these two functions.
inline CatalogContent* getReadOnlyVersion() const { return catalogContentForReadOnlyTrx.get(); }
inline CatalogContent* getWriteVersion() const { return catalogContentForWriteTrx.get(); }
Expand Down
31 changes: 12 additions & 19 deletions src/include/planner/logical_plan/logical_operator/logical_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,25 @@ class LogicalCopy : public LogicalOperator {

public:
LogicalCopy(const common::CopyDescription& copyDescription, common::table_id_t tableID,
std::string tableName, binder::expression_vector arrowColumnExpressions,
std::string tableName, binder::expression_vector dataColumnExpressions,
std::shared_ptr<binder::Expression> rowIdxExpression,
std::shared_ptr<binder::Expression> filePathExpression,
std::shared_ptr<binder::Expression> columnIdxExpression,
std::shared_ptr<binder::Expression> outputExpression)
: LogicalOperator{LogicalOperatorType::COPY}, copyDescription{copyDescription},
tableID{tableID}, tableName{std::move(tableName)}, arrowColumnExpressions{std::move(
arrowColumnExpressions)},
rowIdxExpression{std::move(rowIdxExpression)}, filePathExpression{std::move(
filePathExpression)},
columnIdxExpression{std::move(columnIdxExpression)}, outputExpression{
std::move(outputExpression)} {}
: LogicalOperator{LogicalOperatorType::COPY},
copyDescription{copyDescription}, tableID{tableID}, tableName{std::move(tableName)},
dataColumnExpressions{std::move(dataColumnExpressions)}, rowIdxExpression{std::move(
rowIdxExpression)},
filePathExpression{std::move(filePathExpression)}, outputExpression{
std::move(outputExpression)} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::vector<std::shared_ptr<binder::Expression>> getArrowColumnExpressions() const {
return arrowColumnExpressions;
inline std::vector<std::shared_ptr<binder::Expression>> getDataColumnExpressions() const {
return dataColumnExpressions;
}

inline std::shared_ptr<binder::Expression> getRowIdxExpression() const {
Expand All @@ -42,10 +40,6 @@ class LogicalCopy : public LogicalOperator {
return filePathExpression;
}

inline std::shared_ptr<binder::Expression> getColumnIdxExpression() const {
return columnIdxExpression;
}

inline std::shared_ptr<binder::Expression> getOutputExpression() const {
return outputExpression;
}
Expand All @@ -54,19 +48,18 @@ class LogicalCopy : public LogicalOperator {
void computeFlatSchema() override;

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, arrowColumnExpressions,
rowIdxExpression, filePathExpression, columnIdxExpression, outputExpression);
return make_unique<LogicalCopy>(copyDescription, tableID, tableName, dataColumnExpressions,
rowIdxExpression, filePathExpression, outputExpression);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
// Used for printing only.
std::string tableName;
binder::expression_vector arrowColumnExpressions;
binder::expression_vector dataColumnExpressions;
std::shared_ptr<binder::Expression> rowIdxExpression;
std::shared_ptr<binder::Expression> filePathExpression;
std::shared_ptr<binder::Expression> columnIdxExpression;
std::shared_ptr<binder::Expression> outputExpression;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/processor/operator/copy/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class CopyNodeSharedState {
struct CopyNodeDataInfo {
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
std::vector<DataPos> dataColumnPoses;
};

class CopyNode : public Sink {
Expand All @@ -62,8 +62,8 @@ class CopyNode : public Sink {
inline void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override {
rowIdxVector = resultSet->getValueVector(copyNodeDataInfo.rowIdxVectorPos).get();
filePathVector = resultSet->getValueVector(copyNodeDataInfo.filePathVectorPos).get();
for (auto& arrowColumnPos : copyNodeDataInfo.arrowColumnPoses) {
arrowColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
for (auto& arrowColumnPos : copyNodeDataInfo.dataColumnPoses) {
dataColumnVectors.push_back(resultSet->getValueVector(arrowColumnPos).get());
}
}

Expand Down Expand Up @@ -124,7 +124,7 @@ class CopyNode : public Sink {
storage::WAL* wal;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<common::ValueVector*> dataColumnVectors;
std::vector<std::unique_ptr<storage::PropertyCopyState>> copyStates;
};

Expand Down
44 changes: 0 additions & 44 deletions src/include/processor/operator/copy/copy_npy_node.h

This file was deleted.

6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/read_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ namespace processor {
class ReadCSV : public ReadFile {
public:
ReadCSV(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_CSV, id, paramsString} {}

inline std::shared_ptr<arrow::RecordBatch> readTuples(
Expand All @@ -22,7 +22,7 @@ class ReadCSV : public ReadFile {

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadCSV>(
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}
};

Expand Down
9 changes: 5 additions & 4 deletions src/include/processor/operator/copy/read_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ namespace processor {
class ReadFile : public PhysicalOperator {
public:
ReadFile(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState,
PhysicalOperatorType operatorType, uint32_t id, const std::string& paramsString)
: PhysicalOperator{operatorType, id, paramsString}, rowIdxVectorPos{rowIdxVectorPos},
filePathVectorPos{filePathVectorPos}, arrowColumnPoses{std::move(arrowColumnPoses)},
filePathVectorPos{filePathVectorPos}, dataColumnPoses{std::move(dataColumnPoses)},
sharedState{std::move(sharedState)}, rowIdxVector{nullptr}, filePathVector{nullptr} {}

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
Expand All @@ -24,6 +24,7 @@ class ReadFile : public PhysicalOperator {

inline bool isSource() const override { return true; }

protected:
virtual std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) = 0;

Expand All @@ -33,10 +34,10 @@ class ReadFile : public PhysicalOperator {
std::shared_ptr<storage::ReadFileSharedState> sharedState;
DataPos rowIdxVectorPos;
DataPos filePathVectorPos;
std::vector<DataPos> arrowColumnPoses;
std::vector<DataPos> dataColumnPoses;
common::ValueVector* rowIdxVector;
common::ValueVector* filePathVector;
std::vector<common::ValueVector*> arrowColumnVectors;
std::vector<common::ValueVector*> dataColumnVectors;
};

} // namespace processor
Expand Down
21 changes: 8 additions & 13 deletions src/include/processor/operator/copy/read_npy.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,24 @@ namespace processor {
class ReadNPY : public ReadFile {
public:
ReadNPY(const DataPos& rowIdxVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses, const DataPos& columnIdxPos,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString},
columnIdxPos{columnIdxPos}, columnIdxVector{nullptr} {}
: ReadFile{rowIdxVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_NPY, id, paramsString} {
reader = std::make_unique<storage::NpyMultiFileReader>(this->sharedState->filePaths);
}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) final;

bool getNextTuplesInternal(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final;

inline std::unique_ptr<PhysicalOperator> clone() final {
return std::make_unique<ReadNPY>(rowIdxVectorPos, filePathVectorPos, arrowColumnPoses,
columnIdxPos, sharedState, id, paramsString);
return std::make_unique<ReadNPY>(
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}

private:
std::unique_ptr<storage::NpyReader> reader;
DataPos columnIdxPos;
common::ValueVector* columnIdxVector;
std::unique_ptr<storage::NpyMultiFileReader> reader;
};

} // namespace processor
Expand Down
6 changes: 3 additions & 3 deletions src/include/processor/operator/copy/read_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ namespace processor {
class ReadParquet : public ReadFile {
public:
ReadParquet(const DataPos& offsetVectorPos, const DataPos& filePathVectorPos,
std::vector<DataPos> arrowColumnPoses,
std::vector<DataPos> dataColumnPoses,
std::shared_ptr<storage::ReadFileSharedState> sharedState, uint32_t id,
const std::string& paramsString)
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(arrowColumnPoses),
: ReadFile{offsetVectorPos, filePathVectorPos, std::move(dataColumnPoses),
std::move(sharedState), PhysicalOperatorType::READ_PARQUET, id, paramsString} {}

std::shared_ptr<arrow::RecordBatch> readTuples(
std::unique_ptr<storage::ReadFileMorsel> morsel) override;

inline std::unique_ptr<PhysicalOperator> clone() override {
return std::make_unique<ReadParquet>(
rowIdxVectorPos, filePathVectorPos, arrowColumnPoses, sharedState, id, paramsString);
rowIdxVectorPos, filePathVectorPos, dataColumnPoses, sharedState, id, paramsString);
}

private:
Expand Down
10 changes: 10 additions & 0 deletions src/include/storage/copier/npy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,15 @@ class NpyReader {
static inline const std::string defaultFieldName = "NPY_FIELD";
};

// Reader that works over several npy files at once, built on top of the single-file
// NpyReader declared above.
class NpyMultiFileReader {
public:
// Builds the reader from the given list of npy file paths.
// NOTE(review): presumably creates one NpyReader per path and stores it in `fileReaders`;
// the definition is not visible here — confirm.
explicit NpyMultiFileReader(const std::vector<std::string>& filePaths);

// Returns the contents of block `blockIdx` as an arrow record batch. Does not modify the
// reader (const member).
// NOTE(review): presumably gathers the same block from every underlying file into a single
// batch (one column per file) — confirm against the implementation.
std::shared_ptr<arrow::RecordBatch> readBlock(common::block_idx_t blockIdx) const;

private:
// Single-file readers owned by this object.
std::vector<std::unique_ptr<NpyReader>> fileReaders;
};

} // namespace storage
} // namespace kuzu
13 changes: 0 additions & 13 deletions src/include/storage/copier/read_file_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,6 @@ class ReadCSVMorsel : public ReadFileMorsel {
std::shared_ptr<arrow::RecordBatch> recordBatch;
};

// ReadFileMorsel specialization for npy reads that additionally carries a column index.
class ReadNPYMorsel : public ReadFileMorsel {
public:
// Forwards rowIdx, blockIdx, numRows, filePath and rowIdxInFile to ReadFileMorsel and
// stores `curFileIdx` as this morsel's column index.
// NOTE(review): the file index doubling as the column index suggests one npy file per
// column — confirm with the code that creates these morsels.
ReadNPYMorsel(common::row_idx_t rowIdx, common::block_idx_t blockIdx, common::row_idx_t numRows,
std::string filePath, common::vector_idx_t curFileIdx, common::row_idx_t rowIdxInFile)
: ReadFileMorsel{rowIdx, blockIdx, numRows, std::move(filePath), rowIdxInFile},
columnIdx{curFileIdx} {}

// Returns the column index recorded at construction.
inline common::vector_idx_t getColumnIdx() const { return columnIdx; }

private:
// Column index of this morsel; set from the constructor's `curFileIdx` argument.
common::vector_idx_t columnIdx;
};

class ReadFileSharedState {
public:
ReadFileSharedState(std::vector<std::string> filePaths, common::CSVReaderConfig csvReaderConfig,
Expand Down
Loading
Loading