Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy node tables from npy files #1396

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ include_directories(third_party/antlr4_cypher/include)
include_directories(third_party/antlr4_runtime/src)
include_directories(third_party/spdlog)
include_directories(third_party/nlohmann_json)
include_directories(third_party/pyparse)
include_directories(third_party/utf8proc/include)
include_directories(third_party/pybind11/include)
include_directories(third_party/re2/include)
Expand Down
1 change: 1 addition & 0 deletions dataset/npy-1d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromnpy ("dataset/npy-1d/one_dim_int64.npy", "dataset/npy-1d/one_dim_int32.npy", "dataset/npy-1d/one_dim_int16.npy", "dataset/npy-1d/one_dim_double.npy", "dataset/npy-1d/one_dim_float.npy");
Binary file added dataset/npy-1d/fortran_order.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-1d/one_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-1d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (i64 INT64,i32 INT32,i16 INT16,f64 DOUBLE,f32 FLOAT, PRIMARY KEY(i64));
1 change: 1 addition & 0 deletions dataset/npy-20k/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromNPY ("dataset/npy-20k/id_int64.npy", "dataset/npy-20k/two_dim_float.npy");
Binary file added dataset/npy-20k/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-20k/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64,f32 FLOAT[10],PRIMARY KEY(id));
Binary file added dataset/npy-20k/two_dim_float.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable fromNPY ("dataset/npy-2d/id_int64.npy", "dataset/npy-2d/two_dim_int64.npy", "dataset/npy-2d/two_dim_int32.npy", "dataset/npy-2d/two_dim_int16.npy", "dataset/npy-2d/two_dim_double.npy", "dataset/npy-2d/two_dim_float.npy");
Binary file added dataset/npy-2d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-2d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64, i64 INT64[3],i32 INT32[3],i16 INT16[3],f64 DOUBLE[3],f32 FLOAT[3],PRIMARY KEY(id));
Binary file added dataset/npy-2d/three_dim_int64.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_double.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_float.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int16.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int32.npy
Binary file not shown.
Binary file added dataset/npy-2d/two_dim_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
copy npytable FROMnpy ("dataset/npy-3d/id_int64.npy", "dataset/npy-3d/three_dim_int64.npy");
Binary file added dataset/npy-3d/id_int64.npy
Binary file not shown.
1 change: 1 addition & 0 deletions dataset/npy-3d/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table npytable (id INT64,i64 INT64[12],PRIMARY KEY(id));
Binary file added dataset/npy-3d/three_dim_int64.npy
Binary file not shown.
7 changes: 6 additions & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ grammar Cypher;
}

oC_Cypher
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;
: SP ? oC_AnyCypherOption? SP? ( oC_Statement | kU_DDL | kU_CopyNPY | kU_CopyCSV ) ( SP? ';' )? SP? EOF ;

kU_CopyCSV
: COPY SP oC_SchemaName SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

kU_CopyNPY
: COPY SP oC_SchemaName SP FROMNPY ( SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' ) ;

kU_FilePaths
: '[' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ']'
| StringLiteral
Expand All @@ -36,6 +39,8 @@ COPY : ( 'C' | 'c' ) ( 'O' | 'o' ) ( 'P' | 'p') ( 'Y' | 'y' ) ;

FROM : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' );

FROMNPY : ( 'F' | 'f' ) ( 'R' | 'r' ) ( 'O' | 'o' ) ( 'M' | 'm' ) ( 'N' | 'n' ) ( 'P' | 'p' ) ( 'Y' | 'y' ) ;

kU_DDL
: kU_CreateNode
| kU_CreateRel
Expand Down
56 changes: 51 additions & 5 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,24 @@ std::unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto filePaths = bindFilePaths(copyCSV.getFilePaths());
auto fileType = bindFileType(filePaths);
auto boundFilePaths = bindFilePaths(copyCSV.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap;
if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROMNPY statement for copying npy files.");
}
if (expectedFileType == common::CopyDescription::FileType::NPY &&
actualFileType != expectedFileType) {
throw BinderException("Please use COPY FROM statement for copying csv and parquet files.");
}
if (actualFileType == CopyDescription::FileType::NPY) {
propertyToNpyMap = bindPropertyToNpyMap(tableID, boundFilePaths);
}
return make_unique<BoundCopy>(
CopyDescription(filePaths, csvReaderConfig, fileType), tableID, tableName);
CopyDescription(boundFilePaths, propertyToNpyMap, csvReaderConfig, actualFileType), tableID,
tableName);
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand All @@ -35,6 +49,24 @@ std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& f
return boundFilePaths;
}

std::unordered_map<common::property_id_t, std::string> Binder::bindPropertyToNpyMap(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is up to u. But I think you can simply bind to vector so that it aligns with the order in schema. Also we don't need to support multiple npy files to 1 column as an experimental feature.

common::table_id_t tableID, const std::vector<std::string>& filePaths) {
auto catalogContent = catalog.getReadOnlyVersion();
auto tableSchema = catalogContent->getTableSchema(tableID);
if (tableSchema->properties.size() != filePaths.size()) {
throw BinderException(StringUtils::string_format(
"Number of npy files is not equal to number of properties in table {}.",
tableSchema->tableName));
}
std::unordered_map<common::property_id_t, std::string> propertyIdxToNpyMap;
for (int i = 0; i < filePaths.size(); i++) {
auto& filePath = filePaths[i];
auto& propertyID = tableSchema->properties[i].propertyID;
propertyIdxToNpyMap[propertyID] = filePath;
}
return propertyIdxToNpyMap;
}

CSVReaderConfig Binder::bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<ParsedExpression>>* parsingOptions) {
CSVReaderConfig csvReaderConfig;
Expand Down Expand Up @@ -97,13 +129,27 @@ CopyDescription::FileType Binder::bindFileType(std::vector<std::string> filePath
auto fileName = filePaths[0];
auto csvSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::CSV);
auto parquetSuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::PARQUET);
auto npySuffix = CopyDescription::getFileTypeSuffix(CopyDescription::FileType::NPY);
CopyDescription::FileType fileType;
std::string expectedSuffix;
if (fileName.ends_with(csvSuffix)) {
return CopyDescription::FileType::CSV;
fileType = CopyDescription::FileType::CSV;
expectedSuffix = csvSuffix;
} else if (fileName.ends_with(parquetSuffix)) {
return CopyDescription::FileType::PARQUET;
fileType = CopyDescription::FileType::PARQUET;
expectedSuffix = parquetSuffix;
} else if (fileName.ends_with(npySuffix)) {
fileType = CopyDescription::FileType::NPY;
expectedSuffix = npySuffix;
} else {
throw CopyException("Unsupported file type: " + fileName);
}
for (auto& path : filePaths) {
if (!path.ends_with(expectedSuffix)) {
throw CopyException("Loading files with different types is not currently supported.");
}
}
return fileType;
}

} // namespace binder
Expand Down
13 changes: 9 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ using namespace kuzu::utf8proc;

namespace kuzu {
namespace common {
CopyDescription::CopyDescription(
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{fileType} {
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap,
CSVReaderConfig csvReaderConfig, FileType fileType)
: filePaths{filePaths}, propertyIDToNpyMap{std::move(propertyToNpyMap)},
csvReaderConfig{nullptr}, fileType{fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(csvReaderConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths},
: filePaths{copyDescription.filePaths}, propertyIDToNpyMap{copyDescription.propertyIDToNpyMap},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
Expand All @@ -31,6 +33,9 @@ std::string CopyDescription::getFileTypeName(FileType fileType) {
case FileType::PARQUET: {
return "parquet";
}
case FileType::NPY: {
return "npy";
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ class Binder {

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

std::unordered_map<common::property_id_t, std::string> bindPropertyToNpyMap(
common::table_id_t tableId, const std::vector<std::string>& filePaths);

common::CSVReaderConfig bindParsingOptions(
const std::unordered_map<std::string, std::unique_ptr<parser::ParsedExpression>>*
parsingOptions);
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ struct CopyConstants {
// Lower bound for number of incomplete tasks in copier to trigger scheduling a new batch.
static constexpr uint64_t MINIMUM_NUM_COPIER_TASKS_TO_SCHEDULE_MORE = 50;

// Number of rows per block for npy files
static constexpr uint64_t NUM_ROWS_PER_BLOCK_FOR_NPY = 2048;

// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
Expand Down
8 changes: 5 additions & 3 deletions src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ struct CSVReaderConfig {
};

struct CopyDescription {
enum class FileType : uint8_t { CSV = 0, PARQUET = 1 };
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

CopyDescription(const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig,
FileType fileType);
CopyDescription(const std::vector<std::string>& filePaths,
std::unordered_map<common::property_id_t, std::string> propertyToNpyMap,
CSVReaderConfig csvReaderConfig, FileType fileType);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -46,6 +47,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
std::unordered_map<common::property_id_t, std::string> propertyIDToNpyMap;
FileType fileType;
};

Expand Down
6 changes: 6 additions & 0 deletions src/include/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,11 @@ static uint64_t nextPowerOfTwo(uint64_t v) {
return v;
}

static bool isLittleEndian() {
// Little endian arch stores the least significant value in the lower bytes.
int testNumber = 1;
return *(uint8_t*)&testNumber == 1;
}

} // namespace common
} // namespace kuzu
8 changes: 6 additions & 2 deletions src/include/parser/copy_csv/copy_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,23 @@ namespace parser {
class CopyCSV : public Statement {
public:
explicit CopyCSV(std::vector<std::string> filePaths, std::string tableName,
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions)
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions,
common::CopyDescription::FileType fileType)
: Statement{common::StatementType::COPY_CSV}, filePaths{std::move(filePaths)},
tableName{std::move(tableName)}, parsingOptions{std::move(parsingOptions)} {}
tableName{std::move(tableName)},
parsingOptions{std::move(parsingOptions)}, fileType{fileType} {}

inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline std::string getTableName() const { return tableName; }
inline std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> const*
getParsingOptions() const {
return &parsingOptions;
}
inline common::CopyDescription::FileType getFileType() const { return fileType; }

private:
std::vector<std::string> filePaths;
common::CopyDescription::FileType fileType;
std::string tableName;
std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> parsingOptions;
};
Expand Down
5 changes: 4 additions & 1 deletion src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ class Transformer {

std::unique_ptr<Statement> transformCopyCSV(CypherParser::KU_CopyCSVContext& ctx);

std::vector<std::string> transformFilePaths(CypherParser::KU_FilePathsContext& ctx);
std::unique_ptr<Statement> transformCopyNPY(CypherParser::KU_CopyNPYContext& ctx);

std::vector<std::string> transformFilePaths(
std::vector<antlr4::tree::TerminalNode*> stringLiteral);

std::unordered_map<std::string, std::unique_ptr<ParsedExpression>> transformParsingOptions(
CypherParser::KU_ParsingOptionsContext& ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ class OrderByKeyEncoder {
assert(false);
}

static bool isLittleEndian();

void flipBytesIfNecessary(
uint32_t keyColIdx, uint8_t* tuplePtr, uint32_t numEntriesToEncode, common::DataType& type);

Expand Down
52 changes: 52 additions & 0 deletions src/include/storage/copier/copy_node_npy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once
mewim marked this conversation as resolved.
Show resolved Hide resolved

#include "common/copier_config/copier_config.h"
#include "common/logging_level_utils.h"
#include "common/task_system/task_scheduler.h"
#include "node_copier.h"
#include "npy_reader.h"
#include "storage/in_mem_storage_structure/in_mem_column.h"
#include "storage/index/hash_index_builder.h"
#include "storage/store/nodes_statistics_and_deleted_ids.h"

using namespace kuzu::catalog;
using namespace kuzu::common;

namespace kuzu {
namespace storage {

class CopyNodeNpy : public NodeCopier {

public:
CopyNodeNpy(CopyDescription& copyDescription, std::string outputDirectory,
TaskScheduler& taskScheduler, catalog::Catalog& catalog, table_id_t tableID,
NodesStatisticsAndDeletedIDs* nodesStatisticsAndDeletedIDs)
: NodeCopier(copyDescription, outputDirectory, taskScheduler, catalog, tableID,
nodesStatisticsAndDeletedIDs){};

~CopyNodeNpy() = default;

private:
void populateColumnsAndLists() override;

void populateInMemoryStructures() override;

void initializeNpyReaders();

void validateNpyReaders();

void populateColumnsFromNpy(std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

void assignCopyNpyTasks(
common::property_id_t propertyIdx, std::unique_ptr<HashIndexBuilder<int64_t>>& pkIndex);

static void batchPopulateColumnsTask(common::property_id_t primaryKeyPropertyIdx,
uint64_t blockIdx, offset_t startOffset, uint64_t numLinesInCurBlock,
HashIndexBuilder<int64_t>* pkIndex, CopyNodeNpy* copier, common::property_id_t propertyIdx);

private:
std::unordered_map<common::property_id_t, std::unique_ptr<NpyReader>> npyReaderMap;
ray6080 marked this conversation as resolved.
Show resolved Hide resolved
};

} // namespace storage
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ class CopyStructuresArrow {
common::TaskScheduler& taskScheduler, catalog::Catalog& catalog,
common::table_id_t tableID);

uint64_t copy();

virtual ~CopyStructuresArrow() = default;

uint64_t copy();

protected:
virtual void updateTableStatistics() = 0;

Expand All @@ -51,7 +51,7 @@ class CopyStructuresArrow {

virtual void saveToFile() = 0;

void populateInMemoryStructures();
virtual void populateInMemoryStructures();

void countNumLines(const std::vector<std::string>& filePath);

Expand Down
Loading