Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reader/csv: sniff column name and type #2116

Merged
merged 1 commit into from
Sep 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dataset/csv-edge-case-tests/large-header.csv

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dataset/typed-headers/person.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
name,age:UINT64,height:INT64
Alice,12,144
Bob,30,510
15 changes: 8 additions & 7 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,15 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
switch (fileType) {
case FileType::CSV: {
auto csvReader = SerialCSVReader(readerConfig->filePaths[0], *readerConfig);
csvReader.sniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
auto sniffedColumns = csvReader.sniffCSV();
for (auto column : sniffedColumns) {
std::string columnName = std::move(column.first);
std::unique_ptr<LogicalType> columnType = std::make_unique<LogicalType>(column.second);
LogicalTypeID columnTypeID = columnType->getLogicalTypeID();

readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
readerConfig->columnTypes.push_back(std::move(columnType));
columns.push_back(createVariable(columnName, columnTypeID));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't need to create a variable for columnTypeID since it is only used once. Just inline the call.

}
} break;
case FileType::PARQUET: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,28 @@
namespace kuzu {
namespace processor {

//! Mode the CSV parser is currently operating in, stored in a single byte.
//! NOTE(review): the meaning of each mode is inferred from the enumerator names only
//! (regular parsing, header parsing, dialect sniffing) -- confirm against the implementation.
enum class ParserMode : uint8_t {
PARSING = 0,
PARSING_HEADER = 1,
SNIFFING_DIALECT = 2,
//! Sentinel value; presumably marks an unset/unknown mode -- TODO confirm.
INVALID = 255
};

class BaseCSVReader {
friend class ParsingDriver;

public:
BaseCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig);

virtual ~BaseCSVReader();

uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk);
virtual uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) = 0;

uint64_t countRows();

protected:
void addValue(common::DataChunk&, std::string_view, common::column_id_t columnIdx,
template<typename Driver>
void addValue(Driver&, uint64_t rowNum, common::column_id_t columnIdx, std::string_view strVal,
std::vector<uint64_t>& escapePositions);
void addRow(common::DataChunk&, common::column_id_t column);

template<typename Driver>
bool addRow(Driver&, uint64_t rowNum, common::column_id_t column_count);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a parameter name for `Driver&`.


//! Read BOM and header.
void handleFirstBlock();

//! If this finds a BOM, it advances `position`.
void readBOM();
Expand All @@ -49,7 +50,8 @@ class BaseCSVReader {
return position < bufferSize || readBuffer(start);
}

uint64_t parseCSV(common::DataChunk& resultChunk);
template<typename Driver>
uint64_t parseCSV(Driver&);

inline bool isNewLine(char c) { return c == '\n' || c == '\r'; }

Expand All @@ -58,19 +60,8 @@ class BaseCSVReader {
uint64_t getLineNumber();

protected:
//! Called when starting the parsing of a new block.
virtual void parseBlockHook() = 0;
virtual bool finishedBlockDetail() const = 0;
virtual void handleQuotedNewline() = 0;

private:
void copyStringToVector(common::ValueVector*, std::string_view);
//! Called after a row is finished to determine if we should keep processing.
inline bool finishedBlock() {
return mode != ParserMode::PARSING || rowToAdd >= common::DEFAULT_VECTOR_CAPACITY ||
finishedBlockDetail();
}

protected:
std::string filePath;
common::CSVReaderConfig csvReaderConfig;
Expand All @@ -86,10 +77,6 @@ class BaseCSVReader {
uint64_t position;

bool rowEmpty = false;

ParserMode mode;

uint64_t rowToAdd;
};

} // namespace processor
Expand Down
73 changes: 73 additions & 0 deletions src/include/processor/operator/persistent/reader/csv/driver.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#pragma once

#include <cstdint>

#include "common/copier_config/copier_config.h"
#include "common/data_chunk/data_chunk.h"

namespace kuzu {
namespace processor {

// TODO(Keenan): Split up this file.
class BaseCSVReader;
//! Abstract base for the drivers that are handed a DataChunk while a CSV file is parsed.
//! It exposes the same done/addValue/addRow member set as the sniffing drivers declared in
//! this header. Concrete subclasses provide the reader to work with (getReader) and an
//! additional stop condition (doneEarly).
//! NOTE(review): the member definitions live in driver.cpp; presumably addValue/addRow write
//! the parsed values into `chunk` -- confirm there.
class ParsingDriver {
    //! Chunk supplied at construction; held by reference, so it must outlive the driver.
    common::DataChunk& chunk;
    bool rowEmpty;

public:
    ParsingDriver(common::DataChunk& chunk);

    //! This class is a polymorphic base (it declares pure virtual members), so it needs a
    //! virtual destructor to make destruction through a ParsingDriver pointer well-defined.
    virtual ~ParsingDriver() = default;

    //! Returns whether parsing should stop after `rowNum` rows.
    bool done(uint64_t rowNum);
    //! Receives the raw text of the value at (`rowNum`, `columnIdx`).
    void addValue(uint64_t rowNum, common::column_id_t columnIdx, std::string_view value);
    //! Receives the end of row `rowNum`, which contained `columnCount` columns.
    //! NOTE(review): the meaning of the returned bool is not visible here -- see driver.cpp.
    bool addRow(uint64_t rowNum, common::column_id_t columnCount);

private:
    //! Reader-specific stop condition supplied by the subclass.
    virtual bool doneEarly() = 0;
    //! The reader this driver operates on, as its common base type.
    virtual BaseCSVReader* getReader() = 0;
};

class ParallelCSVReader;

//! ParsingDriver variant that is bound to a ParallelCSVReader.
class ParallelParsingDriver : public ParsingDriver {
public:
    //! `chunk` is forwarded to the base driver; `reader` is a non-owning pointer.
    ParallelParsingDriver(common::DataChunk& chunk, ParallelCSVReader* reader);

    //! Stop condition specific to the parallel reader.
    bool doneEarly() override;

private:
    //! Exposes the bound reader through its BaseCSVReader interface.
    BaseCSVReader* getReader() override;

private:
    ParallelCSVReader* reader;
};

class SerialCSVReader;

//! ParsingDriver variant that is bound to a SerialCSVReader.
class SerialParsingDriver : public ParsingDriver {
public:
    //! `chunk` is forwarded to the base driver; `reader` is a non-owning pointer.
    SerialParsingDriver(common::DataChunk& chunk, SerialCSVReader* reader);

    //! Stop condition specific to the serial reader.
    bool doneEarly() override;

private:
    //! Exposes the bound reader through its BaseCSVReader interface.
    BaseCSVReader* getReader() override;

private:
    SerialCSVReader* reader;
};

struct SniffCSVNameAndTypeDriver {
std::vector<std::pair<std::string, common::LogicalType>> columns;

bool done(uint64_t rowNum);
void addValue(uint64_t rowNum, common::column_id_t columnIdx, std::string_view value);
bool addRow(uint64_t rowNum, common::column_id_t columntCount);
};

struct SniffCSVColumnCountDriver {
uint64_t numColumns = 0;

bool done(uint64_t rowNum);
void addValue(uint64_t rowNum, common::column_id_t columnIdx, std::string_view value);
bool addRow(uint64_t rowNum, common::column_id_t columntCount);
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,21 @@ namespace processor {

//! ParallelCSVReader is a class that reads values from a stream in parallel.
class ParallelCSVReader final : public BaseCSVReader {
friend class ParallelParsingDriver;

public:
ParallelCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig);

bool hasMoreToRead() const;
uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) override;
uint64_t continueBlock(common::DataChunk& resultChunk);

protected:
void parseBlockHook() override;
void handleQuotedNewline() override;
bool finishedBlockDetail() const override;

private:
bool finishedBlock() const;
void seekToBlockStart();
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ class SerialCSVReader final : public BaseCSVReader {
SerialCSVReader(const std::string& filePath, const common::ReaderConfig& readerConfig);

//! Sniffs CSV dialect and determines skip rows, header row, column types and column names
void sniffCSV();
inline uint64_t getNumColumnsDetected() const { return numColumnsDetected; }
std::vector<std::pair<std::string, common::LogicalType>> sniffCSV();
uint64_t parseBlock(common::block_idx_t blockIdx, common::DataChunk& resultChunk) override;

protected:
void parseBlockHook() override {}
void handleQuotedNewline() override {}
bool finishedBlockDetail() const override;
};

} // namespace processor
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Object library built from the CSV reader sources (base/parallel/serial readers and the drivers).
add_library(kuzu_processor_operator_csv_reader
OBJECT
base_csv_reader.cpp
driver.cpp
parallel_csv_reader.cpp
serial_csv_reader.cpp)

Expand Down
Loading
Loading