Skip to content

Commit

Permalink
Copy node tables from .npy files
Browse files Browse the repository at this point in the history
  • Loading branch information
mewim committed Mar 21, 2023
1 parent b5805d6 commit 46ebc4b
Show file tree
Hide file tree
Showing 56 changed files with 4,048 additions and 2,453 deletions.
1 change: 1 addition & 0 deletions dataset/npy-1d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npy(i64,i32,i16,f64,f32) from "dataset/npy-1d/one_dim_int64.npy", "dataset/npy-1d/one_dim_int32.npy", "dataset/npy-1d/one_dim_int16.npy", "dataset/npy-1d/one_dim_double.npy", "dataset/npy-1d/one_dim_float.npy";
Binary file added dataset/npy-1d/fortran_order.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-1d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npy (i64 INT64,i32 INT32,i16 INT16,f64 DOUBLE,f32 FLOAT, PRIMARY KEY(i64));
1 change: 1 addition & 0 deletions dataset/npy-20k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npy(id,f32) from "dataset/npy-20k/id_int64.npy", "dataset/npy-20k/two_dim_float.npy";
Binary file added dataset/npy-20k/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-20k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npy (id INT64,f32 FLOAT[10],PRIMARY KEY(id));
Binary file added dataset/npy-20k/two_dim_float.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npy(id,i64,i32,i16,f64,f32) from "dataset/npy-2d/id_int64.npy", "dataset/npy-2d/two_dim_int64.npy", "dataset/npy-2d/two_dim_int32.npy", "dataset/npy-2d/two_dim_int16.npy", "dataset/npy-2d/two_dim_double.npy", "dataset/npy-2d/two_dim_float.npy";
Binary file added dataset/npy-2d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npy (id INT64, i64 INT64[3],i32 INT32[3],i16 INT16[3],f64 DOUBLE[3],f32 FLOAT[3],PRIMARY KEY(id));
Binary file added dataset/npy-2d/three_dim_int64.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npy(id,i64) from "dataset/npy-3d/id_int64.npy", "dataset/npy-3d/three_dim_int64.npy";
Binary file added dataset/npy-3d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npy (id INT64,i64 INT64[12],PRIMARY KEY(id));
Binary file added dataset/npy-3d/three_dim_int64.npy
Binary file not shown.
5 changes: 4 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ oC_Cypher
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;

kU_CopyCSV
: COPY SP oC_SchemaName SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: COPY SP oC_SchemaName kU_CopyCSV_Properties? SP FROM SP kU_FilePaths (SP? ',' SP? kU_FilePaths)* ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_CopyCSV_Properties
: ( SP? '(' SP? oC_PropertyKeyName ( SP? ',' SP? oC_PropertyKeyName )* SP? ')' SP? );

kU_FilePaths
: '[' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ']'
Expand Down
83 changes: 67 additions & 16 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,59 @@ std::unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto filePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(filePaths);
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(boundFilePaths);
std::vector<std::string> filePaths;
std::unordered_map<std::string, std::string> propertyToNpyMap;
if (fileType == CopyDescription::FileType::NPY) {
propertyToNpyMap = bindPropertyToNpyMap(boundFilePaths, copyCSV.getPropertyNames());
} else {
filePaths = boundFilePaths[0];
}
return make_unique<BoundCopy>(
CopyDescription(filePaths, csvReaderConfig, fileType), tableID, tableName);
CopyDescription(filePaths, propertyToNpyMap, csvReaderConfig, fileType), tableID,
tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
std::vector<std::string> boundFilePaths;
for (auto& filePath : filePaths) {
auto globbedFilePaths = FileUtils::globFilePath(filePath);
boundFilePaths.insert(
boundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
}
if (boundFilePaths.empty()) {
throw BinderException{StringUtils::string_format("Invalid file path: {}.", filePaths[0])};
std::vector<std::vector<std::string>> Binder::bindFilePaths(
const std::vector<std::vector<std::string>>& filePaths) {
std::vector<std::vector<std::string>> boundFilePaths;
for (auto& currPaths : filePaths) {
std::vector<std::string> currBoundFilePaths;
for (auto& filePath : currPaths) {
auto globbedFilePaths = FileUtils::globFilePath(filePath);
currBoundFilePaths.insert(
currBoundFilePaths.end(), globbedFilePaths.begin(), globbedFilePaths.end());
if (currBoundFilePaths.empty()) {
throw BinderException{
StringUtils::string_format("Invalid file path: {}.", currPaths[0])};
}
}
boundFilePaths.emplace_back(currBoundFilePaths);
}
return boundFilePaths;
}

std::unordered_map<std::string, std::string> Binder::bindPropertyToNpyMap(
const std::vector<std::vector<std::string>>& filePaths,
const std::vector<std::string>& propertyNames) {
if (filePaths.size() != propertyNames.size()) {
throw BinderException("The number of file paths and property names must be the same for "
"loading from npy files.");
}
std::unordered_map<std::string, std::string> propertyToNpyMap;
for (int i = 0; i < filePaths.size(); i++) {
auto& currFilePaths = filePaths[i];
auto& currPropertyName = propertyNames[i];
if (currFilePaths.size() != 1) {
throw BinderException(
"Only one file path is allowed for each property when loading from npy files.");
}
propertyToNpyMap[currPropertyName] = currFilePaths[0];
}
return propertyToNpyMap;
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -91,19 +125,36 @@ char Binder::bindParsingOptionValue(std::string value) {
return value[value.length() - 1];
}

CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePaths) {
CopyDescription::FileType Binder::bindFileType(std::vector<std::vector<std::string>> filePaths) {
// We currently only support loading from files with the same type. Loading files with different
// types is not supported.
auto fileName = filePaths[0];
auto fileName = filePaths[0][0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
auto npySuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::NPY);
CopyDescription::FileType fileType;
std::string expectedSuffix;
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
fileType = CopyDescription::FileType::CSV;
expectedSuffix = csvSuffix;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
fileType = CopyDescription::FileType::PARQUET;
expectedSuffix = parquetSuffix;
} else if (fileName.ends_with(npySuffix)) {
fileType = CopyDescription::FileType::NPY;
expectedSuffix = npySuffix;
} else {
throw CopyException("Unsupported file type: " + fileName);
}
for (auto& currPaths : filePaths) {
for (auto& path : currPaths) {
if (!path.ends_with(expectedSuffix)) {
throw CopyException(
"Loading files with different types is not currently supported.");
}
}
}
return fileType;
}

} // namespace binder
Expand Down
13 changes: 9 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<std::string, std::string> propertyToNpyMap, CSVReaderConfig csvReaderConfig,
FileType fileType)
: filePaths{filePaths}, propertyToNpyMap{propertyToNpyMap},
csvReaderConfig{nullptr}, fileType{fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
: filePaths{copyDescription.filePaths}, propertyToNpyMap{copyDescription.propertyToNpyMap},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
Expand All @@ -31,6 +33,9 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
case FileType::PARQUET: {
return "parquet";
}
case FileType::NPY: {
return "npy";
}
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,20 @@ class Binder {
/*** bind copy csv ***/
std::unique_ptr<BoundStatement> bindCopy(const parser::Statement& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);
std::vector<std::vector<std::string>> bindFilePaths(
const std::vector<std::vector<std::string>>& filePaths);

std::unordered_map<std::string, std::string> bindPropertyToNpyMap(
const std::vector<std::vector<std::string>>& filePaths,
const std::vector<std::string>& propertyNames);

common::CSVReaderConfig bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
void bindStringParsingOptions(common::CSVReaderConfig& csvReaderConfig,
const std::string& optionName, std::string& optionValue);
char bindParsingOptionValue(std::string value);
common::CopyDescription::FileType bindFileType(std::vector<std::string> filePaths);
common::CopyDescription::FileType bindFileType(std::vector<std::vector<std::string>> filePaths);

/*** bind query ***/
std::unique_ptr<BoundRegularQuery> bindQuery(const parser::RegularQuery& regularQuery);
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ struct CopyConstants {
// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
Expand Down
8 changes: 5 additions & 3 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType : uint8_t { CSV = 0, PARQUET = 1 };
enum class FileType : uint8_t { CSV = 0, PARQUET = 1, NPY = 2 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<std::string, std::string> propertyToNpyMap,
CSVReaderConfig csvReaderConfig, FileType fileType);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -46,6 +47,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
std::unordered_map<std::string, std::string> propertyToNpyMap;
FileType fileType;
};

Expand Down
12 changes: 8 additions & 4 deletions src/include/parser/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@ namespace parser {

class CopyCSV : public Statement {
public:
explicit CopyCSV(std::vector<std::string> filePaths, std::string tableName,
explicit CopyCSV(std::vector<std::vector<std::string>> filePaths,
std::vector<std::string> propertyNames, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions)
: Statement{common::StatementType::COPY_CSV}, filePaths{std::move(filePaths)},
tableName{std::move(tableName)}, parsingOptions{std::move(parsingOptions)} {}
propertyNames{std::move(propertyNames)}, tableName{std::move(tableName)},
parsingOptions{std::move(parsingOptions)} {}

inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline std::vector<std::vector<std::string>> getFilePaths() const { return filePaths; }
inline std::vector<std::string> getPropertyNames() const { return propertyNames; }
inline std::string getTableName() const { return tableName; }
inline std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> const*
getParsingOptions() const {
return &parsingOptions;
}

private:
std::vector<std::string> filePaths;
std::vector<std::vector<std::string>> filePaths;
std::vector<std::string> propertyNames;
std::string tableName;
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
};
Expand Down
6 changes: 5 additions & 1 deletion src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,11 @@ class Transformer {

std::unique_ptr<Statement> transformCopyCSV(CypherParser::KU_CopyCSVContext& ctx);

std::vector<std::string> transformFilePaths(CypherParser::KU_FilePathsContext& ctx);
std::vector<std::vector<std::string>> transformFilePaths(
std::vector<CypherParser::KU_FilePathsContext*> ctx);

std::vector<std::string> transformCopyCSVPropertyNames(
CypherParser::KU_CopyCSV_PropertiesContext& ctx);

std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);
Expand Down
75 changes: 75 additions & 0 deletions src/include/storage/copier/copy_node_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/logging_level_utils.h"
#include "common/task_system/task_scheduler.h"
#include "npy_reader.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

using namespace kuzu::catalog;
using namespace kuzu::common;

namespace kuzu {
namespace storage {

class CopyNodeNpy {

public:
CopyNodeNpy(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, catalog::Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: logger{LoggerUtils::getLogger(LoggerConstants::LoggerEnum::LOADER)},
copyDescription{copyDescription}, outputDirectory{std::move(outputDirectory)},
taskScheduler{taskScheduler}, catalog{catalog}, tableID{tableID},
nodesStatisticsAndDeletedIDs{nodesStatisticsAndDeletedIDs},
tableSchema{catalog.getReadOnlyVersion()->getTableSchema(tableID)} {}

uint64_t copy();

~CopyNodeNpy() = default;

private:
inline void updateTableStatistics() {
nodesStatisticsAndDeletedIDs->setNumTuplesForTable(tableSchema->tableID, numRows);
}

void initializeColumnsAndLists();

void initializeNpyReaders();

void validateNpyReaders();

void populateColumns();

void populateColumnsFromNpy(std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

void assignCopyNpyTasks(
const std::string& property, std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

static void batchPopulateColumnsTask(std::string primaryKeyPropertyIdx, uint64_t blockIdx,
offset_t startOffset, uint64_t numLinesInCurBlock, HashIndexBuilder<int64_t>* pkIndex,
CopyNodeNpy* copier, std::string property);

static void populatePKIndex(InMemColumn* column, HashIndexBuilder<int64_t>* pkIndex,
uint64_t startOffset, uint64_t numValues);

void saveToFile();

private:
std::shared_ptr<spdlog::logger> logger;
CopyDescription copyDescription;
std::string outputDirectory;
TaskScheduler& taskScheduler;
catalog::Catalog& catalog;
table_id_t tableID;
std::unordered_map<std::string, std::unique_ptr<InMemColumn>> columnMap;
std::unordered_map<std::string, std::unique_ptr<NpyReader>> npyReaderMap;
catalog::TableSchema* tableSchema;
uint64_t numRows;
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs;
};

} // namespace storage
} // namespace kuzu
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit 46ebc4b

Please sign in to comment.