Skip to content

Commit

Permalink
Support Parquet files on COPY TO
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Aug 11, 2023
1 parent fc32db0 commit 542e3e0
Show file tree
Hide file tree
Showing 16 changed files with 468 additions and 69 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,6 @@ elseif (${BUILD_BENCHMARK})
add_subdirectory(test/test_helper)
endif()
add_subdirectory(tools)

# TODO (Rui): REMOVE ME
add_subdirectory(examples/cpp)
38 changes: 28 additions & 10 deletions examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,35 @@
using namespace kuzu::main;

int main() {
auto database = std::make_unique<Database>("" /* fill db path */);
auto database = std::make_unique<Database>("puffer_fish" /* fill db path */);
auto connection = std::make_unique<Connection>(database.get());

// Create schema.
connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
// Create nodes.
connection->query("CREATE (:Person {name: 'Alice', age: 25});");
connection->query("CREATE (:Person {name: 'Bob', age: 30});");
// connection->query("create node table person (ID INt64, fName StRING, gender INT64, isStudent BoOLEAN, isWorker BOOLEAN, age INT64, eyeSight DOUBLE, birthdate DATE, registerTime TIMESTAMP, lastJobDuration interval, workedHours INT64[], usedNames STRING[], courseScoresPerTerm INT64[][], grades INT64[4], height float, PRIMARY KEY (ID));");
// connection->query("create node table organisation (ID INT64, name STRING, orgCode INT64, mark DOUBLE, score INT64, history STRING, licenseValidInterval INTERVAL, rating DOUBLE, state STRUCT(revenue INT16, location STRING[], stock STRUCT(price INT64[], volume INT64)), PRIMARY KEY (ID));");
// connection->query("create node table movies (name STRING, length INT32, note STRING, description STRUCT(rating DOUBLE, views INT64, release TIMESTAMP, film DATE), content BYTEA, PRIMARY KEY (name));");
// connection->query("create rel table knows (FROM person TO person, date DATE, meetTime TIMESTAMP, validInterval INTERVAL, comments STRING[], MANY_MANY);");
// connection->query("create rel table studyAt (FROM person TO organisation, year INT64, places STRING[], length INT16,MANY_ONE);");
// connection->query("create rel table workAt (FROM person TO organisation, year INT64, grading DOUBLE[2], rating float, MANY_ONE);");
// connection->query("create rel table meets (FROM person TO person, location FLOAT[2], times INT, data BYTEA, MANY_ONE);");
// connection->query("create rel table marries (FROM person TO person, usedAddress STRING[], address INT16[2], note STRING, ONE_ONE);");
//
// connection->query("COPY person FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vPerson.csv' (HEADER=true, DELIM=',');");
// connection->query("COPY organisation FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vOrganisation.csv';");
// connection->query("COPY movies FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/vMovies.csv';");
// connection->query("COPY knows FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eKnows.csv';");
// connection->query("COPY studyAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eStudyAt.csv' (HEADER=true);");
// connection->query("COPY workAt FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eWorkAt.csv';");
// connection->query("COPY meets FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMeets.csv';");
// connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';");

// Execute a simple query.
auto result = connection->query("MATCH (a:Person) RETURN a.name AS NAME, a.age AS AGE;");
// Print query result.
std::cout << result->toString();

connection->query("COPY (MATCH (p:person) RETURN p.ID, p.birthdate, p.registerTime) TO 'dates.parquet';");

// connection->query("COPY (MATCH (b:person) RETURN b.courseScoresPerTerm) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.workedHours) TO 'out.parquet';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN a.ID, a.birthdate, a.registerTime, b.ID) TO 'out.parquet';");
// connection->query("COPY (MATCH (b:person) RETURN b.isStudent) TO 'out.csv';");
// connection->query("COPY (MATCH (a:person)-[e]->(b:person) RETURN ID(e), a.ID, b.ID) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
// connection->query("COPY (MATCH (a:person) RETURN a.fName, a.workedHours) TO 'a.csv';");
}
11 changes: 7 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalTypeID> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType().getLogicalTypeID());
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException("COPY TO currently only supports csv and parquet files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
return std::make_unique<BoundCopyTo>(CopyDescription(std::vector<std::string>{boundFilePath},
fileType, columnNames, columnTypes),
std::move(query));
}

Expand Down
11 changes: 7 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ CopyDescription::CopyDescription(

// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {}
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalTypeID>& columnTypes)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames}, columnTypes{columnTypes} {
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType},
columnNames{copyDescription.columnNames}, columnTypes{copyDescription.columnTypes} {
if (copyDescription.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct CopyDescription {

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalTypeID>& columnTypes);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -53,6 +54,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
const std::vector<common::LogicalTypeID> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
49 changes: 33 additions & 16 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,55 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/csv_file_writer.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/operator/copy_to/csv_writer.h"
#include "processor/operator/copy_to/parquet_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class WriteCSVFileSharedState {
class CSVParquetWriterSharedState {
public:
WriteCSVFileSharedState() {
csvFileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
};
std::unique_ptr<kuzu::processor::CSVFileWriter> csvFileWriter;
// Creates the writer that matches the requested output format.
// CSV maps to CSVWriter and PARQUET maps to ParquetWriter; every other file type is
// rejected with a NotImplementedException so that fileWriter is never left null.
CSVParquetWriterSharedState(common::CopyDescription::FileType fileType) {
    if (fileType == common::CopyDescription::FileType::CSV) {
        fileWriter = std::make_unique<kuzu::processor::CSVWriter>();
    } else if (fileType == common::CopyDescription::FileType::PARQUET) {
        fileWriter = std::make_unique<kuzu::processor::ParquetWriter>();
    } else {
        // The exception has to be thrown; merely constructing it is a no-op and would let
        // callers dereference a null writer later on.
        throw common::NotImplementedException(
            "CSVParquetWriterSharedState::CSVParquetWriterSharedState");
    }
}
std::unique_ptr<CSVParquetWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<CSVParquetWriter> fileWriter;
};

class CopyTo : public PhysicalOperator {
class CopyTo : public Sink {
public:
CopyTo(std::shared_ptr<WriteCSVFileSharedState> sharedState,
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CSVParquetWriterSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
void executeInternal(ExecutionContext* context) final;

common::CopyDescription& getCopyDescription() { return copyDescription; }
void finalize(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription& getCopyDescription() { return copyDescription; }

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(
sharedState, vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<CopyTo>(resultSetDescriptor->copy(), sharedState, vectorsToCopyPos,
copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand All @@ -43,9 +59,10 @@ class CopyTo : public PhysicalOperator {

private:
void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<WriteCSVFileSharedState> sharedState;
std::shared_ptr<CSVParquetWriterSharedState> sharedState;
};

} // namespace processor
Expand Down
32 changes: 32 additions & 0 deletions src/include/processor/operator/copy_to/csv_parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

// Abstract base shared by the COPY TO output writers (CSVWriter and ParquetWriter derive
// from it). It stores the column names and logical type ids of the result being written and
// declares the file operations every concrete writer has to provide.
class CSVParquetWriter {
public:
    virtual ~CSVParquetWriter() = default;

    // Operations implemented by each concrete writer.
    virtual void open(const std::string& filePath) = 0;
    virtual void init() = 0;
    virtual void closeFile() = 0;
    virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

    // TODO (Rui): check setters and getters vs directly access
    // Stores copies of the given column names and column types in this writer.
    void setColumns(const std::vector<std::string>& columnNames,
        const std::vector<common::LogicalTypeID>& columnTypes) {
        this->columnTypes = columnTypes;
        this->columnNames = columnNames;
    }

    // Both accessors hand out mutable references to the stored vectors.
    std::vector<std::string>& getColumnNames() { return columnNames; }
    std::vector<common::LogicalTypeID>& getColumnTypes() { return columnTypes; }

private:
    std::vector<std::string> columnNames;
    std::vector<common::LogicalTypeID> columnTypes;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/file_utils.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter {
// TODO (Rui): Rename to CSVWriter
class CSVWriter : public CSVParquetWriter {
public:
CSVFileWriter(){};
void open(const std::string& filePath);
void writeHeader(const std::vector<std::string>& columnNames);
void writeValues(std::vector<common::ValueVector*>& outputVectors);
CSVWriter(){};
void open(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
void writeHeader(const std::vector<std::string>& columnNames);
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();
Expand Down
33 changes: 33 additions & 0 deletions src/include/processor/operator/copy_to/parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "arrow/io/file.h"
#include "parquet/stream_writer.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"

namespace kuzu {
namespace processor {

// CSVParquetWriter implementation for Parquet output. Only the declarations live here; the
// member functions are defined in the corresponding .cpp file.
class ParquetWriter : public CSVParquetWriter {
public:
    ParquetWriter() = default;
    // TODO (Rui): rename to openFile to not confuse with open syscall
    void open(const std::string& filePath) override;
    void init() override;
    void closeFile() override;
    void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
    // Builds the Parquet schema node for one column from its name and kuzu logical type id.
    // NOTE(review): the meaning of `length` (default -1) is not visible in this header;
    // confirm against the definition.
    static std::shared_ptr<parquet::schema::Node> kuzuTypeToParquetType(
        std::string& columnName, common::LogicalTypeID& typeID, int length = -1);

    void streamWrite(common::ValueVector* vector);

    parquet::StreamWriter streamWriter;
    // NOTE(review): outputStream and outFile have the same type; confirm both are needed.
    std::shared_ptr<arrow::io::FileOutputStream> outputStream;
    std::shared_ptr<arrow::io::FileOutputStream> outFile;
    std::shared_ptr<parquet::ParquetFileWriter> fileWriter;
    // Initialized to nullptr so the pointer never holds an indeterminate value before it
    // is assigned. NOTE(review): ownership is not visible here; confirm who owns it.
    parquet::RowGroupWriter* rowWriter = nullptr;
};

} // namespace processor
} // namespace kuzu
27 changes: 22 additions & 5 deletions src/processor/map/map_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyFrom(LogicalOperator* logic
throw NotImplementedException{"PlanMapper::mapCopy"};
}
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) {
auto copy = (LogicalCopyTo*)logicalOperator;
auto childSchema = logicalOperator->getChild(0)->getSchema();
Expand All @@ -37,10 +36,28 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logical
for (auto& expression : childSchema->getExpressionsInScope()) {
vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression));
}
auto sharedState = std::make_shared<WriteCSVFileSharedState>();
return std::make_unique<CopyTo>(sharedState, std::move(vectorsToCopyPos),
copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(),
std::move(prevOperator));
auto sharedState =
std::make_shared<CSVParquetWriterSharedState>(copy->getCopyDescription().fileType);

Check warning on line 40 in src/processor/map/map_copy.cpp

View check run for this annotation

Codecov / codecov/patch

src/processor/map/map_copy.cpp#L40

Added line #L40 was not covered by tests
// return std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
// sharedState,
// std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
// copy->getExpressionsForPrinting(), std::move(prevOperator));

auto copyTo = std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
sharedState, std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(),
copy->getExpressionsForPrinting(), std::move(prevOperator));

std::shared_ptr<FactorizedTable> fTable;

auto ftTableSchema = std::make_unique<FactorizedTableSchema>();
// ftTableSchema->appendColumn(
// std::make_unique<ColumnSchema>(false /* flat */, 0 /* dataChunkPos */,
// common::LogicalTypeUtils::getRowLayoutSize(common::LogicalType{common::LogicalTypeID::STRING})));

fTable = std::make_shared<FactorizedTable>(memoryManager, std::move(ftTableSchema));

return createFactorizedTableScan(
binder::expression_vector{}, childSchema, fTable, std::move(copyTo));
}

std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyNode(
Expand Down
3 changes: 2 additions & 1 deletion src/processor/operator/copy_to/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
add_library(kuzu_processor_operator_copy_to
OBJECT
copy_to.cpp
csv_file_writer.cpp
csv_writer.cpp
parquet_writer.cpp
)

set(ALL_OBJECT_FILES
Expand Down
20 changes: 13 additions & 7 deletions src/processor/operator/copy_to/copy_to.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "processor/operator/copy_to/copy_to.h"

#include <iostream> // TODO (Rui): Remove this

#include "common/string_utils.h"
#include "common/types/value.h"

Expand All @@ -9,8 +11,10 @@ namespace kuzu {
namespace processor {

void CopyTo::initGlobalStateInternal(ExecutionContext* context) {
sharedState->csvFileWriter->open(getCopyDescription().filePaths[0]);
sharedState->csvFileWriter->writeHeader(getCopyDescription().columnNames);
sharedState->getWriter()->open(getCopyDescription().filePaths[0]);
sharedState->getWriter()->setColumns(
getCopyDescription().columnNames, getCopyDescription().columnTypes);
sharedState->getWriter()->init();
}

void CopyTo::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) {
Expand All @@ -20,12 +24,14 @@ void CopyTo::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
}
}

bool CopyTo::getNextTuplesInternal(ExecutionContext* context) {
if (!children[0]->getNextTuple(context)) {
return false;
void CopyTo::executeInternal(ExecutionContext* context) {
while (children[0]->getNextTuple(context)) {
sharedState->getWriter()->writeValues(outputVectors);
}
sharedState->csvFileWriter->writeValues(outputVectors);
return true;
}

// Finalization step of the operator: asks the shared writer to close its file.
void CopyTo::finalize(ExecutionContext* context) {
    auto& writer = sharedState->getWriter();
    writer->closeFile();
}

} // namespace processor
Expand Down
Loading

0 comments on commit 542e3e0

Please sign in to comment.