Skip to content

Commit

Permalink
changes according to comments 2
Browse files Browse the repository at this point in the history
  • Loading branch information
Wei Pang authored and Wei Pang committed Jan 15, 2023
1 parent 0dc293f commit ef12216
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 94 deletions.
8 changes: 4 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ unique_ptr<BoundStatement> Binder::bindCopy(const Statement& statement) {
return make_unique<BoundCopy>(CopyDescription(filePath, readerConfig), tableID, tableName);
}

ReaderConfig Binder::bindParsingOptions(
CSVReaderConfig Binder::bindParsingOptions(
const unordered_map<string, unique_ptr<ParsedExpression>>* parsingOptions) {
ReaderConfig readerConfig;
CSVReaderConfig readerConfig;
for (auto& parsingOption : *parsingOptions) {
auto copyOptionName = parsingOption.first;
bool isValidStringParsingOption = validateStringParsingOptionName(copyOptionName);
Expand Down Expand Up @@ -52,12 +52,12 @@ ReaderConfig Binder::bindParsingOptions(
}

void Binder::bindStringParsingOptions(
ReaderConfig& readerConfig, const string& optionName, string& optionValue) {
CSVReaderConfig& readerConfig, const string& optionName, string& optionValue) {
auto parsingOptionValue = bindParsingOptionValue(optionValue);
if (optionName == "ESCAPE") {
readerConfig.escapeChar = parsingOptionValue;
} else if (optionName == "DELIM") {
readerConfig.tokenSeparator = parsingOptionValue;
readerConfig.delimiter = parsingOptionValue;
} else if (optionName == "QUOTE") {
readerConfig.quoteChar = parsingOptionValue;
} else if (optionName == "LIST_BEGIN") {
Expand Down
82 changes: 74 additions & 8 deletions src/common/csv_reader/csv_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {

CSVReader::CSVReader(const string& fName, const ReaderConfig& config, uint64_t blockId)
CSVReader::CSVReader(const string& fName, const CSVReaderConfig& config, uint64_t blockId)
: CSVReader{fName, config} {
readingBlockStartOffset = CopyConfig::READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyConfig::READING_BLOCK_SIZE * (blockId + 1);
readingBlockStartOffset = CopyConfig::CSV_READING_BLOCK_SIZE * blockId;
readingBlockEndOffset = CopyConfig::CSV_READING_BLOCK_SIZE * (blockId + 1);
auto isBeginningOfLine = false;
if (0 == readingBlockStartOffset) {
isBeginningOfLine = true;
Expand All @@ -29,12 +29,13 @@ CSVReader::CSVReader(const string& fName, const ReaderConfig& config, uint64_t b
}
}

CSVReader::CSVReader(const string& fname, const ReaderConfig& config)
CSVReader::CSVReader(const string& fname, const CSVReaderConfig& config)
: CSVReader{(char*)malloc(sizeof(char) * 1024), 0, -1l, config} {
openFile(fname);
}

CSVReader::CSVReader(char* line, uint64_t lineLen, int64_t linePtrStart, const ReaderConfig& config)
CSVReader::CSVReader(
char* line, uint64_t lineLen, int64_t linePtrStart, const CSVReaderConfig& config)
: fd{nullptr}, config{config}, logger{LoggerUtils::getOrCreateLogger("csv_reader")},
nextLineIsNotProcessed{false}, isEndOfBlock{false},
nextTokenIsNotProcessed{false}, line{line}, lineCapacity{1024}, lineLen{lineLen},
Expand Down Expand Up @@ -165,15 +166,15 @@ bool CSVReader::hasNextToken() {
string lineStr;
while (true) {
if (isQuotedString) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.quoteChar == line[linePtrEnd]) {
break;
} else if (config.escapeChar == line[linePtrEnd]) {
// escape next special character
linePtrEnd++;
}
} else if (isList) {
// ignore tokenSeparator and new line character here
// ignore delimiter and new line character here
if (config.listBeginChar == line[linePtrEnd]) {
linePtrEnd++;
nestedListLevel++;
Expand All @@ -183,7 +184,7 @@ bool CSVReader::hasNextToken() {
if (nestedListLevel == 0) {
break;
}
} else if (config.tokenSeparator == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
} else if (config.delimiter == line[linePtrEnd] || '\n' == line[linePtrEnd] ||
linePtrEnd == lineLen) {
break;
}
Expand Down Expand Up @@ -324,5 +325,70 @@ void CSVReader::openFile(const string& fName) {
}
}

CopyDescription::CopyDescription(const string& filePath, CSVReaderConfig readerConfig)
: filePath{filePath}, readerConfig{nullptr}, fileType{FileType::CSV} {
setFileType(filePath);
if (fileType == FileType::CSV) {
this->readerConfig = make_unique<CSVReaderConfig>(readerConfig);
}
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePath{copyDescription.filePath}, readerConfig{nullptr}, fileType{
copyDescription.fileType} {
if (fileType == FileType::CSV) {
this->readerConfig = make_unique<CSVReaderConfig>(*copyDescription.readerConfig);
}
}

string CopyDescription::getFileTypeName(FileType fileType) {
switch (fileType) {
case FileType::CSV:
return "csv";

case FileType::ARROW:
return "arrow";

case FileType::PARQUET:
return "parquet";
}
}

string CopyDescription::getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}

void CopyDescription::setFileType(string const& fileName) {
auto csvSuffix = getFileTypeSuffix(FileType::CSV);
auto arrowSuffix = getFileTypeSuffix(FileType::ARROW);
auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
fileType = FileType::CSV;
return;
}
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
fileType = FileType::ARROW;
return;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(), parquetSuffix.length(),
parquetSuffix)) {
fileType = FileType::PARQUET;
return;
}
}

throw CopyException("Unsupported file type: " + fileName);
}

} // namespace common
} // namespace kuzu
4 changes: 2 additions & 2 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ class Binder {
/*** bind copy csv ***/
unique_ptr<BoundStatement> bindCopy(const Statement& statement);

ReaderConfig bindParsingOptions(
CSVReaderConfig bindParsingOptions(
const unordered_map<string, unique_ptr<ParsedExpression>>* parsingOptions);
void bindStringParsingOptions(
ReaderConfig& readerConfig, const string& optionName, string& optionValue);
CSVReaderConfig& readerConfig, const string& optionName, string& optionValue);
char bindParsingOptionValue(string value);

/*** bind query ***/
Expand Down
14 changes: 7 additions & 7 deletions src/include/common/configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ struct HashIndexConfig {

struct CopyConfig {
// Size (in bytes) of the chunks to be read in Node/Rel Copier
static constexpr uint64_t READING_BLOCK_SIZE = 1 << 23;
static constexpr uint64_t CSV_READING_BLOCK_SIZE = 1 << 23;

// Number of tasks to be assigned in a batch when reading files.
static constexpr uint64_t NUM_COPIER_TASKS_TO_SCHEDULE_PER_BATCH = 200;
Expand All @@ -96,12 +96,12 @@ struct CopyConfig {
// Default configuration for csv file parsing
static constexpr const char* STRING_CSV_PARSING_OPTIONS[5] = {
"ESCAPE", "DELIM", "QUOTE", "LIST_BEGIN", "LIST_END"};
static constexpr char DEFAULT_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_TOKEN_SEPARATOR = ',';
static constexpr char DEFAULT_QUOTE_CHAR = '"';
static constexpr char DEFAULT_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_HAS_HEADER = false;
static constexpr char DEFAULT_CSV_ESCAPE_CHAR = '\\';
static constexpr char DEFAULT_CSV_DELIMITER = ',';
static constexpr char DEFAULT_CSV_QUOTE_CHAR = '"';
static constexpr char DEFAULT_CSV_LIST_BEGIN_CHAR = '[';
static constexpr char DEFAULT_CSV_LIST_END_CHAR = ']';
static constexpr bool DEFAULT_CSV_HAS_HEADER = false;
};

struct EnumeratorKnobs {
Expand Down
90 changes: 23 additions & 67 deletions src/include/common/csv_reader/csv_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,83 +13,38 @@ class logger;
namespace kuzu {
namespace common {

struct ReaderConfig {
ReaderConfig()
: escapeChar{CopyConfig::DEFAULT_ESCAPE_CHAR},
tokenSeparator{CopyConfig::DEFAULT_TOKEN_SEPARATOR},
quoteChar{CopyConfig::DEFAULT_QUOTE_CHAR},
listBeginChar{CopyConfig::DEFAULT_LIST_BEGIN_CHAR},
listEndChar{CopyConfig::DEFAULT_LIST_END_CHAR}, hasHeader{
CopyConfig::DEFAULT_HAS_HEADER} {}
struct CSVReaderConfig {
CSVReaderConfig()
: escapeChar{CopyConfig::DEFAULT_CSV_ESCAPE_CHAR},
delimiter{CopyConfig::DEFAULT_CSV_DELIMITER},
quoteChar{CopyConfig::DEFAULT_CSV_QUOTE_CHAR},
listBeginChar{CopyConfig::DEFAULT_CSV_LIST_BEGIN_CHAR},
listEndChar{CopyConfig::DEFAULT_CSV_LIST_END_CHAR},
hasHeader{CopyConfig::DEFAULT_CSV_HAS_HEADER} {}

char escapeChar;
char tokenSeparator;
char delimiter;
char quoteChar;
char listBeginChar;
char listEndChar;
bool hasHeader;
};

struct CopyDescription {
CopyDescription(const string& filePath, const ReaderConfig readerConfig)
: filePath{filePath}, readerConfig{nullptr}, fileType{FileType::CSV} {
setFileType(filePath);
if (fileType == FileType::CSV) {
this->readerConfig = make_shared<ReaderConfig>(readerConfig);
}
}
CopyDescription(const string& filePath, CSVReaderConfig readerConfig);

CopyDescription(const CopyDescription& copyDescription);

enum class FileType { CSV, ARROW, PARQUET };

static string getFileTypeName(FileType fileType) {
switch (fileType) {
case FileType::CSV:
return "csv";

case FileType::ARROW:
return "arrow";

case FileType::PARQUET:
return "parquet";
}
}

static string getFileTypeSuffix(FileType fileType) { return "." + getFileTypeName(fileType); }

void setFileType(string const& fileName) {
auto csvSuffix = getFileTypeSuffix(FileType::CSV);
auto arrowSuffix = getFileTypeSuffix(FileType::ARROW);
auto parquetSuffix = getFileTypeSuffix(FileType::PARQUET);

if (fileName.length() >= csvSuffix.length()) {
if (!fileName.compare(
fileName.length() - csvSuffix.length(), csvSuffix.length(), csvSuffix)) {
fileType = FileType::CSV;
return;
}
}

if (fileName.length() >= arrowSuffix.length()) {
if (!fileName.compare(
fileName.length() - arrowSuffix.length(), arrowSuffix.length(), arrowSuffix)) {
fileType = FileType::ARROW;
return;
}
}

if (fileName.length() >= parquetSuffix.length()) {
if (!fileName.compare(fileName.length() - parquetSuffix.length(),
parquetSuffix.length(), parquetSuffix)) {
fileType = FileType::PARQUET;
return;
}
}

throw CopyException("Unsupported file type: " + fileName);
}
static string getFileTypeName(FileType fileType);

static string getFileTypeSuffix(FileType fileType);

void setFileType(string const& fileName);

const string filePath;
shared_ptr<ReaderConfig> readerConfig;
unique_ptr<CSVReaderConfig> readerConfig;
FileType fileType;
};

Expand All @@ -100,11 +55,12 @@ class CSVReader {

public:
// Initializes to read a block in file.
CSVReader(const string& fname, const ReaderConfig& readerConfig, uint64_t blockId);
CSVReader(const string& fname, const CSVReaderConfig& readerConfig, uint64_t blockId);
// Initializes to read the complete file.
CSVReader(const string& fname, const ReaderConfig& readerConfig);
CSVReader(const string& fname, const CSVReaderConfig& readerConfig);
// Initializes to read a part of a line.
CSVReader(char* line, uint64_t lineLen, int64_t linePtrStart, const ReaderConfig& readerConfig);
CSVReader(
char* line, uint64_t lineLen, int64_t linePtrStart, const CSVReaderConfig& readerConfig);

~CSVReader();

Expand Down Expand Up @@ -138,7 +94,7 @@ class CSVReader {

private:
FILE* fd;
const ReaderConfig& config;
const CSVReaderConfig& config;
shared_ptr<spdlog::logger> logger;
bool nextLineIsNotProcessed, isEndOfBlock, nextTokenIsNotProcessed;
char* line;
Expand Down
2 changes: 0 additions & 2 deletions src/storage/copy_arrow/copy_rel_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,8 @@ void CopyRelArrow::initListsMetadata() {
void CopyRelArrow::populateLists() {
logger->debug(
"Populating adjLists and rel property lists for rel {}.", relTableSchema->tableName);

auto status = executePopulateTask(PopulateTaskType::populateListsTask);
throwCopyExceptionIfNotOK(status);

logger->debug(
"Done populating adjLists and rel property lists for rel {}.", relTableSchema->tableName);
}
Expand Down
6 changes: 3 additions & 3 deletions src/storage/copy_arrow/copy_structures_arrow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ arrow::Status CopyStructuresArrow::initCSVReader(
shared_ptr<arrow::io::InputStream> arrow_input_stream;
ARROW_ASSIGN_OR_RAISE(arrow_input_stream, arrow::io::ReadableFile::Open(filePath));
auto arrowRead = arrow::csv::ReadOptions::Defaults();
arrowRead.block_size = CopyConfig::READING_BLOCK_SIZE;
arrowRead.block_size = CopyConfig::CSV_READING_BLOCK_SIZE;

if (!copyDescription.readerConfig->hasHeader) {
arrowRead.autogenerate_column_names = true;
Expand All @@ -187,7 +187,7 @@ arrow::Status CopyStructuresArrow::initCSVReader(
arrowConvert.quoted_strings_can_be_null = false;

auto arrowParse = arrow::csv::ParseOptions::Defaults();
arrowParse.delimiter = copyDescription.readerConfig->tokenSeparator;
arrowParse.delimiter = copyDescription.readerConfig->delimiter;
arrowParse.escape_char = copyDescription.readerConfig->escapeChar;
arrowParse.quote_char = copyDescription.readerConfig->quoteChar;
arrowParse.escaping = true;
Expand Down Expand Up @@ -224,7 +224,7 @@ Literal CopyStructuresArrow::getArrowList(string& l, int64_t from, int64_t to,
auto childDataType = *dataType.childType;
Literal result(DataType(LIST, make_unique<DataType>(childDataType)));

char delimiter = copyDescription.readerConfig->tokenSeparator;
char delimiter = copyDescription.readerConfig->delimiter;
vector<pair<int64_t, int64_t>> split;
int bracket = 0;
int64_t last = from;
Expand Down
2 changes: 1 addition & 1 deletion test/copy/copy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ TEST_F(CopyNodePropertyTest, NodeStructuredStringPropertyTest) {
auto column = reinterpret_cast<StringPropertyColumn*>(
graph->getNodesStore().getNodePropertyColumn(tableID, propertyIdx.propertyID));
string fName = TestHelper::appendKuzuRootPath("dataset/copy-node-property-test/vPerson.csv");
ReaderConfig config;
CSVReaderConfig config;
CSVReader csvReader(fName, config);
int lineIdx = 0;
uint64_t count = 0;
Expand Down

0 comments on commit ef12216

Please sign in to comment.