Merge pull request #1154 from kuzudb/arrow_rel
Implementation of arrow rel copier
weipang142857 committed Jan 15, 2023
2 parents 960bbbe + 7167f06 commit 4cf8afc
Showing 150 changed files with 4,237 additions and 4,131 deletions.
1 change: 0 additions & 1 deletion dataset/copy-csv-dos-style-newline/copy_csv.cypher

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-empty-lists-test/copy_csv.cypher

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-fault-tests/long-string/copy_csv.cypher

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-node-property-test/copy_csv.cypher

This file was deleted.

3 changes: 0 additions & 3 deletions dataset/copy-csv-special-char-test/copy_csv.cypher

This file was deleted.

1 change: 1 addition & 0 deletions dataset/copy-dos-style-newline/copy.cypher
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-dos-style-newline/vPerson.csv"
File renamed without changes.
1 change: 1 addition & 0 deletions dataset/copy-empty-lists-test/copy.cypher
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-empty-lists-test/vPerson.csv"
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/long-string/copy.cypher
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-fault-tests/long-string/vPerson.csv"
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-fault-tests/rel-table-multiplicity-violation/vPerson.csv"
1 change: 1 addition & 0 deletions dataset/copy-node-property-test/copy.cypher
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-node-property-test/vPerson.csv"
File renamed without changes.
File renamed without changes.
3 changes: 3 additions & 0 deletions dataset/copy-special-char-test/copy.cypher
@@ -0,0 +1,3 @@
COPY person FROM "dataset/copy-special-char-test/vPerson.csv" (ESCAPE = "#", QUOTE = "-", DELIM="|");
COPY organisation FROM "dataset/copy-special-char-test/vOrganisation.csv" (ESCAPE = "#", QUOTE = "=", DELIM=",");
COPY workAt FROM "dataset/copy-special-char-test/eWorkAt.csv" (ESCAPE = "#", QUOTE = "=", DELIM="|");
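For illustration, a standalone sketch (hypothetical helper, not the repository's code) of how per-statement options such as ESCAPE, QUOTE, and DELIM can override the reader defaults; the option names follow the COPY statements above, and the field names follow Binder::bindStringParsingOptions later in this diff. Unknown option names throw, which loosely mirrors the option-name validation in Binder::validateStringParsingOptionName.

#include <cstdio>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for the reader configuration; field names follow this diff.
struct CsvOptions {
    char escapeChar = '\\';
    char delimiter = ',';
    char quoteChar = '"';
    char listBeginChar = '[';
    char listEndChar = ']';
};

// Apply options of the form {"ESCAPE": "#", "QUOTE": "-", "DELIM": "|"} on top of the defaults.
CsvOptions applyParsingOptions(const std::unordered_map<std::string, std::string>& options) {
    CsvOptions config;
    for (const auto& [name, value] : options) {
        if (value.size() != 1) {
            throw std::invalid_argument("Parsing option " + name + " must be a single character.");
        }
        if (name == "ESCAPE") {
            config.escapeChar = value[0];
        } else if (name == "DELIM") {
            config.delimiter = value[0];
        } else if (name == "QUOTE") {
            config.quoteChar = value[0];
        } else if (name == "LIST_BEGIN") {
            config.listBeginChar = value[0];
        } else if (name == "LIST_END") {
            config.listEndChar = value[0];
        } else {
            throw std::invalid_argument("Unrecognized parsing option: " + name);
        }
    }
    return config;
}

int main() {
    // Mirrors the first COPY statement above: (ESCAPE = "#", QUOTE = "-", DELIM="|").
    auto config = applyParsingOptions({{"ESCAPE", "#"}, {"QUOTE", "-"}, {"DELIM", "|"}});
    std::printf("escape=%c quote=%c delim=%c\n", config.escapeChar, config.quoteChar, config.delimiter);
    return 0;
}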
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion dataset/long-string-pk-tests/eKnows.csv
@@ -1 +1 @@
AAAAAAAAAAAAAAAAAAAA,Bob
AAAAAAAAAAAAAAAAAAAA,Bob
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion dataset/primary-key-tests/string-pk-tests/eKnows.csv
@@ -1 +1 @@
Alice,Bob
Alice,Bob
File renamed without changes.
5,102 changes: 2,551 additions & 2,551 deletions dataset/rel-insertion-tests/eKnows.csv

Large diffs are not rendered by default.

@@ -6,4 +6,4 @@ COPY studyAt FROM "dataset/tinysnb/eStudyAt.csv" (HEADER=true);
COPY workAt FROM "dataset/tinysnb/eWorkAt.csv"
COPY meets FROM "dataset/tinysnb/eMeets.csv"
COPY mixed FROM "dataset/tinysnb/eMixed.csv"
COPY marries FROM "dataset/tinysnb/eMarries.csv"
COPY marries FROM "dataset/tinysnb/eMarries.csv"
28 changes: 14 additions & 14 deletions dataset/tinysnb/eKnows.csv
@@ -1,14 +1,14 @@
0,2,2021-06-30,1986-10-21 21:08:31.521,10 years 5 months 13 hours 24 us,[rnme,m8sihsdnf2990nfiwf]
0,3,2021-06-30,1946-08-25 19:07:22,20 years 30 days 48 hours,[njnojppo9u0jkmf,fjiojioh9h9h89hph]
0,5,2021-06-30,2012-12-11 20:07:22,10 days,[ioji232,jifhe8w99u43434]
2,0,2021-06-30,1946-08-25 19:07:22,10 years 5 months 13 hours 24 us,[2huh9y89fsfw23,23nsihufhw723]
2,3,1950-05-14,1946-08-25 19:07:22,23 minutes,[fwehu9h9832wewew,23u9h989sdfsss]
2,5,1950-05-14,2012-12-11 20:07:22,20 years 30 days 48 hours,[fwh9y81232uisuiehuf,ewnuihxy8dyf232]
3,0,2021-06-30,2002-07-31 11:42:53.12342,30 hours 40 days,[fnioh8323aeweae34d,osd89e2ejshuih12]
3,2,1950-05-14,2007-02-12 12:11:42.123,28 minutes 30 milliseconds,[fwh983-sdjisdfji,ioh89y32r2huir]
3,5,2000-01-01,1998-10-02 13:09:22.423,300 milliseconds,[psh989823oaaioe,nuiuah1nosndfisf]
5,0,2021-06-30,1936-11-02 11:02:01,480us,[fwewe]
5,2,1950-05-14,1982-11-11 13:12:05.123,23 minutes,[fewh9182912e3,h9y8y89soidfsf,nuhudf78w78efw,hioshe0f9023sdsd]
5,3,2000-01-01,1999-04-21 15:12:11.42,48 hours 52 milliseconds,[23h9sdslnfowhu2932,shuhf98922323sf]
7,8,1905-12-12,2025-01-01 11:22:33.52,47 minutes 58 seconds,[ahu2333333333333,12weeeeeeeeeeeeeeeeee]
7,9,1905-12-12,2020-03-01 12:11:41.6552,47 minutes 58 seconds,[peweeeeeeeeeeeeeeeee,kowje9w0eweeeeeeeee]
0,2,2021-06-30,1986-10-21 21:08:31.521,10 years 5 months 13 hours 24 us,"[rnme,m8sihsdnf2990nfiwf]"
0,3,2021-06-30,1946-08-25 19:07:22,20 years 30 days 48 hours,"[njnojppo9u0jkmf,fjiojioh9h9h89hph]"
0,5,2021-06-30,2012-12-11 20:07:22,10 days,"[ioji232,jifhe8w99u43434]"
2,0,2021-06-30,1946-08-25 19:07:22,10 years 5 months 13 hours 24 us,"[2huh9y89fsfw23,23nsihufhw723]"
2,3,1950-05-14,1946-08-25 19:07:22,23 minutes,"[fwehu9h9832wewew,23u9h989sdfsss]"
2,5,1950-05-14,2012-12-11 20:07:22,20 years 30 days 48 hours,"[fwh9y81232uisuiehuf,ewnuihxy8dyf232]"
3,0,2021-06-30,2002-07-31 11:42:53.12342,30 hours 40 days,"[fnioh8323aeweae34d,osd89e2ejshuih12]"
3,2,1950-05-14,2007-02-12 12:11:42.123,28 minutes 30 milliseconds,"[fwh983-sdjisdfji,ioh89y32r2huir]"
3,5,2000-01-01,1998-10-02 13:09:22.423,300 milliseconds,"[psh989823oaaioe,nuiuah1nosndfisf]"
5,0,2021-06-30,1936-11-02 11:02:01,480us,"[fwewe]"
5,2,1950-05-14,1982-11-11 13:12:05.123,23 minutes,"[fewh9182912e3,h9y8y89soidfsf,nuhudf78w78efw,hioshe0f9023sdsd]"
5,3,2000-01-01,1999-04-21 15:12:11.42,48 hours 52 milliseconds,"[23h9sdslnfowhu2932,shuhf98922323sf]"
7,8,1905-12-12,2025-01-01 11:22:33.52,47 minutes 58 seconds,"[ahu2333333333333,12weeeeeeeeeeeeeeeeee]"
7,9,1905-12-12,2020-03-01 12:11:41.6552,47 minutes 58 seconds,"[peweeeeeeeeeeeeeeeee,kowje9w0eweeeeeeeee]"
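The rows above change LIST values from bare bracketed form to quoted strings, presumably so that a generic CSV parser, which knows nothing about the bracket syntax, keeps each list in a single field. A minimal standalone sketch (hypothetical function, not the repository's reader) of quote-aware splitting shows the difference:

#include <cassert>
#include <string>
#include <vector>

// Split one CSV line on `delimiter`, treating text between `quote` characters as a single field.
std::vector<std::string> splitLine(const std::string& line, char delimiter = ',', char quote = '"') {
    std::vector<std::string> fields;
    std::string current;
    bool inQuotes = false;
    for (char c : line) {
        if (c == quote) {
            inQuotes = !inQuotes;      // toggle quoted state; the quote itself is not kept
        } else if (c == delimiter && !inQuotes) {
            fields.push_back(current); // field boundary only outside quotes
            current.clear();
        } else {
            current.push_back(c);
        }
    }
    fields.push_back(current);
    return fields;
}

int main() {
    // Unquoted list: the comma inside [rnme,m8sihsdnf2990nfiwf] splits the field apart.
    assert(splitLine("0,2,[rnme,m8sihsdnf2990nfiwf]").size() == 4);
    // Quoted list, as in the rewritten rows above: the list stays one field.
    assert(splitLine("0,2,\"[rnme,m8sihsdnf2990nfiwf]\"").size() == 3);
    return 0;
}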
4 changes: 2 additions & 2 deletions dataset/tinysnb/eMarries.csv
@@ -1,3 +1,3 @@
0,2,["toronto"],
0,2,"[toronto]",
3,5,,"long long long string"
7,8,["vancouver"],"short str"
7,8,"[vancouver]","short str"
6 changes: 3 additions & 3 deletions dataset/tinysnb/eStudyAt.csv
@@ -1,4 +1,4 @@
from,to,YEAR,Places
0,1,2021,[wwAewsdndweusd,wek]
2,1,2020,[anew,jsdnwusklklklwewsd]
8,1,2020,[awndsnjwejwen,isuhuwennjnuhuhuwewe]
0,1,2021,"[wwAewsdndweusd,wek]"
2,1,2020,"[anew,jsdnwusklklklwewsd]"
8,1,2020,"[awndsnjwejwen,isuhuwennjnuhuhuwewe]"
2 changes: 1 addition & 1 deletion src/binder/bind/CMakeLists.txt
@@ -1,7 +1,7 @@
add_library(
kuzu_binder_bind
OBJECT
bind_copy_csv.cpp
bind_copy.cpp
bind_ddl.cpp
bind_graph_pattern.cpp
bind_projection_clause.cpp
@@ -1,12 +1,12 @@
#include "binder/binder.h"
#include "binder/copy_csv/bound_copy_csv.h"
#include "binder/copy/bound_copy.h"
#include "binder/expression/literal_expression.h"
#include "parser/copy_csv/copy_csv.h"

namespace kuzu {
namespace binder {

unique_ptr<BoundStatement> Binder::bindCopyCSV(const Statement& statement) {
unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
auto& copyCSV = (CopyCSV&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
@@ -16,7 +16,7 @@ unique_ptr<BoundStatement> Binder::bindCopyCSV(const Statement& statement) {
catalogContent->getRelTableIDFromName(tableName);
auto filePath = copyCSV.getCSVFileName();
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
return make_unique<BoundCopyCSV>(CSVDescription(filePath, csvReaderConfig), tableID, tableName);
return make_unique<BoundCopy>(CopyDescription(filePath, csvReaderConfig), tableID, tableName);
}

CSVReaderConfig Binder::bindParsingOptions(
@@ -57,7 +57,7 @@ void Binder::bindStringParsingOptions(
if (optionName == "ESCAPE") {
csvReaderConfig.escapeChar = parsingOptionValue;
} else if (optionName == "DELIM") {
csvReaderConfig.tokenSeparator = parsingOptionValue;
csvReaderConfig.delimiter = parsingOptionValue;
} else if (optionName == "QUOTE") {
csvReaderConfig.quoteChar = parsingOptionValue;
} else if (optionName == "LIST_BEGIN") {
6 changes: 3 additions & 3 deletions src/binder/binder.cpp
@@ -15,7 +15,7 @@ unique_ptr<BoundStatement> Binder::bind(const Statement& statement) {
return bindDropTable(statement);
}
case StatementType::COPY_CSV: {
return bindCopyCSV(statement);
return bindCopy(statement);
}
case StatementType::DROP_PROPERTY: {
return bindDropProperty(statement);
@@ -165,8 +165,8 @@ void Binder::validateTableExist(const Catalog& _catalog, string& tableName) {
}

bool Binder::validateStringParsingOptionName(string& parsingOptionName) {
for (auto i = 0; i < size(CopyCSVConfig::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyCSVConfig::STRING_CSV_PARSING_OPTIONS[i]) {
for (auto i = 0; i < size(CopyConfig::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyConfig::STRING_CSV_PARSING_OPTIONS[i]) {
return true;
}
}
89 changes: 77 additions & 12 deletions src/common/csv_reader/csv_reader.cpp
@@ -13,8 +13,8 @@ namespace common

CSVReader::CSVReader(const string& fName, const CSVReaderConfig& config, uint64_t blockId)
: CSVReader{fName, config} {
readingBlockStartOffset = CopyCSVConfig::CSV_READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyCSVConfig::CSV_READING_BLOCK_SIZE * (blockId + 1);
readingBlockStartOffset = CopyConfig::CSV_READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyConfig::CSV_READING_BLOCK_SIZE * (blockId + 1);
auto isBeginningOfLine = false;
if (0 == readingBlockStartOffset) {
isBeginningOfLine = true;
@@ -166,15 +166,15 @@ bool CSVReader::hasNextToken() {
string lineStr;
while (true) {
if (isQuotedString) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.quoteChar == line[linePtrEnd]) {
break;
} else if (config.escapeChar == line[linePtrEnd]) {
// escape next special character
linePtrEnd++;
}
} else if (isList) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.listBeginChar == line[linePtrEnd]) {
linePtrEnd++;
nestedListLevel++;
@@ -184,7 +184,7 @@
if (nestedListLevel == 0) {
break;
}
} else if (config.tokenSeparator == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
} else if (config.delimiter == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
linePtrEnd == lineLen) {
break;
}
@@ -207,7 +207,7 @@

bool CSVReader::hasNextTokenOrError() {
if (!hasNextToken()) {
throw CSVReaderException(
throw ReaderException(
StringUtils::string_format("CSV Reader was expecting more tokens but the line does not "
"have any tokens left. Last token: %s",
line + linePtrStart));
@@ -244,7 +244,7 @@ char* CSVReader::getString() {
} else if (unicodeType == UnicodeType::UNICODE) {
return Utf8Proc::normalize(strVal, strlen(strVal));
} else {
throw CSVReaderException("Invalid UTF-8 character encountered.");
throw ReaderException("Invalid UTF-8 character encountered.");
}
}

@@ -298,15 +298,15 @@ Literal CSVReader::getList(const DataType& dataType) {
result.listVal.emplace_back(listCSVReader.getList(*dataType.childType));
} break;
default:
throw CSVReaderException("Unsupported data type " +
Types::dataTypeToString(dataType.childType->typeID) +
" inside LIST");
throw ReaderException("Unsupported data type " +
Types::dataTypeToString(dataType.childType->typeID) +
" inside LIST");
}
}
}
auto numBytesOfOverflow = result.listVal.size() * Types::getDataTypeSize(dataType.typeID);
if (numBytesOfOverflow >= DEFAULT_PAGE_SIZE) {
throw CSVReaderException(StringUtils::string_format(
throw ReaderException(StringUtils::string_format(
"Maximum num bytes of a LIST is %d. Input list's num bytes is %d.", DEFAULT_PAGE_SIZE,
numBytesOfOverflow));
}
@@ -321,9 +321,74 @@ void CSVReader::setNextTokenIsProcessed() {
void CSVReader::openFile(const string& fName) {
fd = fopen(fName.c_str(), "r");
if (nullptr == fd) {
throw CSVReaderException("Cannot open file: " + fName);
throw ReaderException("Cannot open file: " + fName);
}
}

CopyDescription::CopyDescription(const string& filePath, CSVReaderConfig csvReaderConfig)
: filePath{filePath}, csvReaderConfig{nullptr}, fileType{FileType::CSV} {
setFileType(filePath);
if (fileType == FileType::CSV) {
this->csvReaderConfig = make_unique<CSVReaderConfig>(csvReaderConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePath{copyDescription.filePath}, csvReaderConfig{nullptr}, fileType{
copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->csvReaderConfig = make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
}

string CopyDescription::getFileTypeName(FileType fileType) {
switch (fileType) {
case FileType::CSV:
return "csv";

case FileType::ARROW:
return "arrow";

case FileType::PARQUET:
return "parquet";
}
}

string CopyDescription::getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}

void CopyDescription::setFileType(string const& fileName) {
auto csvSuffix = getFileTypeSuffix(FileType::CSV);
auto arrowSuffix = getFileTypeSuffix(FileType::ARROW);
auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
fileType = FileType::CSV;
return;
}
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
fileType = FileType::ARROW;
return;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(),
parquetSuffix)) {
fileType = FileType::PARQUET;
return;
}
}

throw CopyException("Unsupported file type: " + fileName);
}

} // namespace common
} // namespace kuzu
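The new CopyDescription::setFileType above selects the copier's input format from the path suffix and rejects anything that is not .csv, .arrow, or .parquet. A self-contained sketch of the same suffix-based dispatch (hypothetical free function, not the repository's API); only the suffix is inspected, so a path such as dataset/tinysnb/eKnows.parquet would be routed to the Parquet path:

#include <iostream>
#include <stdexcept>
#include <string>

enum class FileType { CSV, ARROW, PARQUET };

// Case-sensitive suffix check, mirroring the compare-based logic in setFileType above.
static bool hasSuffix(const std::string& name, const std::string& suffix) {
    return name.length() >= suffix.length() &&
           name.compare(name.length() - suffix.length(), suffix.length(), suffix) == 0;
}

FileType detectFileType(const std::string& fileName) {
    if (hasSuffix(fileName, ".csv")) {
        return FileType::CSV;
    }
    if (hasSuffix(fileName, ".arrow")) {
        return FileType::ARROW;
    }
    if (hasSuffix(fileName, ".parquet")) {
        return FileType::PARQUET;
    }
    throw std::runtime_error("Unsupported file type: " + fileName);
}

int main() {
    // Hypothetical paths; only the suffix matters for the dispatch.
    std::cout << static_cast<int>(detectFileType("dataset/tinysnb/eKnows.csv")) << '\n';     // 0 (CSV)
    std::cout << static_cast<int>(detectFileType("dataset/tinysnb/eKnows.parquet")) << '\n'; // 2 (PARQUET)
    return 0;
}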
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
@@ -52,7 +52,7 @@ class Binder {
property_id_t bindPropertyName(TableSchema* tableSchema, const string& propertyName);

/*** bind copy csv ***/
unique_ptr<BoundStatement> bindCopyCSV(const Statement& statement);
unique_ptr<BoundStatement> bindCopy(const Statement& statement);

CSVReaderConfig bindParsingOptions(
const unordered_map<string, unique_ptr<ParsedExpression>>* parsingOptions);
@@ -13,22 +13,22 @@ using namespace kuzu::catalog;
namespace kuzu {
namespace binder {

class BoundCopyCSV : public BoundStatement {
class BoundCopy : public BoundStatement {
public:
BoundCopyCSV(CSVDescription csvDescription, table_id_t tableID, string tableName)
BoundCopy(CopyDescription copyDescription, table_id_t tableID, string tableName)
: BoundStatement{StatementType::COPY_CSV,
BoundStatementResult::createSingleStringColumnResult()},
csvDescription{std::move(csvDescription)}, tableID{tableID}, tableName{
std::move(tableName)} {}
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}

inline CSVDescription getCSVDescription() const { return csvDescription; }
inline CopyDescription getCopyDescription() const { return copyDescription; }

inline table_id_t getTableID() const { return tableID; }

inline string getTableName() const { return tableName; }

private:
CSVDescription csvDescription;
CopyDescription copyDescription;
table_id_t tableID;
string tableName;
};
16 changes: 8 additions & 8 deletions src/include/common/configs.h
@@ -83,8 +83,8 @@ struct HashIndexConfig {
static constexpr uint8_t SLOT_CAPACITY = (uint64_t)1 << SLOT_CAPACITY_LOG_2;
};

struct CopyCSVConfig {
// Size (in bytes) of the chunks to be read in InMemNode/RelCSVCopier
struct CopyConfig {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
@@ -96,12 +96,12 @@
// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
static constexpr char DEFAULT_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_TOKEN_SEPARATOR = ',';
static constexpr char DEFAULT_QUOTE_CHAR = '"';
static constexpr char DEFAULT_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_HAS_HEADER = false;
static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_CSV_DELIMITER = ',';
static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"';
static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
};

struct EnumeratorKnobs {