Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow rel copier #1154

Merged
merged 1 commit into from
Jan 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion dataset/copy-csv-dos-style-newline/copy_csv.cypher

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-empty-lists-test/copy_csv.cypher

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-fault-tests/long-string/copy_csv.cypher

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion dataset/copy-csv-node-property-test/copy_csv.cypher

This file was deleted.

3 changes: 0 additions & 3 deletions dataset/copy-csv-special-char-test/copy_csv.cypher

This file was deleted.

1 change: 1 addition & 0 deletions dataset/copy-dos-style-newline/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-dos-style-newline/vPerson.csv"
1 change: 1 addition & 0 deletions dataset/copy-empty-lists-test/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-empty-lists-test/vPerson.csv"
1 change: 1 addition & 0 deletions dataset/copy-fault-tests/long-string/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-fault-tests/long-string/vPerson.csv"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-fault-tests/rel-table-multiplicity-violation/vPerson.csv"
1 change: 1 addition & 0 deletions dataset/copy-node-property-test/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY person FROM "dataset/copy-node-property-test/vPerson.csv"
3 changes: 3 additions & 0 deletions dataset/copy-special-char-test/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
COPY person FROM "dataset/copy-special-char-test/vPerson.csv" (ESCAPE = "#", QUOTE = "-", DELIM="|");
COPY organisation FROM "dataset/copy-special-char-test/vOrganisation.csv" (ESCAPE = "#", QUOTE = "=", DELIM=",");
COPY workAt FROM "dataset/copy-special-char-test/eWorkAt.csv" (ESCAPE = "#", QUOTE = "=", DELIM="|");
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion dataset/long-string-pk-tests/eKnows.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
AAAAAAAAAAAAAAAAAAAA,Bob
semihsalihoglu-uw marked this conversation as resolved.
Show resolved Hide resolved
weipang142857 marked this conversation as resolved.
Show resolved Hide resolved
AAAAAAAAAAAAAAAAAAAA,Bob
2 changes: 1 addition & 1 deletion dataset/primary-key-tests/string-pk-tests/eKnows.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
Alice,Bob
Alice,Bob
5,102 changes: 2,551 additions & 2,551 deletions dataset/rel-insertion-tests/eKnows.csv

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ COPY studyAt FROM "dataset/tinysnb/eStudyAt.csv" (HEADER=true);
COPY workAt FROM "dataset/tinysnb/eWorkAt.csv"
COPY meets FROM "dataset/tinysnb/eMeets.csv"
COPY mixed FROM "dataset/tinysnb/eMixed.csv"
COPY marries FROM "dataset/tinysnb/eMarries.csv"
COPY marries FROM "dataset/tinysnb/eMarries.csv"
28 changes: 14 additions & 14 deletions dataset/tinysnb/eKnows.csv
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
0,2,2021-06-30,1986-10-21 21:08:31.521,10 years 5 months 13 hours 24 us,[rnme,m8sihsdnf2990nfiwf]
0,3,2021-06-30,1946-08-25 19:07:22,20 years 30 days 48 hours,[njnojppo9u0jkmf,fjiojioh9h9h89hph]
0,5,2021-06-30,2012-12-11 20:07:22,10 days,[ioji232,jifhe8w99u43434]
2,0,2021-06-30,1946-08-25 19:07:22,10 years 5 months 13 hours 24 us,[2huh9y89fsfw23,23nsihufhw723]
2,3,1950-05-14,1946-08-25 19:07:22,23 minutes,[fwehu9h9832wewew,23u9h989sdfsss]
2,5,1950-05-14,2012-12-11 20:07:22,20 years 30 days 48 hours,[fwh9y81232uisuiehuf,ewnuihxy8dyf232]
3,0,2021-06-30,2002-07-31 11:42:53.12342,30 hours 40 days,[fnioh8323aeweae34d,osd89e2ejshuih12]
3,2,1950-05-14,2007-02-12 12:11:42.123,28 minutes 30 milliseconds,[fwh983-sdjisdfji,ioh89y32r2huir]
3,5,2000-01-01,1998-10-02 13:09:22.423,300 milliseconds,[psh989823oaaioe,nuiuah1nosndfisf]
5,0,2021-06-30,1936-11-02 11:02:01,480us,[fwewe]
5,2,1950-05-14,1982-11-11 13:12:05.123,23 minutes,[fewh9182912e3,h9y8y89soidfsf,nuhudf78w78efw,hioshe0f9023sdsd]
5,3,2000-01-01,1999-04-21 15:12:11.42,48 hours 52 milliseconds,[23h9sdslnfowhu2932,shuhf98922323sf]
7,8,1905-12-12,2025-01-01 11:22:33.52,47 minutes 58 seconds,[ahu2333333333333,12weeeeeeeeeeeeeeeeee]
7,9,1905-12-12,2020-03-01 12:11:41.6552,47 minutes 58 seconds,[peweeeeeeeeeeeeeeeee,kowje9w0eweeeeeeeee]
0,2,2021-06-30,1986-10-21 21:08:31.521,10 years 5 months 13 hours 24 us,"[rnme,m8sihsdnf2990nfiwf]"
0,3,2021-06-30,1946-08-25 19:07:22,20 years 30 days 48 hours,"[njnojppo9u0jkmf,fjiojioh9h9h89hph]"
0,5,2021-06-30,2012-12-11 20:07:22,10 days,"[ioji232,jifhe8w99u43434]"
2,0,2021-06-30,1946-08-25 19:07:22,10 years 5 months 13 hours 24 us,"[2huh9y89fsfw23,23nsihufhw723]"
2,3,1950-05-14,1946-08-25 19:07:22,23 minutes,"[fwehu9h9832wewew,23u9h989sdfsss]"
2,5,1950-05-14,2012-12-11 20:07:22,20 years 30 days 48 hours,"[fwh9y81232uisuiehuf,ewnuihxy8dyf232]"
3,0,2021-06-30,2002-07-31 11:42:53.12342,30 hours 40 days,"[fnioh8323aeweae34d,osd89e2ejshuih12]"
3,2,1950-05-14,2007-02-12 12:11:42.123,28 minutes 30 milliseconds,"[fwh983-sdjisdfji,ioh89y32r2huir]"
3,5,2000-01-01,1998-10-02 13:09:22.423,300 milliseconds,"[psh989823oaaioe,nuiuah1nosndfisf]"
5,0,2021-06-30,1936-11-02 11:02:01,480us,"[fwewe]"
5,2,1950-05-14,1982-11-11 13:12:05.123,23 minutes,"[fewh9182912e3,h9y8y89soidfsf,nuhudf78w78efw,hioshe0f9023sdsd]"
5,3,2000-01-01,1999-04-21 15:12:11.42,48 hours 52 milliseconds,"[23h9sdslnfowhu2932,shuhf98922323sf]"
7,8,1905-12-12,2025-01-01 11:22:33.52,47 minutes 58 seconds,"[ahu2333333333333,12weeeeeeeeeeeeeeeeee]"
7,9,1905-12-12,2020-03-01 12:11:41.6552,47 minutes 58 seconds,"[peweeeeeeeeeeeeeeeee,kowje9w0eweeeeeeeee]"
4 changes: 2 additions & 2 deletions dataset/tinysnb/eMarries.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
0,2,["toronto"],
0,2,"[toronto]",
3,5,,"long long long string"
7,8,["vancouver"],"short str"
7,8,"[vancouver]","short str"
6 changes: 3 additions & 3 deletions dataset/tinysnb/eStudyAt.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from,to,YEAR,Places
0,1,2021,[wwAewsdndweusd,wek]
2,1,2020,[anew,jsdnwusklklklwewsd]
8,1,2020,[awndsnjwejwen,isuhuwennjnuhuhuwewe]
0,1,2021,"[wwAewsdndweusd,wek]"
2,1,2020,"[anew,jsdnwusklklklwewsd]"
8,1,2020,"[awndsnjwejwen,isuhuwennjnuhuhuwewe]"
2 changes: 1 addition & 1 deletion src/binder/bind/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
add_library(
kuzu_binder_bind
OBJECT
bind_copy_csv.cpp
bind_copy.cpp
bind_ddl.cpp
bind_graph_pattern.cpp
bind_projection_clause.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#include "binder/binder.h"
#include "binder/copy_csv/bound_copy_csv.h"
#include "binder/copy/bound_copy.h"
#include "binder/expression/literal_expression.h"
#include "parser/copy_csv/copy_csv.h"

namespace kuzu {
namespace binder {

unique_ptr<BoundStatement> Binder::bindCopyCSV(const Statement& statement) {
unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
auto& copyCSV = (CopyCSV&)statement;
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
Expand All @@ -16,7 +16,7 @@ unique_ptr<BoundStatement> Binder::bindCopyCSV(const Statement& statement) {
catalogContent->getRelTableIDFromName(tableName);
auto filePath = copyCSV.getCSVFileName();
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
return make_unique<BoundCopyCSV>(CSVDescription(filePath, csvReaderConfig), tableID, tableName);
return make_unique<BoundCopy>(CopyDescription(filePath, csvReaderConfig), tableID, tableName);
}

CSVReaderConfig Binder::bindParsingOptions(
Expand Down Expand Up @@ -57,7 +57,7 @@ void Binder::bindStringParsingOptions(
if (optionName == "ESCAPE") {
csvReaderConfig.escapeChar = parsingOptionValue;
} else if (optionName == "DELIM") {
csvReaderConfig.tokenSeparator = parsingOptionValue;
csvReaderConfig.delimiter = parsingOptionValue;
} else if (optionName == "QUOTE") {
csvReaderConfig.quoteChar = parsingOptionValue;
} else if (optionName == "LIST_BEGIN") {
Expand Down
6 changes: 3 additions & 3 deletions src/binder/binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ unique_ptr<BoundStatement> Binder::bind(const Statement& statement) {
return bindDropTable(statement);
}
case StatementType::COPY_CSV: {
return bindCopyCSV(statement);
return bindCopy(statement);
}
case StatementType::DROP_PROPERTY: {
return bindDropProperty(statement);
Expand Down Expand Up @@ -165,8 +165,8 @@ void Binder::validateTableExist(const Catalog& _catalog, string& tableName) {
}

bool Binder::validateStringParsingOptionName(string& parsingOptionName) {
for (auto i = 0; i < size(CopyCSVConfig::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyCSVConfig::STRING_CSV_PARSING_OPTIONS[i]) {
for (auto i = 0; i < size(CopyConfig::STRING_CSV_PARSING_OPTIONS); i++) {
if (parsingOptionName == CopyConfig::STRING_CSV_PARSING_OPTIONS[i]) {
return true;
}
}
Expand Down
89 changes: 77 additions & 12 deletions src/common/csv_reader/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ namespace common {

CSVReader::CSVReader(const string& fName, const CSVReaderConfig& config, uint64_t blockId)
: CSVReader{fName, config} {
readingBlockStartOffset = CopyCSVConfig::CSV_READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyCSVConfig::CSV_READING_BLOCK_SIZE * (blockId + 1);
readingBlockStartOffset = CopyConfig::CSV_READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyConfig::CSV_READING_BLOCK_SIZE * (blockId + 1);
auto isBeginningOfLine = false;
if (0 == readingBlockStartOffset) {
isBeginningOfLine = true;
Expand Down Expand Up @@ -166,15 +166,15 @@ bool CSVReader::hasNextToken() {
string lineStr;
while (true) {
if (isQuotedString) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.quoteChar == line[linePtrEnd]) {
break;
} else if (config.escapeChar == line[linePtrEnd]) {
// escape next special character
linePtrEnd++;
}
} else if (isList) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.listBeginChar == line[linePtrEnd]) {
linePtrEnd++;
nestedListLevel++;
Expand All @@ -184,7 +184,7 @@ bool CSVReader::hasNextToken() {
if (nestedListLevel == 0) {
break;
}
} else if (config.tokenSeparator == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
} else if (config.delimiter == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
linePtrEnd == lineLen) {
break;
}
Expand All @@ -207,7 +207,7 @@ bool CSVReader::hasNextToken() {

bool CSVReader::hasNextTokenOrError() {
if (!hasNextToken()) {
throw CSVReaderException(
throw ReaderException(
StringUtils::string_format("CSV Reader was expecting more tokens but the line does not "
"have any tokens left. Last token: %s",
line + linePtrStart));
Expand Down Expand Up @@ -244,7 +244,7 @@ char* CSVReader::getString() {
} else if (unicodeType == UnicodeType::UNICODE) {
return Utf8Proc::normalize(strVal, strlen(strVal));
} else {
throw CSVReaderException("Invalid UTF-8 character encountered.");
throw ReaderException("Invalid UTF-8 character encountered.");
}
}

Expand Down Expand Up @@ -298,15 +298,15 @@ Literal CSVReader::getList(const DataType& dataType) {
result.listVal.emplace_back(listCSVReader.getList(*dataType.childType));
} break;
default:
throw CSVReaderException("Unsupported data type " +
Types::dataTypeToString(dataType.childType->typeID) +
" inside LIST");
throw ReaderException("Unsupported data type " +
Types::dataTypeToString(dataType.childType->typeID) +
" inside LIST");
}
}
}
auto numBytesOfOverflow = result.listVal.size() * Types::getDataTypeSize(dataType.typeID);
if (numBytesOfOverflow >= DEFAULT_PAGE_SIZE) {
throw CSVReaderException(StringUtils::string_format(
throw ReaderException(StringUtils::string_format(
"Maximum num bytes of a LIST is %d. Input list's num bytes is %d.", DEFAULT_PAGE_SIZE,
numBytesOfOverflow));
}
Expand All @@ -321,9 +321,74 @@ void CSVReader::setNextTokenIsProcessed() {
void CSVReader::openFile(const string& fName) {
fd = fopen(fName.c_str(), "r");
if (nullptr == fd) {
throw CSVReaderException("Cannot open file: " + fName);
throw ReaderException("Cannot open file: " + fName);
}
}

CopyDescription::CopyDescription(const string& filePath, CSVReaderConfig csvReaderConfig)
: filePath{filePath}, csvReaderConfig{nullptr}, fileType{FileType::CSV} {
setFileType(filePath);
if (fileType == FileType::CSV) {
this->csvReaderConfig = make_unique<CSVReaderConfig>(csvReaderConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePath{copyDescription.filePath}, csvReaderConfig{nullptr}, fileType{
copyDescription.fileType} {
if (fileType == FileType::CSV) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check for this null or not condition directly on the readerConfig field. So something like:

if (copyDescription.readerConfig) {
  this->readerConfig = make_unique<CSVReaderConfig>(*copyDescription.readerConfig);
}

this->csvReaderConfig = make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
}

string CopyDescription::getFileTypeName(FileType fileType) {
switch (fileType) {
case FileType::CSV:
return "csv";

case FileType::ARROW:
return "arrow";

case FileType::PARQUET:
return "parquet";
}
}

string CopyDescription::getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}

void CopyDescription::setFileType(string const& fileName) {
auto csvSuffix = getFileTypeSuffix(FileType::CSV);
auto arrowSuffix = getFileTypeSuffix(FileType::ARROW);
auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
fileType = FileType::CSV;
return;
}
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
fileType = FileType::ARROW;
return;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(),
parquetSuffix)) {
fileType = FileType::PARQUET;
return;
}
}

throw CopyException("Unsupported file type: " + fileName);
}

} // namespace common
} // namespace kuzu
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class Binder {
property_id_t bindPropertyName(TableSchema* tableSchema, const string& propertyName);

/*** bind copy csv ***/
unique_ptr<BoundStatement> bindCopyCSV(const Statement& statement);
unique_ptr<BoundStatement> bindCopy(const Statement& statement);

CSVReaderConfig bindParsingOptions(
const unordered_map<string, unique_ptr<ParsedExpression>>* parsingOptions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,22 @@ using namespace kuzu::catalog;
namespace kuzu {
namespace binder {

class BoundCopyCSV : public BoundStatement {
class BoundCopy : public BoundStatement {
public:
BoundCopyCSV(CSVDescription csvDescription, table_id_t tableID, string tableName)
BoundCopy(CopyDescription copyDescription, table_id_t tableID, string tableName)
: BoundStatement{StatementType::COPY_CSV,
BoundStatementResult::createSingleStringColumnResult()},
csvDescription{std::move(csvDescription)}, tableID{tableID}, tableName{
std::move(tableName)} {}
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}

inline CSVDescription getCSVDescription() const { return csvDescription; }
inline CopyDescription getCopyDescription() const { return copyDescription; }

inline table_id_t getTableID() const { return tableID; }

inline string getTableName() const { return tableName; }

private:
CSVDescription csvDescription;
CopyDescription copyDescription;
table_id_t tableID;
string tableName;
};
Expand Down
16 changes: 8 additions & 8 deletions src/include/common/configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ struct HashIndexConfig {
static constexpr uint8_t SLOT_CAPACITY = (uint64_t)1 << SLOT_CAPACITY_LOG_2;
};

struct CopyCSVConfig {
// Size (in bytes) of the chunks to be read in InMemNode/RelCSVCopier
struct CopyConfig {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
Expand All @@ -96,12 +96,12 @@ struct CopyCSVConfig {
// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
static constexpr char DEFAULT_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_TOKEN_SEPARATOR = ',';
static constexpr char DEFAULT_QUOTE_CHAR = '"';
static constexpr char DEFAULT_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_HAS_HEADER = false;
static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_CSV_DELIMITER = ',';
static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"';
static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
};

struct EnumeratorKnobs {
Expand Down
Loading