Skip to content

Commit

Permalink
CSV file writer
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Jul 20, 2023
1 parent d5af21a commit 943b716
Show file tree
Hide file tree
Showing 15 changed files with 266 additions and 53 deletions.
2 changes: 1 addition & 1 deletion examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ int main() {
connection->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));");
connection->query("CREATE (:Person {name: 'Alice', age: 25});");
connection->query("CREATE (:Person {name: 'Bob', age: 30});");
connection->query("CREATE (:Person {name: 'Jane', age: 38});");
connection->query("CREATE (:Person {name: 'Jane'});");

connection->query("MATCH (a:Person) RETURN a;");

Expand Down
8 changes: 7 additions & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto& copyToStatement = (CopyTo&)statement;
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::unique_ptr<BoundRegularQuery> query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
}
if (fileType == CopyDescription::FileType::NPY) {
throw BinderException("COPY TO npy files is not supported.");
}
return std::make_unique<BoundCopy>(CopyDescription(boundFilePath, fileType), std::move(query));
return std::make_unique<BoundCopy>(CopyDescription(boundFilePath, fileType, columnNames), std::move(query));
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
Expand Down
9 changes: 5 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ CopyDescription::CopyDescription(const std::vector<std::string>& copyFromFilePat
}
}

// Copy To: builds the description of an export to `copyToFilePath`, remembering
// the file type and the names of the columns to be written. The copy direction
// is fixed to CopyDirection::TO.
// Initializers are listed in member declaration order (copyToFilePath,
// columnNames, fileType, copyDirection), which is the order they execute in.
CopyDescription::CopyDescription(
    const std::string& copyToFilePath, FileType fileType, const std::vector<std::string>& columnNames)
    : copyToFilePath{copyToFilePath},
      columnNames{columnNames},
      fileType{fileType},
      copyDirection{CopyDirection::TO} {}

// Copy constructor: member-wise copy of the source description's paths, copy
// direction and file type (and, in the variant that lists it, columnNames).
// csvReaderConfig is first set to nullptr and is only re-created, as a copy of
// the source's config, when the description is a COPY FROM of a CSV file.
CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: copyFromFilePaths{copyDescription.copyFromFilePaths},
copyToFilePath{copyDescription.copyToFilePath}, copyDirection{copyDescription.copyDirection},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType} {
csvReaderConfig{nullptr}, fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
if (copyDirection == CopyDirection::FROM && fileType == FileType::CSV) {
// NOTE(review): dereferences the source's csvReaderConfig without a null
// check — presumably it is always set for COPY FROM CSV; confirm.
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
}

CopyDescription::CopyDescription(const std::string& copyToFilePath, FileType fileType)
: copyToFilePath{copyToFilePath}, fileType{fileType}, copyDirection{CopyDirection::TO} {}

CopyDescription::FileType CopyDescription::getFileTypeFromExtension(const std::string& extension) {
CopyDescription::FileType fileType = CopyDescription::fileTypeMap[extension];
if (fileType == FileType::UNKNOWN) {
Expand Down
5 changes: 0 additions & 5 deletions src/include/binder/copy/bound_copy.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@
namespace kuzu {
namespace binder {

struct BoundCopyInfo {
common::table_id_t tableID;
std::string tableName;
};

class BoundCopy : public BoundStatement {
public:
// COPY FROM
Expand Down
3 changes: 2 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct CopyDescription {
CSVReaderConfig csvReaderConfig);

// Copy To
CopyDescription(const std::string& copyToFilePath, FileType fileType);
CopyDescription(const std::string& copyToFilePath, FileType fileType, const std::vector<std::string>& columnNames);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -54,6 +54,7 @@ struct CopyDescription {

const std::vector<std::string> copyFromFilePaths;
const std::string copyToFilePath;
const std::vector<std::string> columnNames;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
CopyDirection copyDirection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ namespace planner {
class LogicalCopyTo : public LogicalOperator {
public:
LogicalCopyTo(
std::shared_ptr<LogicalOperator> child,
const common::CopyDescription& copyDescription)
std::shared_ptr<LogicalOperator> child, const common::CopyDescription& copyDescription)
: LogicalOperator{LogicalOperatorType::COPY_TO, child}, copyDescription{copyDescription} {}

inline std::string getExpressionsForPrinting() const override { return std::string{}; }
Expand Down
5 changes: 5 additions & 0 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/result/result_set.h"

Expand Down Expand Up @@ -33,6 +34,10 @@ class CopyTo : public PhysicalOperator {
private:
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
common::ValueVector* currentVector;
std::unique_ptr<FileWriter> fileWriter;

void write(int64_t pos);
};

} // namespace processor
Expand Down
34 changes: 34 additions & 0 deletions src/include/processor/operator/copy_to/csv_file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#pragma once

#include "processor/operator/copy_to/file_writer.h"

namespace kuzu {
namespace processor {

class CSVFileWriter : public FileWriter {
public:
CSVFileWriter(const std::vector<std::string>& columnNames) : FileWriter(columnNames) {};
void openForWriting(const std::string& filePath);

void add(const int64_t value);
void add(const int32_t value);
void add(const int16_t value);
void add(const float value);
void add(const double value);
void add(const common::ku_string_t& value);
void addEndl();

private:
static constexpr int flushAfterLines = 10000;
std::ofstream file;
int64_t unflushedLines = 0;
std::string line = "";

void add(std::string value);
void addHeader();
void flush();
void addSeparator(char separator = ',');
};

} // namespace processor
} // namespace kuzu
30 changes: 30 additions & 0 deletions src/include/processor/operator/copy_to/file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "common/copier_config/copier_config.h"

namespace kuzu {
namespace processor {

// Abstract interface for writers that serialize query results to a file.
// A writer is opened once with openForWriting(), receives the values of a tuple
// through the add() overloads, and is told that a tuple is complete with
// addEndl().
class FileWriter {
public:
    // columnNames: names of the output columns. The vector is copied, so the
    // writer does not depend on the lifetime of the caller's object.
    explicit FileWriter(const std::vector<std::string>& columnNames) : columnNames{columnNames} {}

    // Writers are owned and destroyed through FileWriter pointers, so the
    // destructor must be virtual for the derived part to be destroyed correctly.
    virtual ~FileWriter() = default;

    // Opens `filePath` as the output file.
    virtual void openForWriting(const std::string& filePath) = 0;

    // Adds one value to the current tuple; one overload per supported type.
    virtual void add(const int64_t value) = 0;
    virtual void add(const int32_t value) = 0;
    virtual void add(const int16_t value) = 0;
    virtual void add(const float value) = 0;
    virtual void add(const double value) = 0;
    virtual void add(const common::ku_string_t& value) = 0;

    // Ends the current tuple.
    virtual void addEndl() = 0;

    // Returns the column names given at construction.
    inline const std::vector<std::string>& getColumnNames() const { return columnNames; }

private:
    // Stored by value: a reference member would dangle as soon as the vector
    // passed to the constructor is destroyed.
    const std::vector<std::string> columnNames;
};

} // namespace processor
} // namespace kuzu
15 changes: 15 additions & 0 deletions src/include/processor/operator/copy_to/file_writer_creator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include "processor/operator/copy_to/file_writer.h"

namespace kuzu {
namespace processor {

// Factory for FileWriter instances: create() returns an owning pointer to the
// writer that corresponds to the requested file type.
class FileWriterCreator {
public:
    // fileType:    output format the writer is needed for.
    // columnNames: names of the columns that will be written.
    // NOTE(review): behaviour for file types without a writer is defined in the
    // implementation file — confirm before relying on a non-null result.
    static std::unique_ptr<FileWriter> create(
        common::CopyDescription::FileType fileType,
        const std::vector<std::string>& columnNames);
};

} // namespace processor
} // namespace kuzu
3 changes: 1 addition & 2 deletions src/planner/planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ std::unique_ptr<LogicalPlan> Planner::planCopyTo(const Catalog& catalog,
}
auto plan = QueryPlanner(catalog, nodesStatistics, relsStatistics).getBestPlan(*regularQuery);
auto logicalCopyTo =
make_shared<LogicalCopyTo>(plan->getLastOperator(),
copyClause.getCopyDescription());
make_shared<LogicalCopyTo>(plan->getLastOperator(), copyClause.getCopyDescription());
plan->setLastOperator(std::move(logicalCopyTo));
return plan;
}
Expand Down
2 changes: 2 additions & 0 deletions src/processor/operator/copy_to/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
add_library(kuzu_processor_operator_copy_to
OBJECT
copy_to.cpp
csv_file_writer.cpp
file_writer_creator.cpp
)

set(ALL_OBJECT_FILES
Expand Down
97 changes: 60 additions & 37 deletions src/processor/operator/copy_to/copy_to.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#include "processor/operator/copy_to/copy_to.h"

#include <iostream> // FIXME: REMOVE ME

#include "common/string_utils.h"
#include "common/types/value.h"
#include "processor/operator/copy_to/file_writer_creator.h"

using namespace kuzu::common;

Expand All @@ -12,52 +16,71 @@ void CopyTo::initLocalStateInternal(ResultSet* resultSet, ExecutionContext* cont
for (auto& pos : vectorsToCopyPos) {
outputVectors.push_back(resultSet->getValueVector(pos).get());
}
auto copyDescription = getCopyDescription();
fileWriter = FileWriterCreator::create(copyDescription.fileType, copyDescription.columnNames);
fileWriter->openForWriting(copyDescription.copyToFilePath);
}

// Hands one value of `currentVector` to the file writer.
// `pos` is an index into the vector's selection vector; the selected position
// found there is the one that is read. The FileWriter::add overload is chosen
// from the vector's logical type.
void CopyTo::write(int64_t pos) {
    const auto rowIdx = currentVector->state->selVector->selectedPositions[pos];

    switch (currentVector->dataType.getLogicalTypeID()) {
    case LogicalTypeID::STRING:
        fileWriter->add(currentVector->getValue<ku_string_t>(rowIdx));
        break;
    case LogicalTypeID::INT64:
        fileWriter->add(currentVector->getValue<int64_t>(rowIdx));
        break;
    case LogicalTypeID::INT32:
        fileWriter->add(currentVector->getValue<int32_t>(rowIdx));
        break;
    case LogicalTypeID::INT16:
        fileWriter->add(currentVector->getValue<int16_t>(rowIdx));
        break;
    case LogicalTypeID::FLOAT:
        fileWriter->add(currentVector->getValue<float>(rowIdx));
        break;
    case LogicalTypeID::DOUBLE:
        fileWriter->add(currentVector->getValue<double>(rowIdx));
        break;
    default:
        // NOTE(review): every other logical type is skipped without writing
        // anything, and null entries are not checked for — confirm this is
        // intended, since a skipped value presumably shifts the output columns.
        break;
    }
}

// Pulls the next batch of tuples from the child operator and passes it to the
// file writer, one tuple at a time: for each selected index the value of every
// output vector is written, followed by addEndl().
// Returns false when the child has no more tuples, true otherwise.
bool CopyTo::getNextTuplesInternal(ExecutionContext* context) {
    if (!children[0]->getNextTuple(context)) {
        return false;
    }

    // Example: COPY (MATCH (a:Person) RETURN a.name, a.age) TO 'out.csv';
    //   outputVectors[0] = a.name
    //   outputVectors[1] = a.age
    for (int64_t pos = 0;; pos++) {
        bool hasData = false;
        for (auto* vector : outputVectors) {
            // write() reads the value from currentVector.
            currentVector = vector;
            if (pos < currentVector->state->selVector->selectedSize) {
                write(pos);
                hasData = true;
            }
        }
        if (!hasData) {
            // No vector has a value at this index, so the batch is finished.
            // Stop before addEndl(): otherwise every batch would be followed by
            // an extra end-of-line with no values in front of it.
            break;
        }
        // TODO: Perhaps fileWriter->endTuple() ?
        fileWriter->addEndl();
    }
    return true;
}
/*
std::ofstream file;
// Start CSV
if (copyDescription.fileType == CopyDescription::FileType::CSV) {
file.open(copyDescription.copyToFilePath);
if (!file.is_open()) {
throw CopyException("Could not open file " + copyDescription.copyToFilePath);
}
}
// Write to CSV
std::string csvStr;
csvStr += std::to_string(nodeID.offset);
csvStr += "\n";
file << csvStr;
// Finish CSV
if (copyDescription.fileType == CopyDescription::FileType::CSV) {
file.close();
}
*/

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 943b716

Please sign in to comment.