Skip to content

Commit

Permalink
Support Parquet files on COPY TO
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 31, 2023
1 parent 9526e5e commit eae6431
Show file tree
Hide file tree
Showing 30 changed files with 3,298 additions and 2,391 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ elseif (${BUILD_BENCHMARK})
add_subdirectory(test/test_helper)
endif()
add_subdirectory(tools)

# TODO (Rui): REMOVE ME
add_subdirectory(examples/cpp)
1 change: 1 addition & 0 deletions dataset/copy-test/list-of-struct/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY T FROM "dataset/copy-test/list-of-struct/test.csv" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/list-of-struct/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table T (id INT64, fields STRUCT(a INT64, b INT64)[], PRIMARY KEY (id));
4 changes: 4 additions & 0 deletions dataset/copy-test/list-of-struct/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,fields
0,"[{a:1,b:2},{a:3,b:4}]"
1,"[{a:5,b:6},{a:7,b:8},{a:9,b:10}]"
2,"[{a:11,b:12}]"
101 changes: 91 additions & 10 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,98 @@
using namespace kuzu::main;

int main() {
auto database = std::make_unique<Database>("" /* fill db path */);
auto database = std::make_unique<Database>("puffer_fish" /* fill db path */);
auto connection = std::make_unique<Connection>(database.get());

// Create schema.
connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
// Create nodes.
connection->query("CREATE (:Person {name: 'Alice', age: 25});");
connection->query("CREATE (:Person {name: 'Bob', age: 30});");
connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");

// Execute a simple query.
auto result = connection->query("MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");
// Print query result.
std::cout << result->toString();
connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

connection->query("COPY (MATCH (p:person) RETURN 1, p.eyeSight) TO 'out.parquet';");
// connection->query("COPY (return {a: {b: 1, c:2}, d: {b: 3, c:'asdasdd'}}) to 'out.parquet';");
// connection->query("copy (return [0,1,null,2]) to 'out.parquet';");

// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ]}) to 'out.parquet';");

// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ], e: [{c: [7,8], d: [9]}] }) to 'out.parquet';");

// connection->query("COPY (return {b: [{c: [1,2], d: [3]}, {c: [4], d: [5,6]} ], e: [{c: [7,8], d: [9]}] }) to 'out.parquet';");
// connection->query("COPY (RETURN {a: {c: [1,3], d: [2,77,8]}, b: {c: [3], d: [4,9,0]}}) TO 'out.parquet';");
// connection->query("COPY (RETURN {a: {b: 1, c: 2}, d: {b: 3, c: 4}}) TO 'out.parquet';");
// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender) TO 'out.parquet';");
// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender, p.isStudent, p.age, p.birthdate, p.registerTime) TO 'out.parquet';");

// connection->query("COPY (RETURN 1,[[[[[[0,1]],[[2,3]]]]]],[[[1]],[[2],[6]],[[3,4,8],[9]],[[5]]]) TO 'out.parquet'");
// connection->query("COPY (RETURN [ 1,2,3,4]) TO 'out.parquet';");
// connection->query("COPY (RETURN [[1,2],[3,4],[5,6,7,8]]) TO 'out.parquet';");
// connection->query("COPY (RETURN {first:[1,2,3], second:[4,5,6,7]}) TO 'out.parquet';");

// connection->query("COPY (RETURN {a: {b: 1, c: 2}, d: {b: 3, c: 4}}) TO 'out.parquet';");
// connection->query("COPY (return [{a:[1,2], b:[3], c:[4], d:[5], e:[6]}, {a:[4], b:[7], c:[13], d:[44], e:[123]}]) to 'out.parquet';");
// connection->query("COPY (return [{a:[1,2], b:[3]}]) to 'out.parquet';");
// connection->query("COPY (return [[[[[[0,1]],[[2,3]]]]]], [{a:[1,2], b:[3]}, {a:[4], b:[7]}]) to 'out.parquet';");
// connection->query("COPY (return [[1,2],[3],[4,5,6],[7,8]]) to 'out.parquet';");
// connection->query("COPY (return [{a:[1,2], b:[3]}, {a:[4,5,6], b:[7,8]}]) to 'out.parquet';");
// connection->query("COPY (return [{a:1}, {a:2}]) to 'out.parquet';");
// connection->query("COPY (RETURN [[[[[[0,1]],[[2,3]]]]]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [[[1]],[[2],[6]],[[3,4,8],[9]],[[5]]]) TO 'out.parquet';");

// connection->query("COPY (RETURN {a: {b: {c: [1,2]}} , d: {e: {f: [3,4]}} }) TO 'out.parquet';");
// create node table test6(id int64, field struct (a struct(b struct(c int64[])), d struct(e struct(f int64[]))), primary key(id));

// connection->query("COPY (RETURN [[[1],[2]],[[3],[4]],[[5],[6],[7],[8]]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [[[1]],[[2]],[[3,4]],[[5]]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [{a: [{b: 1}, {b: 2}], c: [{d: 3}]}]) TO 'out.parquet';");

// connection->query("COPY (RETURN [[1,2],[3,4],[5,6,7,8]]) TO 'out.parquet';");


// ERROR
// connection->query("COPY (RETURN {a: [{b: 1}, {b: 2}], c: [{d: 3}]}) TO 'out.parquet';");
// connection->query("COPY (RETURN {a: {b: 1, c: 2}, b: {b: 3, c: 4}}) TO 'out.parquet';");


// connection->query("COPY (RETURN 'aaa',{a: 1, b: 2}) TO 'out.parquet';");
//
// connection->query("COPY (RETURN [ {a: [1], b: [2]} ]) TO 'out.parquet';");

// connection->query("COPY (RETURN {first:[[7],[5]], second:[[4],[3],[2]]}) TO 'out.parquet';");

//connection->query("COPY (RETURN [[[1,2],[3,4],[5],[],[7]]]) TO 'out.parquet';");

// connection->query("COPY (RETURN [[1,2],[3,4],[5,6,7,8]]) TO 'out.parquet';");
// connection->query("COPY (RETURN [1,2,3,4]) TO 'out.parquet';");



// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';");

// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender, p.isStudent, p.age, p.birthdate, p.registerTime) TO 'out.parquet';");
// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';");
// connection->query("COPY (RETURN {first: [[44]], second: [[12]]}) TO 'out.parquet';");
// connection->query("COPY (RETURN {first:[['a'],['b']], second:[['c'],['d'],['e']]}) TO 'out.parquet';");

// connection->query("COPY (MATCH (p:person) RETURN p.workedHours) TO 'dates.parquet';");

// connection->query("COPY (MATCH (b:person) RETURN b.courseScoresPerTerm) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.workedHours) TO 'out.parquet';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN a.ID, a.birthdate, a.registerTime, b.ID) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.isStudent) TO 'out.csv';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN ID(e), a.ID, b.ID) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
}
2 changes: 1 addition & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ TO: ( 'T' | 't' ) ( 'O' | 'o' ) ;

kU_DataType
: oC_SymbolicName
| ( oC_SymbolicName kU_ListIdentifiers )
| kU_DataType kU_ListIdentifiers
| UNION SP? '(' SP? kU_PropertyDefinitions SP? ')'
| oC_SymbolicName SP? '(' SP? kU_PropertyDefinitions SP? ')'
| oC_SymbolicName SP? '(' SP? kU_DataType SP? ',' SP? kU_DataType SP? ')' ;
Expand Down
14 changes: 10 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType());
// auto d = column->getDataType();
// auto e = VarListType::getChildType(&d);
// auto f = "ok";
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException("COPY TO currently only supports csv and parquet files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
return std::make_unique<BoundCopyTo>(CopyDescription(std::vector<std::string>{boundFilePath},
fileType, columnNames, columnTypes),
std::move(query));
}

Expand Down
11 changes: 7 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ CopyDescription::CopyDescription(

// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {}
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames}, columnTypes{columnTypes} {
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType},
columnNames{copyDescription.columnNames}, columnTypes{copyDescription.columnTypes} {
if (copyDescription.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
Expand Down
14 changes: 14 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,20 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

// TODO (Rui): review the datatypes
// Classifies a logical type by its type ID: STRUCT, VAR_LIST, FIXED_LIST, UNION and MAP are
// reported as non-primitive (false); every other type ID is reported as primitive (true).
bool LogicalTypeUtils::isPrimitive(const LogicalType& dataType) {
    const auto typeID = dataType.typeID;
    const bool isListedAsNonPrimitive =
        typeID == LogicalTypeID::STRUCT || typeID == LogicalTypeID::VAR_LIST ||
        typeID == LogicalTypeID::FIXED_LIST || typeID == LogicalTypeID::UNION ||
        typeID == LogicalTypeID::MAP;
    return !isListedAsNonPrimitive;
}

std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
return std::vector<LogicalType>{LogicalType{LogicalTypeID::BOOL},
LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32},
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct CopyDescription {

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -53,6 +54,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
const std::vector<common::LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isPrimitive(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class ValueVector {

void resetAuxiliaryBuffer();

// Forwards this vector's dataType to LogicalTypeUtils::isPrimitive and returns its verdict.
bool isPrimitiveDataType() {
    return LogicalTypeUtils::isPrimitive(dataType);
}

// If there is still non-null values after discarding, return true. Otherwise, return false.
// For an unflat vector, its selection vector is also updated to the resultSelVector.
static bool discardNull(ValueVector& vector);
Expand Down
48 changes: 32 additions & 16 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,54 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/csv_file_writer.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/operator/copy_to/csv_writer.h"
#include "processor/operator/copy_to/parquet_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class WriteCSVFileSharedState {
class CSVParquetWriterSharedState {
public:
WriteCSVFileSharedState() {
csvFileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
};
std::unique_ptr<kuzu::processor::CSVFileWriter> csvFileWriter;
// Creates the writer that matches the requested COPY TO output file type:
//   CSV     -> CSVWriter
//   PARQUET -> ParquetWriter
// Any other file type throws common::NotImplementedException. The exception must actually be
// thrown: merely constructing it (as a discarded temporary) would let construction succeed
// with fileWriter still null, deferring the failure to the first use of getWriter().
CSVParquetWriterSharedState(common::CopyDescription::FileType fileType) {
    if (fileType == common::CopyDescription::FileType::CSV) {
        fileWriter = std::make_unique<kuzu::processor::CSVWriter>();
    } else if (fileType == common::CopyDescription::FileType::PARQUET) {
        fileWriter = std::make_unique<kuzu::processor::ParquetWriter>();
    } else {
        throw common::NotImplementedException(
            "CSVParquetWriterSharedState::CSVParquetWriterSharedState");
    }
}
std::unique_ptr<CSVParquetWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<CSVParquetWriter> fileWriter;
};

class CopyTo : public PhysicalOperator {
class CopyTo : public Sink {
public:
CopyTo(std::shared_ptr<WriteCSVFileSharedState> sharedState,
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CSVParquetWriterSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
void executeInternal(ExecutionContext* context) final;

common::CopyDescription& getCopyDescription() { return copyDescription; }
void finalize(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription& getCopyDescription() { return copyDescription; }

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(
sharedState, vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<CopyTo>(resultSetDescriptor->copy(), sharedState, vectorsToCopyPos,
copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand All @@ -43,9 +58,10 @@ class CopyTo : public PhysicalOperator {

private:
void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<WriteCSVFileSharedState> sharedState;
std::shared_ptr<CSVParquetWriterSharedState> sharedState;
};

} // namespace processor
Expand Down
32 changes: 32 additions & 0 deletions src/include/processor/operator/copy_to/csv_parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

// Abstract base for the COPY TO file writers. It declares the pure-virtual operations a
// concrete writer must implement and stores the names and logical types of the output columns.
class CSVParquetWriter {
public:
    virtual ~CSVParquetWriter() = default;

    // Operations every concrete writer has to provide. Their exact semantics are defined by
    // the implementations; NOTE(review): presumably openFile/init run before writeValues and
    // closeFile runs last -- confirm against the CopyTo operator.
    virtual void openFile(const std::string& filePath) = 0;
    virtual void init() = 0;
    virtual void closeFile() = 0;
    virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

    // TODO (Rui): check setters and getters vs directly access
    // Stores copies of the given column names and column types in this writer.
    void setColumns(const std::vector<std::string>& columnNames,
        const std::vector<common::LogicalType>& columnTypes) {
        this->columnNames = columnNames;
        this->columnTypes = columnTypes;
    }

    // Accessors returning mutable references to the stored column metadata.
    std::vector<std::string>& getColumnNames() { return columnNames; }
    std::vector<common::LogicalType>& getColumnTypes() { return columnTypes; }

private:
    std::vector<std::string> columnNames;
    std::vector<common::LogicalType> columnTypes;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/file_utils.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter {
class CSVWriter : public CSVParquetWriter {
public:
CSVFileWriter(){};
void open(const std::string& filePath);
void writeHeader(const std::vector<std::string>& columnNames);
void writeValues(std::vector<common::ValueVector*>& outputVectors);
CSVWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
void writeHeader(const std::vector<std::string>& columnNames);
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();

template<typename T>
void writeToBuffer(common::ValueVector* vector, int64_t pos, bool escapeStringValue = false);
void writeToBuffer(common::ValueVector* vector, bool escapeStringValue = false);

template<typename T>
void writeListToBuffer(common::ValueVector* vector, int64_t pos);
void writeListToBuffer(common::ValueVector* vector);

inline void writeToBuffer(const std::string& value) { buffer << value; }
inline void writeToBuffer(const char value) { buffer << value; }
Expand Down
Loading

0 comments on commit eae6431

Please sign in to comment.