Skip to content

Commit

Permalink
CSV file writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Jul 20, 2023
1 parent d5af21a commit 934dcd1
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ int main() {
connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
connection->query("CREATE (:Person {name: 'Alice', age: 25});");
connection->query("CREATE (:Person {name: 'Bob', age: 30});");
connection->query("CREATE (:Person {name: 'Jane', age: 38});");
connection->query("CREATE (:Person {name: 'Jane'});");

connection->query("MATCH (a:Person) RETURN a;");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ namespace planner {
class LogicalCopyTo : public LogicalOperator {
public:
LogicalCopyTo(
std::shared_ptr<LogicalOperator> child,
const common::CopyDescription& copyDescription)
std::shared_ptr<LogicalOperator> child, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }
Expand Down
5 changes: 5 additions & 0 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/result/result_set.h"

Expand Down Expand Up @@ -33,6 +34,10 @@ class CopyTo : public PhysicalOperator {
private:
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
common::ValueVector* currentVector;
std::unique_ptr<FileWriter> fileWriter;

void write(int64_t pos);
};

} // namespace processor
Expand Down
25 changes: 25 additions & 0 deletions src/include/processor/operator/copy_to/csv_file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include "processor/operator/copy_to/file_writer.h"

namespace kuzu {
namespace processor {

class CSVFileWriter : public FileWriter {
public:
void openForWriting(const std::string& filePath);
void add(const int64_t& value);
void add(const common::ku_string_t& value);
void addEndl();

private:
std::ofstream file;
int64_t lineCount = 0;
std::string line = "";

void flush();
void addSeparator(char separator = ',');
};

} // namespace processor
} // namespace kuzu
20 changes: 20 additions & 0 deletions src/include/processor/operator/copy_to/file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include "common/copier_config/copier_config.h"

namespace kuzu {
namespace processor {

/**
 * Abstract interface for the writers used by the COPY TO operator.
 *
 * A concrete writer is opened once with openForWriting(), receives the fields of
 * a row through the add() overloads, and is told that the row is complete with
 * addEndl().
 */
class FileWriter {
public:
    virtual ~FileWriter() = default;

    // Prepares the file at filePath to receive output.
    virtual void openForWriting(const std::string& filePath) = 0;

    // Appends an integer field to the current row.
    virtual void add(const int64_t& value) = 0;
    // Appends a string field to the current row.
    virtual void add(const common::ku_string_t& value) = 0;

    // Marks the end of the current row.
    virtual void addEndl() = 0;
};

} // namespace processor
} // namespace kuzu
16 changes: 16 additions & 0 deletions src/include/processor/operator/copy_to/file_writer_creator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include "processor/operator/copy_to/file_writer.h"

namespace kuzu {
namespace processor {

// Factory that returns the FileWriter implementation matching the requested file type.
class FileWriterCreator {
public:
    virtual ~FileWriterCreator() = default;

    // Builds a writer for fileType; ownership of the new writer passes to the caller.
    static std::unique_ptr<FileWriter> create(common::CopyDescription::FileType fileType);
};

} // namespace processor
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/planner/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ std::unique_ptr<LogicalPlan> Planner::planCopyTo(const Catalog& catalog,
}
auto plan = QueryPlanner(catalog, nodesStatistics, relsStatistics).getBestPlan(*regularQuery);
auto logicalCopyTo =
make_shared<LogicalCopyTo>(plan->getLastOperator(),
copyClause.getCopyDescription());
make_shared<LogicalCopyTo>(plan->getLastOperator(), copyClause.getCopyDescription());
plan->setLastOperator(std::move(logicalCopyTo));
return plan;
}
Expand Down
3 changes: 3 additions & 0 deletions src/processor/operator/copy_to/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
add_library(kuzu_processor_operator_copy_to
OBJECT
copy_to.cpp
csv_file_writer.cpp
file_writer.cpp
file_writer_creator.cpp
)

set(ALL_OBJECT_FILES
Expand Down
52 changes: 38 additions & 14 deletions src/processor/operator/copy_to/copy_to.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "processor/operator/copy_to/copy_to.h"

#include <iostream> // FIXME: REMOVE ME

#include "common/string_utils.h"
#include "common/types/value.h"
#include "processor/operator/copy_to/file_writer_creator.h"

using namespace kuzu::common;

Expand All @@ -12,26 +16,46 @@ void CopyTo::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
for (auto& pos : vectorsToCopyPos) {
outputVectors.push_back(resultSet->getValueVector(pos).get());
}
auto copyDescription = getCopyDescription();
fileWriter = FileWriterCreator::create(copyDescription.fileType);
fileWriter->openForWriting(copyDescription.copyToFilePath);
}

void CopyTo::write(int64_t pos) {
auto position = currentVector->state->selVector->selectedPositions[pos];
auto dataType = currentVector->dataType.getLogicalTypeID();
switch (dataType) {
case LogicalTypeID::STRING: {
auto value = currentVector->getValue<ku_string_t>(position);
fileWriter->add(value);
break;
}
case LogicalTypeID::INT64: {
auto value = currentVector->getValue<int64_t>(position);
fileWriter->add(value);
break;
}
}
}

/**
 * Pulls the next batch of tuples from the child operator and writes it to the
 * output file, one row per selected position.
 *
 * Example: COPY (MATCH (a:Person) RETURN a.name, a.age) TO 'out.csv';
 *   outputVectors[0] = a.name
 *   outputVectors[1] = a.age
 *
 * @return false once the child has no more tuples, true otherwise.
 */
bool CopyTo::getNextTuplesInternal(ExecutionContext* context) {
    if (!children[0]->getNextTuple(context)) {
        return false;
    }
    for (int64_t pos = 0;; pos++) {
        bool wroteField = false;
        for (auto* vector : outputVectors) {
            // write() reads from currentVector, so point it at the column first.
            currentVector = vector;
            if (pos < currentVector->state->selVector->selectedSize) {
                write(pos);
                wroteField = true;
            }
        }
        if (!wroteField) {
            // Every vector is exhausted. Stop before ending a row, so the writer
            // is never asked to terminate a row that has no fields.
            break;
        }
        fileWriter->addEndl();
    }
    return true;
}
Expand Down
44 changes: 44 additions & 0 deletions src/processor/operator/copy_to/csv_file_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#include "processor/operator/copy_to/csv_file_writer.h"

using namespace kuzu::common;

namespace kuzu {
namespace processor {

/**
 * Opens the output stream on filePath.
 *
 * @throws CopyException if the stream is not open after the attempt.
 */
void CSVFileWriter::openForWriting(const std::string& filePath) {
    file.open(filePath);
    if (file.is_open()) {
        return;
    }
    throw CopyException("Could not open file " + filePath);
}

// Appends the decimal text of value to the row buffer, followed by the field separator.
void CSVFileWriter::add(const int64_t& value) {
    line.append(std::to_string(value));
    addSeparator();
}

/**
 * Appends a string field to the row buffer, followed by the field separator.
 *
 * The field is wrapped in double quotes. Any double quote inside the value is
 * written twice so that it cannot be mistaken for the closing quote of the field.
 */
void CSVFileWriter::add(const common::ku_string_t& value) {
    const std::string text = value.getAsString();
    line += '"';
    for (const char c : text) {
        if (c == '"') {
            // Escape an embedded quote by doubling it.
            line += '"';
        }
        line += c;
    }
    line += '"';
    addSeparator();
}

/**
 * Ends the current row and writes it to the file.
 *
 * Each add() leaves a trailing separator in the buffer, so the row is finished
 * by overwriting that last character with a newline. When no field has been
 * added the buffer is empty and there is no row to end; the call is then a
 * no-op, because calling back() on an empty string is undefined behaviour.
 */
void CSVFileWriter::addEndl() {
    if (line.empty()) {
        return;
    }
    line.back() = '\n';
    lineCount++;
    flush();
}

// Appends the given separator character to the row buffer.
void CSVFileWriter::addSeparator(char separator) {
    line.push_back(separator);
}

// Writes the buffered row to the file, flushes the stream, and empties the buffer.
// TODO: To improve performance, use custom flush to persist after X lines
void CSVFileWriter::flush() {
    (file << line).flush();
    line.clear();
}

} // namespace processor
} // namespace kuzu
11 changes: 11 additions & 0 deletions src/processor/operator/copy_to/file_writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#include "processor/operator/copy_to/file_writer.h"

using namespace kuzu::common;

namespace kuzu {
namespace processor {

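// TODO: decide whether this translation unit is needed; FileWriter currently declares only
// pure virtual functions and an inline destructor, so there is nothing to define here.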

} // namespace processor
} // namespace kuzu
22 changes: 22 additions & 0 deletions src/processor/operator/copy_to/file_writer_creator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#include "processor/operator/copy_to/file_writer_creator.h"

#include "processor/operator/copy_to/csv_file_writer.h"

using namespace kuzu::common;

namespace kuzu {
namespace processor {

/**
 * Creates the writer that corresponds to fileType.
 *
 * @throws std::runtime_error for every file type other than CSV.
 */
std::unique_ptr<FileWriter> FileWriterCreator::create(common::CopyDescription::FileType fileType) {
    if (fileType == CopyDescription::FileType::CSV) {
        return std::make_unique<CSVFileWriter>();
    }
    // PARQUET has no writer yet (a ParquetFileWriter is planned), so it is rejected
    // together with every other file type.
    throw std::runtime_error("File type not supported");
}

} // namespace processor
} // namespace kuzu

0 comments on commit 934dcd1

Please sign in to comment.