Skip to content

Commit

Permalink
Refactor to separate CopyTo and CopyFrom
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Jul 12, 2023
1 parent 4e04c09 commit 7d9dc88
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 178 deletions.
20 changes: 5 additions & 15 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,17 @@
using namespace kuzu::main;

int main() {
auto database = std::make_unique<Database>("/tmp/puffer_fish" /* fill db path */);
auto database = std::make_unique<Database>("" /* fill db path */);
auto connection = std::make_unique<Connection>(database.get());


// Create schema.
connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
// Create nodes.
connection->query("CREATE (:Person {name: 'Alice', age: 25});");
connection->query("CREATE (:Person {name: 'Bob', age: 30});");

connection->query("MATCH (a:Person) RETURN a;");

auto s = connection->query("COPY Person TO '/tmp/out.csv';");
// std::cout << s->toString();

// Create schema.
//connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
// Create nodes.
//connection->query("CREATE (:Person {name: 'Alice', age: 25});");
//connection->query("CREATE (:Person {name: 'Bob', age: 30});");

// Execute a simple query.
//auto result = connection->query("MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");
auto result = connection->query("MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");
// Print query result.
//std::cout << result->toString();
std::cout << result->toString();
}
73 changes: 45 additions & 28 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,43 @@ using namespace kuzu::parser;
namespace kuzu {
namespace binder {

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyCSV = (Copy&)statement;
std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Copy& copyStatement) {
auto boundFilePaths = copyStatement.getFilePaths();
auto actualFileType = bindFileType(boundFilePaths);
auto tableID = catalog.getReadOnlyVersion()->getTableID(copyStatement.getTableName());
auto queryGraphCollection = std::make_unique<QueryGraphCollection>();
std::unique_ptr<BoundStatementResult> statementResult;
expression_vector expressionsToCopy;

if (actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("COPY TO npy files is not supported.");
}
std::shared_ptr<NodeExpression> node =
createQueryNode(InternalKeyword::ANONYMOUS, std::vector<common::table_id_t>{tableID});
auto queryGraph = std::make_unique<QueryGraph>();
queryGraph->addQueryNode(node);
queryGraphCollection->addAndMergeQueryGraphIfConnected(std::move(queryGraph));
statementResult = BoundStatementResult::createEmptyResult();
for (auto& property : node->getPropertyExpressions()) {
expressionsToCopy.push_back(property->copy());
}
return std::make_unique<BoundCopy>(std::move(queryGraphCollection),
std::move(expressionsToCopy),
CopyDescription(
CopyDescription::CopyDirection::TO, boundFilePaths, actualFileType, CSVReaderConfig{}),
std::move(statementResult));
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Copy& copyStatement) {
auto catalogContent = catalog.getReadOnlyVersion();
auto tableName = copyCSV.getTableName();
auto copyDirection = copyCSV.getCopyDirection();
auto tableName = copyStatement.getTableName();
validateTableExist(catalog, tableName);
auto tableID = catalogContent->getTableID(tableName);
auto csvReaderConfig = bindParsingOptions(copyCSV.getParsingOptions());
auto boundFilePaths = (copyDirection == common::CopyDescription::CopyDirection::TO) ?
copyCSV.getFilePaths() :
bindFilePaths(copyCSV.getFilePaths());
auto csvReaderConfig = bindParsingOptions(copyStatement.getParsingOptions());
auto boundFilePaths = bindFilePaths(copyStatement.getFilePaths());
auto actualFileType = bindFileType(boundFilePaths);
auto expectedFileType = copyCSV.getFileType();
auto expectedFileType = copyStatement.getFileType();

if (expectedFileType == common::CopyDescription::FileType::UNKNOWN &&
actualFileType == common::CopyDescription::FileType::NPY) {
throw BinderException("Please use COPY FROM BY COLUMN statement for copying npy files.");
Expand All @@ -39,27 +63,20 @@ std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statemen
tableSchema->tableName));
}
}
auto queryGraphCollection = std::make_unique<QueryGraphCollection>();
std::unique_ptr<BoundStatementResult> statementResult;
expression_vector expressionsToCopy;
if (copyDirection == common::CopyDescription::CopyDirection::TO) {
if (actualFileType == CopyDescription::FileType::NPY) {
throw BinderException("COPY TO npy files is not supported.");
}
std::shared_ptr<NodeExpression> node = createQueryNode(InternalKeyword::ANONYMOUS, std::vector<common::table_id_t>{tableID});
auto queryGraph = std::make_unique<QueryGraph>();
queryGraph->addQueryNode(node);
queryGraphCollection->addAndMergeQueryGraphIfConnected(std::move(queryGraph));
statementResult = BoundStatementResult::createEmptyResult();
for (auto& property : node->getPropertyExpressions()) {
expressionsToCopy.push_back(property->copy());
}
std::unique_ptr<BoundStatementResult> statementResult =
BoundStatementResult::createSingleStringColumnResult();
return std::make_unique<BoundCopy>(CopyDescription(CopyDescription::CopyDirection::FROM,
boundFilePaths, actualFileType, csvReaderConfig),
tableID, tableName, std::move(statementResult));
}

std::unique_ptr<BoundStatement> Binder::bindCopyClause(const Statement& statement) {
auto& copyStatement = (Copy&)statement;
if (copyStatement.getCopyDirection() == common::CopyDescription::CopyDirection::TO) {
return bindCopyToClause(copyStatement);
} else {
statementResult = BoundStatementResult::createSingleStringColumnResult();
return bindCopyFromClause(copyStatement);
}
return std::make_unique<BoundCopy>(std::move(queryGraphCollection), std::move(expressionsToCopy),
CopyDescription(copyDirection, boundFilePaths, csvReaderConfig, actualFileType), tableID,
tableName, std::move(statementResult));
}

std::vector<std::string> Binder::bindFilePaths(const std::vector<std::string>& filePaths) {
Expand Down
2 changes: 1 addition & 1 deletion src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ using namespace kuzu::utf8proc;
namespace kuzu {
namespace common {
CopyDescription::CopyDescription(CopyDirection copyDirection,
const std::vector<std::string>& filePaths, CSVReaderConfig csvReaderConfig, FileType fileType)
const std::vector<std::string>& filePaths, FileType fileType, CSVReaderConfig csvReaderConfig)
: copyDirection{copyDirection}, filePaths{filePaths}, csvReaderConfig{nullptr}, fileType{
fileType} {
if (fileType == FileType::CSV) {
Expand Down
3 changes: 3 additions & 0 deletions src/include/binder/binder.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "binder/query/bound_regular_query.h"
#include "common/copier_config/copier_config.h"
#include "expression_binder.h"
#include "parser/copy.h"
#include "parser/query/regular_query.h"
#include "query_normalizer.h"

Expand Down Expand Up @@ -92,6 +93,8 @@ class Binder {

/*** bind copy csv ***/
std::unique_ptr<BoundStatement> bindCopyClause(const parser::Statement& statement);
std::unique_ptr<BoundStatement> bindCopyFromClause(const parser::Copy& statement);
std::unique_ptr<BoundStatement> bindCopyToClause(const parser::Copy& statement);

std::vector<std::string> bindFilePaths(const std::vector<std::string>& filePaths);

Expand Down
36 changes: 24 additions & 12 deletions src/include/binder/copy/bound_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,42 @@
namespace kuzu {
namespace binder {

struct BoundCopyInfo {
common::table_id_t tableID;
std::string tableName;
};

class BoundCopy : public BoundStatement {
public:
// COPY FROM
BoundCopy(common::CopyDescription copyDescription, common::table_id_t tableID,
std::string tableName, std::unique_ptr<BoundStatementResult> copyStatementResult)
: BoundStatement{common::StatementType::COPY, std::move(copyStatementResult)},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(
tableName)} {}

// COPY TO
BoundCopy(std::unique_ptr<QueryGraphCollection> queryGraphCollection,
expression_vector expressionsToCopy,
common::CopyDescription copyDescription,
common::table_id_t tableID,
std::string tableName,
std::unique_ptr<BoundStatementResult> copyStatementResult)
expression_vector expressionsToCopy, common::CopyDescription copyDescription,
std::unique_ptr<BoundStatementResult> copyStatementResult)
: BoundStatement{common::StatementType::COPY, std::move(copyStatementResult)},
expressionsToCopy{std::move(expressionsToCopy)},
queryGraphCollection{std::move(queryGraphCollection)},
copyDescription{std::move(copyDescription)}, tableID{tableID}, tableName{std::move(tableName)} {}
queryGraphCollection{std::move(queryGraphCollection)}, copyDescription{
std::move(copyDescription)} {}

// inline std::shared_ptr<NodeExpression> getNodeExpression() const { return nodeExpression; }
inline std::unique_ptr<QueryGraphCollection> getQueryGraphCollection() { return std::move(queryGraphCollection); }
inline std::unique_ptr<QueryGraphCollection> getQueryGraphCollection() {
return std::move(queryGraphCollection);
}

inline binder::expression_vector getExpressionsToCopy() const { return expressionsToCopy; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }

inline std::unique_ptr<BoundStatementResult> getCopyStatementResult() { return std::move(copyStatementResult); }
inline std::unique_ptr<BoundStatementResult> getCopyStatementResult() {
return std::move(copyStatementResult);
}

inline std::string getTableName() const { return tableName; }

Expand All @@ -44,8 +57,7 @@ class BoundCopy : public BoundStatement {
std::string tableName;
std::unique_ptr<BoundStatementResult> copyStatementResult;

// CopyTo only
// std::shared_ptr<NodeExpression> nodeExpression;
// CopyTo only
std::unique_ptr<QueryGraphCollection> queryGraphCollection;
expression_vector expressionsToCopy;
};
Expand Down
37 changes: 36 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,48 @@ struct CSVReaderConfig {
bool hasHeader;
};

/*
struct CopyDescriptionFrom {
enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
CSVReaderConfig csvReaderConfig);
CopyDescription(const CopyDescription& copyDescription);
inline static std::string getFileTypeSuffix(FileType fileType) {
return "." + getFileTypeName(fileType);
}
static std::string getFileTypeName(FileType fileType);
const std::vector<std::string> filePaths;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
}
struct CopyDescriptionTo {
enum class FileType : uint8_t { CSV = 1, PARQUET = 2 };
CopyDescription(const std::string& filePath, FileType fileType);
CopyDescription(const CopyDescription& copyDescription);
static std::string getFileTypeName(FileType fileType);
const std::string filePath;
FileType fileType;
}
*/

struct CopyDescription {
enum class CopyDirection : uint8_t { FROM = 0, TO = 1 };

enum class FileType : uint8_t { UNKNOWN = 0, CSV = 1, PARQUET = 2, NPY = 3 };

CopyDescription(CopyDirection copyDirection, const std::vector<std::string>& filePaths,
CSVReaderConfig csvReaderConfig, FileType fileType);
FileType fileType, CSVReaderConfig csvReaderConfig);

CopyDescription(const CopyDescription& copyDescription);

Expand Down
1 change: 1 addition & 0 deletions src/include/parser/copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
namespace kuzu {
namespace parser {

// This class contains all the information parsed from COPY command.
class Copy : public Statement {
public:
explicit Copy(common::CopyDescription::CopyDirection copyDirection,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,27 @@ namespace planner {
class LogicalCopyTo : public LogicalOperator {

public:
// TODO: remove what is not needed from here
LogicalCopyTo(std::shared_ptr<LogicalOperator> child,
binder::expression_vector expressionsToCopy,
const common::CopyDescription& copyDescription,
common::table_id_t tableID,
std::string tableName)
LogicalCopyTo(std::shared_ptr<LogicalOperator> child,
binder::expression_vector expressionsToCopy, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child},
expressionsToCopy{std::move(expressionsToCopy)},
copyDescription{copyDescription},
tableID{tableID},
tableName{std::move(tableName)} {}
expressionsToCopy{std::move(expressionsToCopy)}, copyDescription{copyDescription} {}

inline std::string getExpressionsForPrinting() const override { return tableName; }
inline std::string getExpressionsForPrinting() const override { return std::string{}; }

inline binder::expression_vector getExpressionsToCopy() const { return expressionsToCopy; }

inline common::CopyDescription getCopyDescription() const { return copyDescription; }

inline common::table_id_t getTableID() const { return tableID; }
inline void computeFactorizedSchema() override { copyChildSchema(0); }

inline void computeFactorizedSchema() override {
copyChildSchema(0);
}

inline void computeFlatSchema() override {
copyChildSchema(0);
}
inline void computeFlatSchema() override { copyChildSchema(0); }

inline std::unique_ptr<LogicalOperator> copy() override {
return make_unique<LogicalCopyTo>(children[0]->copy(), expressionsToCopy, copyDescription, tableID, tableName);
return make_unique<LogicalCopyTo>(children[0]->copy(), expressionsToCopy, copyDescription);
}

private:
common::CopyDescription copyDescription;
common::table_id_t tableID;
std::string tableName;
binder::expression_vector expressionsToCopy;
};

Expand Down
8 changes: 4 additions & 4 deletions src/include/planner/planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ class Planner {
const storage::RelsStatistics& relsStatistics, const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopy(const catalog::Catalog& catalog,
const storage::NodesStatisticsAndDeletedIDs& nodesStatistics, const storage::RelsStatistics& relsStatistics,
const BoundStatement& statement);
const storage::NodesStatisticsAndDeletedIDs& nodesStatistics,
const storage::RelsStatistics& relsStatistics, const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopyTo(const catalog::Catalog& catalog,
const storage::NodesStatisticsAndDeletedIDs& nodesStatistics, const storage::RelsStatistics& relsStatistics,
const BoundStatement& statement);
const storage::NodesStatisticsAndDeletedIDs& nodesStatistics,
const storage::RelsStatistics& relsStatistics, const BoundStatement& statement);

static std::unique_ptr<LogicalPlan> planCopyFrom(
const catalog::Catalog& catalog, const BoundStatement& statement);
Expand Down
15 changes: 7 additions & 8 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,20 @@ namespace processor {

class CopyTo : public PhysicalOperator {
public:
CopyTo(std::vector<DataPos> vectorsToCopyPos,
const common::CopyDescription& copyDescription, uint32_t id,
const std::string& paramsString,
std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
vectorsToCopyPos{std::move(vectorsToCopyPos)}, copyDescription{copyDescription} {}
CopyTo(std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
vectorsToCopyPos{std::move(vectorsToCopyPos)}, copyDescription{copyDescription} {}

bool getNextTuplesInternal(ExecutionContext* context) override;

// std::string execute(common::TaskScheduler* taskScheduler, ExecutionContext* executionContext);
common::CopyDescription& getCopyDescription() { return copyDescription; }

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
return make_unique<CopyTo>(
vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand Down
1 change: 0 additions & 1 deletion src/planner/operator/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ add_library(kuzu_planner_operator
logical_aggregate.cpp
logical_in_query_call.cpp
logical_copy.cpp
logical_copy_to.cpp
logical_create.cpp
logical_cross_product.cpp
logical_ddl.cpp
Expand Down
15 changes: 0 additions & 15 deletions src/planner/operator/logical_copy_to.cpp

This file was deleted.

Loading

0 comments on commit 7d9dc88

Please sign in to comment.