Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load from csv #2052

Merged
merged 1 commit into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,14 @@ oC_ReadingClause
: oC_Match
| oC_Unwind
| kU_InQueryCall
| kU_LoadFrom
;

kU_LoadFrom
: LOAD SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

LOAD : ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'A' | 'a' ) ( 'D' | 'd' ) ;

kU_InQueryCall
: CALL SP oC_FunctionName SP? '(' oC_Literal* ')' ;

Expand Down
13 changes: 7 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindExpectedNodeFileColumns(tableSchema, *readerConfig);
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::NODE, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
Expand All @@ -125,12 +125,13 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
std::unique_ptr<ReaderConfig> readerConfig, TableSchema* tableSchema) {
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
assert(containsSerial == false);
auto columns = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto srcKey = columns[0];
auto dstKey = columns[1];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::REL, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
auto srcTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID());
Expand Down Expand Up @@ -161,8 +162,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
auto objectKey = columns[2];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto containsSerial = false;
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::REL, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
assert(relTableSchema->getSrcTableID() == relTableSchema->getDstTableID());
auto nodeTableID = relTableSchema->getSrcTableID();
Expand Down
36 changes: 36 additions & 0 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "binder/binder.h"
#include "binder/expression/literal_expression.h"
#include "binder/query/reading_clause/bound_in_query_call.h"
#include "binder/query/reading_clause/bound_load_from.h"
#include "binder/query/reading_clause/bound_match_clause.h"
#include "binder/query/reading_clause/bound_unwind_clause.h"
#include "common/exception/binder.h"
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv_reader.h"

Expand All @@ -27,6 +29,9 @@
case ClauseType::IN_QUERY_CALL: {
return bindInQueryCall(readingClause);
}
case ClauseType::LOAD_FROM: {
return bindLoadFrom(readingClause);
}
default:
throw NotImplementedException("bindReadingClause().");
}
Expand Down Expand Up @@ -107,5 +112,36 @@
tableFunctionDefinition->tableFunc, std::move(bindData), std::move(outputExpressions));
}

std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
const parser::ReadingClause& readingClause) {
auto& loadFrom = reinterpret_cast<const LoadFrom&>(readingClause);
auto csvReaderConfig = bindParsingOptions(loadFrom.getParsingOptionsRef());
auto filePaths = bindFilePaths(loadFrom.getFilePaths());
auto fileType = bindFileType(filePaths);
auto readerConfig =
std::make_unique<ReaderConfig>(fileType, std::move(filePaths), std::move(csvReaderConfig));
if (readerConfig->fileType != FileType::CSV) {
throw BinderException("Load from non-csv file is not supported.");

Check warning on line 124 in src/binder/bind/bind_reading_clause.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_reading_clause.cpp#L124

Added line #L124 was not covered by tests
}
if (readerConfig->getNumFiles() > 1) {
throw BinderException("Load from multiple files is not supported.");

Check warning on line 127 in src/binder/bind/bind_reading_clause.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_reading_clause.cpp#L127

Added line #L127 was not covered by tests
}
auto csvReader = BufferedCSVReader(
readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
csvReader.SniffCSV();
auto numColumns = csvReader.getNumColumnsDetected();
expression_vector columns;
auto stringType = LogicalType(LogicalTypeID::STRING);
for (auto i = 0; i < numColumns; ++i) {
auto columnName = "column" + std::to_string(i);
readerConfig->columnNames.push_back(columnName);
readerConfig->columnTypes.push_back(stringType.copy());
columns.push_back(createVariable(columnName, stringType));
}
auto info = std::make_unique<BoundFileScanInfo>(
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved
std::move(readerConfig), std::move(columns), nullptr, TableType::UNKNOWN);
return std::make_unique<BoundLoadFrom>(std::move(info));
}

} // namespace binder
} // namespace kuzu
3 changes: 3 additions & 0 deletions src/binder/bound_statement_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ void BoundStatementVisitor::visitReadingClause(const BoundReadingClause& reading
case ClauseType::IN_QUERY_CALL: {
visitInQueryCall(readingClause);
} break;
case ClauseType::LOAD_FROM: {
visitLoadFrom(readingClause);
} break;
default:
throw NotImplementedException("BoundStatementVisitor::visitReadingClause");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "binder/query/bound_regular_query.h"
#include "common/copier_config/copier_config.h"
#include "expression_binder.h"
#include "parser/copy.h"
#include "parser/query/regular_query.h"
Expand Down Expand Up @@ -154,6 +153,7 @@ class Binder {
std::unique_ptr<BoundReadingClause> bindUnwindClause(
const parser::ReadingClause& readingClause);
std::unique_ptr<BoundReadingClause> bindInQueryCall(const parser::ReadingClause& readingClause);
std::unique_ptr<BoundReadingClause> bindLoadFrom(const parser::ReadingClause& readingClause);

/*** bind updating clause ***/
// TODO(Guodong/Xiyang): Is update clause an accurate name? How about (data)modificationClause?
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/bound_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BoundStatementVisitor {
virtual void visitMatch(const BoundReadingClause& readingClause) {}
virtual void visitUnwind(const BoundReadingClause& readingClause) {}
virtual void visitInQueryCall(const BoundReadingClause& statement) {}
virtual void visitLoadFrom(const BoundReadingClause& statement) {}
void visitUpdatingClause(const BoundUpdatingClause& updatingClause);
virtual void visitSet(const BoundUpdatingClause& updatingClause) {}
virtual void visitDelete(const BoundUpdatingClause& updatingClause) {}
Expand Down
7 changes: 3 additions & 4 deletions src/include/binder/copy/bound_file_scan_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ struct BoundFileScanInfo {

// TODO: remove the following field
common::TableType tableType;
bool containsSerial;

BoundFileScanInfo(std::unique_ptr<common::ReaderConfig> readerConfig,
binder::expression_vector columns, std::shared_ptr<Expression> offset,
common::TableType tableType, bool containsSerial)
common::TableType tableType)
: readerConfig{std::move(readerConfig)}, columns{std::move(columns)},
offset{std::move(offset)}, tableType{tableType}, containsSerial{containsSerial} {}
offset{std::move(offset)}, tableType{tableType} {}
BoundFileScanInfo(const BoundFileScanInfo& other)
: readerConfig{other.readerConfig->copy()}, columns{other.columns}, offset{other.offset},
tableType{other.tableType}, containsSerial{other.containsSerial} {}
tableType{other.tableType} {}

inline std::unique_ptr<BoundFileScanInfo> copy() const {
return std::make_unique<BoundFileScanInfo>(*this);
Expand Down
25 changes: 25 additions & 0 deletions src/include/binder/query/reading_clause/bound_load_from.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "binder/copy/bound_file_scan_info.h"
#include "bound_reading_clause.h"

namespace kuzu {
namespace binder {

class BoundLoadFrom : public BoundReadingClause {
public:
BoundLoadFrom(std::unique_ptr<BoundFileScanInfo> info)
: BoundReadingClause{common::ClauseType::LOAD_FROM}, info{std::move(info)} {}

inline BoundFileScanInfo* getInfo() const { return info.get(); }

inline std::unique_ptr<BoundReadingClause> copy() override {
return std::make_unique<BoundLoadFrom>(info->copy());

Check warning on line 17 in src/include/binder/query/reading_clause/bound_load_from.h

View check run for this annotation

Codecov / codecov/patch

src/include/binder/query/reading_clause/bound_load_from.h#L16-L17

Added lines #L16 - L17 were not covered by tests
}

private:
std::unique_ptr<BoundFileScanInfo> info;
};

} // namespace binder
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/clause_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum class ClauseType : uint8_t {
MATCH = 10,
UNWIND = 11,
IN_QUERY_CALL = 12,
LOAD_FROM = 13,
};

enum class MatchClauseType : uint8_t {
Expand Down
9 changes: 5 additions & 4 deletions src/include/common/table_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ namespace kuzu {
namespace common {

enum class TableType : uint8_t {
NODE = 0,
REL = 1,
RDF = 2,
REL_GROUP = 3,
UNKNOWN = 0,
NODE = 1,
REL = 2,
RDF = 3,
REL_GROUP = 4,
};

} // namespace common
Expand Down
24 changes: 24 additions & 0 deletions src/include/parser/query/reading_clause/load_from.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "parser/expression/parsed_expression.h"
#include "reading_clause.h"

namespace kuzu {
namespace parser {

class LoadFrom : public ReadingClause {
public:
LoadFrom(std::vector<std::string> filePaths, parsing_option_t parsingOptions)
: ReadingClause{common::ClauseType::LOAD_FROM}, filePaths{std::move(filePaths)},
parsingOptions{std::move(parsingOptions)} {}

inline std::vector<std::string> getFilePaths() const { return filePaths; }
inline const parsing_option_t& getParsingOptionsRef() const { return parsingOptions; }
andyfengHKU marked this conversation as resolved.
Show resolved Hide resolved

private:
std::vector<std::string> filePaths;
parsing_option_t parsingOptions;
};

} // namespace parser
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ class Transformer {
CypherParser::OC_ReadingClauseContext& ctx);

std::unique_ptr<ReadingClause> transformMatch(CypherParser::OC_MatchContext& ctx);

std::unique_ptr<ReadingClause> transformUnwind(CypherParser::OC_UnwindContext& ctx);

std::unique_ptr<ReadingClause> transformInQueryCall(CypherParser::KU_InQueryCallContext& ctx);
std::unique_ptr<ReadingClause> transformLoadFrom(CypherParser::KU_LoadFromContext& ctx);

std::unique_ptr<UpdatingClause> transformCreate(CypherParser::OC_CreateContext& ctx);

Expand Down
6 changes: 2 additions & 4 deletions src/include/planner/query_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
#include "planner/operator/extend/extend_direction.h"

namespace kuzu {
namespace common {
struct CopyDescription;
}

namespace binder {
class BoundCreateInfo;
class BoundSetPropertyInfo;
Expand Down Expand Up @@ -64,6 +60,8 @@ class QueryPlanner {
std::vector<std::unique_ptr<LogicalPlan>>& plans);
void planInQueryCall(binder::BoundReadingClause* readingClause,
std::vector<std::unique_ptr<LogicalPlan>>& plans);
void planLoadFrom(binder::BoundReadingClause* readingClause,
std::vector<std::unique_ptr<LogicalPlan>>& plans);

// Plan updating
void planUpdatingClause(binder::BoundUpdatingClause& updatingClause,
Expand Down
9 changes: 9 additions & 0 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ struct CopyNodeInfo {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
bool containsSerial;

CopyNodeInfo(std::vector<DataPos> dataColumnPoses, storage::NodeTable* table,
storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
bool containsSerial)
: dataColumnPoses{std::move(dataColumnPoses)}, table{table}, relsStore{relsStore},
catalog{catalog}, wal{wal}, containsSerial{containsSerial} {}
};

class CopyNode : public Sink {
Expand All @@ -75,6 +82,8 @@ class CopyNode : public Sink {
sharedState->tableSchema, sharedState->csvReaderConfig.get());
}

inline bool canParallel() const final { return !copyNodeInfo.containsSerial; }

void initGlobalStateInternal(ExecutionContext* context) final;

void executeInternal(ExecutionContext* context) final;
Expand Down
10 changes: 10 additions & 0 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ struct CopyRelInfo {
DataPos boundOffsetPos;
DataPos nbrOffsetPos;
storage::WAL* wal;
bool containsSerial;

CopyRelInfo(catalog::RelTableSchema* schema, std::vector<DataPos> dataPose,
const DataPos& offsetPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
storage::WAL* wal, bool containsSerial)
: schema{schema}, dataPoses{std::move(dataPose)}, offsetPos{offsetPos},
boundOffsetPos{boundOffsetPos}, nbrOffsetPos{nbrOffsetPos}, wal{wal},
containsSerial{containsSerial} {}
};

class CopyRel;
Expand Down Expand Up @@ -109,6 +117,8 @@ class CopyRel : public Sink {
children.push_back(std::move(right));
}

inline bool canParallel() const final { return !info.containsSerial; }

void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final;
void initGlobalStateInternal(ExecutionContext* context) final;

Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ namespace processor {
struct ReaderInfo {
DataPos nodeOffsetPos;
std::vector<DataPos> dataColumnsPos;
bool containsSerial;

common::TableType tableType;

ReaderInfo(const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos,
bool containsSerial, common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)},
containsSerial{containsSerial}, tableType{tableType} {}
common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
tableType} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos},
containsSerial{other.containsSerial}, tableType{other.tableType} {}
: nodeOffsetPos{other.nodeOffsetPos},
dataColumnsPos{other.dataColumnsPos}, tableType{other.tableType} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }

Expand All @@ -36,8 +35,7 @@ class Reader : public PhysicalOperator {

inline bool isSource() const final { return true; }
inline bool canParallel() const final {
return !info->containsSerial &&
sharedState->readerConfig->fileType != common::FileType::TURTLE;
return sharedState->readerConfig->fileType != common::FileType::TURTLE;
}

void initGlobalStateInternal(ExecutionContext* context) final;
Expand Down
Loading