Skip to content

Commit

Permalink
Add LOAD FROM CSV
Browse files Browse the repository at this point in the history
i
  • Loading branch information
andyfengHKU committed Sep 19, 2023
1 parent 6943574 commit bcca15e
Show file tree
Hide file tree
Showing 31 changed files with 3,219 additions and 2,887 deletions.
6 changes: 6 additions & 0 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,14 @@ oC_ReadingClause
: oC_Match
| oC_Unwind
| kU_InQueryCall
| kU_LoadFrom
;

// LOAD FROM reading clause: the LOAD and FROM keywords followed by one or more file paths,
// optionally followed by a parenthesized list of parsing options.
kU_LoadFrom
: LOAD SP FROM SP kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;

// LOAD keyword token; every letter is accepted in either upper or lower case.
LOAD : ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'A' | 'a' ) ( 'D' | 'd' ) ;

kU_InQueryCall
: CALL SP oC_FunctionName SP? '(' oC_Literal* ')' ;

Expand Down
13 changes: 7 additions & 6 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
auto containsSerial = bindContainsSerial(tableSchema);
auto columns = bindExpectedNodeFileColumns(tableSchema, *readerConfig);
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::NODE, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::NODE);
auto boundCopyFromInfo = std::make_unique<BoundCopyFromInfo>(
tableSchema, std::move(boundFileScanInfo), containsSerial, nullptr);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
Expand All @@ -124,12 +124,13 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
std::unique_ptr<ReaderConfig> readerConfig, TableSchema* tableSchema) {
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = bindContainsSerial(tableSchema);
assert(containsSerial == false);
auto columns = bindExpectedRelFileColumns(tableSchema, *readerConfig);
auto srcKey = columns[0];
auto dstKey = columns[1];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::REL, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
auto srcTableSchema =
catalog.getReadOnlyVersion()->getTableSchema(relTableSchema->getSrcTableID());
Expand Down Expand Up @@ -160,8 +161,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(
auto objectKey = columns[2];
auto offset = createVariable(std::string(Property::OFFSET_NAME), LogicalTypeID::INT64);
auto containsSerial = false;
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(std::move(readerConfig),
std::move(columns), std::move(offset), TableType::REL, containsSerial);
auto boundFileScanInfo = std::make_unique<BoundFileScanInfo>(
std::move(readerConfig), std::move(columns), std::move(offset), TableType::REL);
auto relTableSchema = reinterpret_cast<RelTableSchema*>(tableSchema);
assert(relTableSchema->getSrcTableID() == relTableSchema->getDstTableID());
auto nodeTableID = relTableSchema->getSrcTableID();
Expand Down
36 changes: 36 additions & 0 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#include "binder/binder.h"
#include "binder/expression/literal_expression.h"
#include "binder/query/reading_clause/bound_in_query_call.h"
#include "binder/query/reading_clause/bound_load_from.h"
#include "binder/query/reading_clause/bound_match_clause.h"
#include "binder/query/reading_clause/bound_unwind_clause.h"
#include "common/exception/binder.h"
#include "parser/query/reading_clause/in_query_call_clause.h"
#include "parser/query/reading_clause/load_from.h"
#include "parser/query/reading_clause/unwind_clause.h"
#include "processor/operator/persistent/reader/csv_reader.h"

Expand All @@ -27,6 +29,9 @@ std::unique_ptr<BoundReadingClause> Binder::bindReadingClause(const ReadingClaus
case ClauseType::IN_QUERY_CALL: {
return bindInQueryCall(readingClause);
}
case ClauseType::LOAD_FROM: {
return bindLoadFrom(readingClause);
}
default:
throw NotImplementedException("bindReadingClause().");
}
Expand Down Expand Up @@ -107,5 +112,36 @@ std::unique_ptr<BoundReadingClause> Binder::bindInQueryCall(const ReadingClause&
tableFunctionDefinition->tableFunc, std::move(bindData), std::move(outputExpressions));
}

/**
 * Binds a parsed LOAD FROM clause into a BoundLoadFrom.
 *
 * The parsing options and file paths of the clause are bound into a ReaderConfig. The single
 * input file is then sniffed with a BufferedCSVReader to detect its number of columns, and one
 * STRING variable named "column<i>" is created per detected column.
 *
 * @param readingClause parsed reading clause; it is cast to LoadFrom without a runtime check, so
 *        the caller must only pass clauses whose type is ClauseType::LOAD_FROM.
 * @return a BoundLoadFrom holding the file scan info (no offset expression, TableType::UNKNOWN).
 * @throws BinderException if the bound file type is not CSV or more than one file is given.
 */
std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
    const parser::ReadingClause& readingClause) {
    auto& loadFrom = reinterpret_cast<const LoadFrom&>(readingClause);
    auto csvReaderConfig = bindParsingOptions(loadFrom.getParsingOptionsRef());
    auto filePaths = bindFilePaths(loadFrom.getFilePaths());
    auto fileType = bindFileType(filePaths);
    auto readerConfig =
        std::make_unique<ReaderConfig>(fileType, std::move(filePaths), std::move(csvReaderConfig));
    if (readerConfig->fileType != FileType::CSV) {
        // NOTE: the coverage report flags this branch as not exercised by tests.
        throw BinderException("Load from non-csv file is not supported.");
    }
    if (readerConfig->getNumFiles() > 1) {
        // NOTE: the coverage report flags this branch as not exercised by tests.
        throw BinderException("Load from multiple files is not supported.");
    }
    // Sniff the only input file to find out how many columns it has. The expected column count
    // is passed as 0 because it is not known before sniffing.
    auto csvReader = BufferedCSVReader(
        readerConfig->filePaths[0], *readerConfig->csvReaderConfig, 0 /*expectedNumColumns*/);
    csvReader.SniffCSV();
    auto numColumns = csvReader.getNumColumnsDetected();
    expression_vector columns;
    auto stringType = LogicalType(LogicalTypeID::STRING);
    // The index uses the same type as the detected column count so the loop condition never
    // compares a signed int against an unsigned count.
    for (decltype(numColumns) i = 0; i < numColumns; ++i) {
        auto columnName = "column" + std::to_string(i);
        readerConfig->columnNames.push_back(columnName);
        readerConfig->columnTypes.push_back(stringType.copy());
        columns.push_back(createVariable(columnName, stringType));
    }
    auto info = std::make_unique<BoundFileScanInfo>(
        std::move(readerConfig), std::move(columns), nullptr, TableType::UNKNOWN);
    return std::make_unique<BoundLoadFrom>(std::move(info));
}

} // namespace binder
} // namespace kuzu
3 changes: 3 additions & 0 deletions src/binder/bound_statement_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ void BoundStatementVisitor::visitReadingClause(const BoundReadingClause& reading
case ClauseType::IN_QUERY_CALL: {
visitInQueryCall(readingClause);
} break;
case ClauseType::LOAD_FROM: {
visitLoadFrom(readingClause);
} break;
default:
throw NotImplementedException("BoundStatementVisitor::visitReadingClause");
}
Expand Down
2 changes: 1 addition & 1 deletion src/include/binder/binder.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "binder/query/bound_regular_query.h"
#include "common/copier_config/copier_config.h"
#include "expression_binder.h"
#include "parser/copy.h"
#include "parser/query/regular_query.h"
Expand Down Expand Up @@ -154,6 +153,7 @@ class Binder {
std::unique_ptr<BoundReadingClause> bindUnwindClause(
const parser::ReadingClause& readingClause);
std::unique_ptr<BoundReadingClause> bindInQueryCall(const parser::ReadingClause& readingClause);
std::unique_ptr<BoundReadingClause> bindLoadFrom(const parser::ReadingClause& readingClause);

/*** bind updating clause ***/
// TODO(Guodong/Xiyang): Is update clause an accurate name? How about (data)modificationClause?
Expand Down
1 change: 1 addition & 0 deletions src/include/binder/bound_statement_visitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class BoundStatementVisitor {
virtual void visitMatch(const BoundReadingClause& readingClause) {}
virtual void visitUnwind(const BoundReadingClause& readingClause) {}
virtual void visitInQueryCall(const BoundReadingClause& statement) {}
virtual void visitLoadFrom(const BoundReadingClause& statement) {}
void visitUpdatingClause(const BoundUpdatingClause& updatingClause);
virtual void visitSet(const BoundUpdatingClause& updatingClause) {}
virtual void visitDelete(const BoundUpdatingClause& updatingClause) {}
Expand Down
7 changes: 3 additions & 4 deletions src/include/binder/copy/bound_file_scan_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,15 @@ struct BoundFileScanInfo {

// TODO: remove the following field
common::TableType tableType;
bool containsSerial;

BoundFileScanInfo(std::unique_ptr<common::ReaderConfig> readerConfig,
binder::expression_vector columns, std::shared_ptr<Expression> offset,
common::TableType tableType, bool containsSerial)
common::TableType tableType)
: readerConfig{std::move(readerConfig)}, columns{std::move(columns)},
offset{std::move(offset)}, tableType{tableType}, containsSerial{containsSerial} {}
offset{std::move(offset)}, tableType{tableType} {}
BoundFileScanInfo(const BoundFileScanInfo& other)
: readerConfig{other.readerConfig->copy()}, columns{other.columns}, offset{other.offset},
tableType{other.tableType}, containsSerial{other.containsSerial} {}
tableType{other.tableType} {}

inline std::unique_ptr<BoundFileScanInfo> copy() const {
return std::make_unique<BoundFileScanInfo>(*this);
Expand Down
25 changes: 25 additions & 0 deletions src/include/binder/query/reading_clause/bound_load_from.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "binder/copy/bound_file_scan_info.h"
#include "bound_reading_clause.h"

namespace kuzu {
namespace binder {

/**
 * Bound representation of a LOAD FROM reading clause.
 *
 * Owns the BoundFileScanInfo that describes the file scan behind the clause.
 */
class BoundLoadFrom : public BoundReadingClause {
public:
    // explicit: a BoundFileScanInfo pointer must not silently convert into a reading clause.
    explicit BoundLoadFrom(std::unique_ptr<BoundFileScanInfo> info)
        : BoundReadingClause{common::ClauseType::LOAD_FROM}, info{std::move(info)} {}

    // Non-owning access to the scan info; the pointer stays owned by this clause.
    inline BoundFileScanInfo* getInfo() const { return info.get(); }

    // Returns a new BoundLoadFrom built from a copy of the scan info.
    // NOTE: the coverage report flags this method as not exercised by tests.
    inline std::unique_ptr<BoundReadingClause> copy() override {
        return std::make_unique<BoundLoadFrom>(info->copy());
    }

private:
    std::unique_ptr<BoundFileScanInfo> info;
};

} // namespace binder
} // namespace kuzu
1 change: 1 addition & 0 deletions src/include/common/clause_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ enum class ClauseType : uint8_t {
MATCH = 10,
UNWIND = 11,
IN_QUERY_CALL = 12,
LOAD_FROM = 13,
};

enum class MatchClauseType : uint8_t {
Expand Down
9 changes: 5 additions & 4 deletions src/include/common/table_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ namespace kuzu {
namespace common {

enum class TableType : uint8_t {
NODE = 0,
REL = 1,
RDF = 2,
REL_GROUP = 3,
UNKNOWN = 0,
NODE = 1,
REL = 2,
RDF = 3,
REL_GROUP = 4,
};

} // namespace common
Expand Down
24 changes: 24 additions & 0 deletions src/include/parser/query/reading_clause/load_from.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include "parser/expression/parsed_expression.h"
#include "reading_clause.h"

namespace kuzu {
namespace parser {

/**
 * Parsed (unbound) LOAD FROM reading clause.
 *
 * Stores the file paths and the parsing options exactly as handed over by the caller.
 */
class LoadFrom : public ReadingClause {
public:
    LoadFrom(std::vector<std::string> filePaths, parsing_option_t parsingOptions)
        : ReadingClause{common::ClauseType::LOAD_FROM},
          filePaths{std::move(filePaths)},
          parsingOptions{std::move(parsingOptions)} {}

    // Returns the file paths by value, i.e. as a copy of the stored vector.
    std::vector<std::string> getFilePaths() const {
        return filePaths;
    }

    // Returns the stored parsing options by const reference.
    const parsing_option_t& getParsingOptionsRef() const {
        return parsingOptions;
    }

private:
    std::vector<std::string> filePaths;
    parsing_option_t parsingOptions;
};

} // namespace parser
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/include/parser/transformer.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,9 @@ class Transformer {
CypherParser::OC_ReadingClauseContext& ctx);

std::unique_ptr<ReadingClause> transformMatch(CypherParser::OC_MatchContext& ctx);

std::unique_ptr<ReadingClause> transformUnwind(CypherParser::OC_UnwindContext& ctx);

std::unique_ptr<ReadingClause> transformInQueryCall(CypherParser::KU_InQueryCallContext& ctx);
std::unique_ptr<ReadingClause> transformLoadFrom(CypherParser::KU_LoadFromContext& ctx);

std::unique_ptr<UpdatingClause> transformCreate(CypherParser::OC_CreateContext& ctx);

Expand Down
6 changes: 2 additions & 4 deletions src/include/planner/query_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
#include "planner/operator/extend/extend_direction.h"

namespace kuzu {
namespace common {
struct CopyDescription;
}

namespace binder {
class BoundCreateInfo;
class BoundSetPropertyInfo;
Expand Down Expand Up @@ -64,6 +60,8 @@ class QueryPlanner {
std::vector<std::unique_ptr<LogicalPlan>>& plans);
void planInQueryCall(binder::BoundReadingClause* readingClause,
std::vector<std::unique_ptr<LogicalPlan>>& plans);
void planLoadFrom(binder::BoundReadingClause* readingClause,
std::vector<std::unique_ptr<LogicalPlan>>& plans);

// Plan updating
void planUpdatingClause(binder::BoundUpdatingClause& updatingClause,
Expand Down
9 changes: 9 additions & 0 deletions src/include/processor/operator/persistent/copy_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ struct CopyNodeInfo {
storage::RelsStore* relsStore;
catalog::Catalog* catalog;
storage::WAL* wal;
bool containsSerial;

// Member-wise constructor: takes ownership of the column positions and stores the remaining
// pointers and the containsSerial flag as given.
CopyNodeInfo(std::vector<DataPos> dataColumnPoses, storage::NodeTable* table,
    storage::RelsStore* relsStore, catalog::Catalog* catalog, storage::WAL* wal,
    bool containsSerial)
    : dataColumnPoses{std::move(dataColumnPoses)},
      table{table},
      relsStore{relsStore},
      catalog{catalog},
      wal{wal},
      containsSerial{containsSerial} {}
};

class CopyNode : public Sink {
Expand All @@ -75,6 +82,8 @@ class CopyNode : public Sink {
sharedState->tableSchema, sharedState->csvReaderConfig.get());
}

// The operator reports itself as parallelizable only when containsSerial is false.
bool canParallel() const final {
    return !copyNodeInfo.containsSerial;
}

void initGlobalStateInternal(ExecutionContext* context) final;

void executeInternal(ExecutionContext* context) final;
Expand Down
10 changes: 10 additions & 0 deletions src/include/processor/operator/persistent/copy_rel.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ struct CopyRelInfo {
DataPos boundOffsetPos;
DataPos nbrOffsetPos;
storage::WAL* wal;
bool containsSerial;

// Member-wise constructor: takes ownership of the data positions and copies the remaining
// positions, pointers and the containsSerial flag as given.
CopyRelInfo(catalog::RelTableSchema* schema, std::vector<DataPos> dataPose,
    const DataPos& offsetPos, const DataPos& boundOffsetPos, const DataPos& nbrOffsetPos,
    storage::WAL* wal, bool containsSerial)
    : schema{schema},
      dataPoses{std::move(dataPose)},
      offsetPos{offsetPos},
      boundOffsetPos{boundOffsetPos},
      nbrOffsetPos{nbrOffsetPos},
      wal{wal},
      containsSerial{containsSerial} {}
};

class CopyRel;
Expand Down Expand Up @@ -109,6 +117,8 @@ class CopyRel : public Sink {
children.push_back(std::move(right));
}

// The operator reports itself as parallelizable only when containsSerial is false.
bool canParallel() const final {
    return !info.containsSerial;
}

void initLocalStateInternal(ResultSet* resultSet_, ExecutionContext* context) final;
void initGlobalStateInternal(ExecutionContext* context) final;

Expand Down
14 changes: 6 additions & 8 deletions src/include/processor/operator/persistent/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ namespace processor {
struct ReaderInfo {
DataPos nodeOffsetPos;
std::vector<DataPos> dataColumnsPos;
bool containsSerial;

common::TableType tableType;

ReaderInfo(const DataPos& nodeOffsetPos, std::vector<DataPos> dataColumnsPos,
bool containsSerial, common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)},
containsSerial{containsSerial}, tableType{tableType} {}
common::TableType tableType)
: nodeOffsetPos{nodeOffsetPos}, dataColumnsPos{std::move(dataColumnsPos)}, tableType{
tableType} {}
ReaderInfo(const ReaderInfo& other)
: nodeOffsetPos{other.nodeOffsetPos}, dataColumnsPos{other.dataColumnsPos},
containsSerial{other.containsSerial}, tableType{other.tableType} {}
: nodeOffsetPos{other.nodeOffsetPos},
dataColumnsPos{other.dataColumnsPos}, tableType{other.tableType} {}

inline uint32_t getNumColumns() const { return dataColumnsPos.size(); }

Expand All @@ -36,8 +35,7 @@ class Reader : public PhysicalOperator {

inline bool isSource() const final { return true; }
inline bool canParallel() const final {
return !info->containsSerial &&
sharedState->readerConfig->fileType != common::FileType::TURTLE;
return sharedState->readerConfig->fileType != common::FileType::TURTLE;
}

void initGlobalStateInternal(ExecutionContext* context) final;
Expand Down
Loading

0 comments on commit bcca15e

Please sign in to comment.