Skip to content

Commit

Permalink
Copy node tables from npy files
Browse files Browse the repository at this point in the history
  • Loading branch information
mewim committed Mar 28, 2023
1 parent 3f574a4 commit d740b29
Show file tree
Hide file tree
Showing 64 changed files with 4,622 additions and 3,120 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ include_directories(third_party/antlr4_cypher/include)
include_directories(third_party/antlr4_runtime/src)
include_directories(third_party/spdlog)
include_directories(third_party/nlohmann_json)
include_directories(third_party/pyparse)
include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
Expand Down
1 change: 1 addition & 0 deletions dataset/npy-1d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromnpy ("dataset/npy-1d/one_dim_int64.npy", "dataset/npy-1d/one_dim_int32.npy", "dataset/npy-1d/one_dim_int16.npy", "dataset/npy-1d/one_dim_double.npy", "dataset/npy-1d/one_dim_float.npy");
Binary file added dataset/npy-1d/fortran_order.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-1d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (i64 INT64,i32 INT32,i16 INT16,f64 DOUBLE,f32 FLOAT, PRIMARY KEY(i64));
1 change: 1 addition & 0 deletions dataset/npy-20k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromNPY ("dataset/npy-20k/id_int64.npy", "dataset/npy-20k/two_dim_float.npy");
Binary file added dataset/npy-20k/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-20k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64,f32 FLOAT[10],PRIMARY KEY(id));
Binary file added dataset/npy-20k/two_dim_float.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromNPY ("dataset/npy-2d/id_int64.npy", "dataset/npy-2d/two_dim_int64.npy", "dataset/npy-2d/two_dim_int32.npy", "dataset/npy-2d/two_dim_int16.npy", "dataset/npy-2d/two_dim_double.npy", "dataset/npy-2d/two_dim_float.npy");
Binary file added dataset/npy-2d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64, i64 INT64[3],i32 INT32[3],i16 INT16[3],f64 DOUBLE[3],f32 FLOAT[3],PRIMARY KEY(id));
Binary file added dataset/npy-2d/three_dim_int64.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable FROMnpy ("dataset/npy-3d/id_int64.npy", "dataset/npy-3d/three_dim_int64.npy");
Binary file added dataset/npy-3d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64,i64 INT64[12],PRIMARY KEY(id));
Binary file added dataset/npy-3d/three_dim_int64.npy
Binary file not shown.
7 changes: 6 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ grammar Cypher;
}

oC_Cypher
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyNPY | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;

kU_CopyCSV
: COPY SP oC_SchemaName SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_CopyNPY
: COPY SP oC_SchemaName SP FROMNPY ( SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' ) ;

kU_FilePaths
: '[' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ']'
| StringLiteral
Expand All @@ -36,6 +39,8 @@ COPY : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'P' | 'p') ( 'Y' | 'y' ) ;

FROM : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' );

FROMNPY : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' ) ( 'N' | 'n' ) ( 'P' | 'p' ) ( 'Y' | 'y' ) ;

kU_DDL
: kU_CreateNode
| kU_CreateRel
Expand Down
56 changes: 51 additions & 5 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,24 @@ std::unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto filePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(filePaths);
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap;
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROMNPY statement for copying npy files.");
}
if (expectedFileType == common::CopyDescription::FileType::NPY &&
actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
if (actualFileType == CopyDescription::FileType::NPY) {
propertyToNpyMap = bindPropertyToNpyMap(tableID, boundFilePaths);
}
return make_unique<BoundCopy>(
CopyDescription(filePaths, csvReaderConfig, fileType), tableID, tableName);
CopyDescription(boundFilePaths, propertyToNpyMap, csvReaderConfig, actualFileType), tableID,
tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand All @@ -35,6 +49,24 @@ std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& f
return boundFilePaths;
}

std::unordered_map<common::property_id_t, std::string> Binder::bindPropertyToNpyMap(
common::table_id_t tableID, const std::vector<std::string>& filePaths) {
auto catalogContent = catalog.getReadOnlyVersion();
auto tableSchema = catalogContent->getTableSchema(tableID);
if (tableSchema->properties.size() != filePaths.size()) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
tableSchema->tableName));
}
std::unordered_map<common::property_id_t, std::string> propertyIdxToNpyMap;
for (int i = 0; i < filePaths.size(); i++) {
auto& filePath = filePaths[i];
auto& propertyID = tableSchema->properties[i].propertyID;
propertyIdxToNpyMap[propertyID] = filePath;
}
return propertyIdxToNpyMap;
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -97,13 +129,27 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
auto npySuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::NPY);
CopyDescription::FileType fileType;
std::string expectedSuffix;
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
fileType = CopyDescription::FileType::CSV;
expectedSuffix = csvSuffix;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
fileType = CopyDescription::FileType::PARQUET;
expectedSuffix = parquetSuffix;
} else if (fileName.ends_with(npySuffix)) {
fileType = CopyDescription::FileType::NPY;
expectedSuffix = npySuffix;
} else {
throw CopyException("Unsupported file type: " + fileName);
}
for (auto& path : filePaths) {
if (!path.ends_with(expectedSuffix)) {
throw CopyException("Loading files with different types is not currently supported.");
}
}
return fileType;
}

} // namespace binder
Expand Down
13 changes: 9 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap,
CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, propertyIDToNpyMap{std::move(propertyToNpyMap)},
csvReaderConfig{nullptr}, fileType{fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
: filePaths{copyDescription.filePaths}, propertyIDToNpyMap{copyDescription.propertyIDToNpyMap},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
Expand All @@ -31,6 +33,9 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
case FileType::PARQUET: {
return "parquet";
}
case FileType::NPY: {
return "npy";
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class Binder {

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unordered_map<common::property_id_t, std::string> bindPropertyToNpyMap(
common::table_id_t tableId, const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ struct CopyConstants {
// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
Expand Down
8 changes: 5 additions & 3 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType : uint8_t { CSV = 0, PARQUET = 1 };
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap,
CSVReaderConfig csvReaderConfig, FileType fileType);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -46,6 +47,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
std::unordered_map<common::property_id_t, std::string> propertyIDToNpyMap;
FileType fileType;
};

Expand Down
6 changes: 6 additions & 0 deletions src/include/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,11 @@ static uint64_t nextPowerOfTwo(uint64_t v) {
return v;
}

static bool isLittleEndian() {
// Little endian arch stores the least significant value in the lower bytes.
int testNumber = 1;
return *(uint8_t*)&testNumber == 1;
}

} // namespace common
} // namespace kuzu
8 changes: 6 additions & 2 deletions src/include/parser/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ namespace parser {
class CopyCSV : public Statement {
public:
explicit CopyCSV(std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions)
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions,
common::CopyDescription::FileType fileType)
: Statement{common::StatementType::COPY_CSV}, filePaths{std::move(filePaths)},
tableName{std::move(tableName)}, parsingOptions{std::move(parsingOptions)} {}
tableName{std::move(tableName)},
parsingOptions{std::move(parsingOptions)}, fileType{fileType} {}

inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline std::string getTableName() const { return tableName; }
inline std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> const*
getParsingOptions() const {
return &parsingOptions;
}
inline common::CopyDescription::FileType getFileType() const { return fileType; }

private:
std::vector<std::string> filePaths;
common::CopyDescription::FileType fileType;
std::string tableName;
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
};
Expand Down
5 changes: 4 additions & 1 deletion src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ class Transformer {

std::unique_ptr<Statement> transformCopyCSV(CypherParser::KU_CopyCSVContext& ctx);

std::vector<std::string> transformFilePaths(CypherParser::KU_FilePathsContext& ctx);
std::unique_ptr<Statement> transformCopyNPY(CypherParser::KU_CopyNPYContext& ctx);

std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);

std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ class OrderByKeyEncoder {
assert(false);
}

static bool isLittleEndian();

void flipBytesIfNecessary(
uint32_t keyColIdx, uint8_t* tuplePtr, uint32_t numEntriesToEncode, common::DataType& type);

Expand Down
52 changes: 52 additions & 0 deletions src/include/storage/copier/copy_node_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/logging_level_utils.h"
#include "common/task_system/task_scheduler.h"
#include "node_copier.h"
#include "npy_reader.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

using namespace kuzu::catalog;
using namespace kuzu::common;

namespace kuzu {
namespace storage {

class CopyNodeNpy : public NodeCopier {

public:
CopyNodeNpy(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, catalog::Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: NodeCopier(copyDescription, outputDirectory, taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs){};

~CopyNodeNpy() = default;

private:
void populateColumnsAndLists() override;

void populateInMemoryStructures() override;

void initializeNpyReaders();

void validateNpyReaders();

void populateColumnsFromNpy(std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

void assignCopyNpyTasks(
common::property_id_t propertyIdx, std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

static void batchPopulateColumnsTask(common::property_id_t primaryKeyPropertyIdx,
uint64_t blockIdx, offset_t startOffset, uint64_t numLinesInCurBlock,
HashIndexBuilder<int64_t>* pkIndex, CopyNodeNpy* copier, common::property_id_t propertyIdx);

private:
std::unordered_map<common::property_id_t, std::unique_ptr<NpyReader>> npyReaderMap;
};

} // namespace storage
} // namespace kuzu
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class CopyStructuresArrow {
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);

uint64_t copy();

virtual ~CopyStructuresArrow() = default;

uint64_t copy();

protected:
virtual void updateTableStatistics() = 0;

Expand All @@ -51,7 +51,7 @@ class CopyStructuresArrow {

virtual void saveToFile() = 0;

void populateInMemoryStructures();
virtual void populateInMemoryStructures();

void countNumLines(const std::vector<std::string>& filePath);

Expand Down
File renamed without changes.
Loading

0 comments on commit d740b29

Please sign in to comment.