Skip to content

Commit

Permalink
[skip ci] Addressed the points from the code review
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Sep 17, 2023
1 parent dd08aa1 commit a30c208
Show file tree
Hide file tree
Showing 20 changed files with 1,254 additions and 199 deletions.
2 changes: 1 addition & 1 deletion src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
}
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException("COPY TO currently only supports csv and parquet files.");
throw BinderException(ExceptionMessage::validateCopyToCSVParquetExtensionsException());
}
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, columnNames, columnTypes);
Expand Down
6 changes: 3 additions & 3 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,16 +621,16 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

bool LogicalTypeUtils::isPrimitive(const LogicalType& dataType) {
bool LogicalTypeUtils::isNested(const LogicalType& dataType) {
switch (dataType.typeID) {
case LogicalTypeID::STRUCT:
case LogicalTypeID::VAR_LIST:
case LogicalTypeID::FIXED_LIST:
case LogicalTypeID::UNION:
case LogicalTypeID::MAP:
return false;
default:
return true;
default:
return false;
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/include/common/exception/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct ExceptionMessage {
// Returns the fixed message telling the user to use the COPY FROM statement
// for csv and parquet files.
// NOTE(review): the name suggests this is raised for by-column copies; the
// call site is not visible here — confirm.
static inline std::string validateCopyCSVParquetByColumnException() {
    return std::string{"Please use COPY FROM statement for copying csv and parquet files."};
}
// Returns the fixed message reported when COPY TO targets a file type other
// than csv or parquet.
static inline std::string validateCopyToCSVParquetExtensionsException() {
    return std::string{"COPY TO currently only supports csv and parquet files."};
}
static std::string validateCopyNpyNotForRelTablesException(const std::string& tableName);
};

Expand Down
2 changes: 1 addition & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isPrimitive(const LogicalType& dataType);
static bool isNested(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
2 changes: 0 additions & 2 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ class ValueVector {

void resetAuxiliaryBuffer();

inline bool isPrimitiveDataType() { return LogicalTypeUtils::isPrimitive(dataType); }

// If there is still non-null values after discarding, return true. Otherwise, return false.
// For an unflat vector, its selection vector is also updated to the resultSelVector.
static bool discardNull(ValueVector& vector);
Expand Down
28 changes: 15 additions & 13 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/csv_parquet_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
Expand All @@ -12,30 +12,32 @@
namespace kuzu {
namespace processor {

class CSVParquetWriterSharedState {
class CopyToSharedState {
public:
CSVParquetWriterSharedState(common::CopyDescription::FileType fileType) {
CopyToSharedState(common::CopyDescription::FileType fileType, std::string& filePath,
std::vector<std::string>& columnNames, std::vector<common::LogicalType>& columnTypes) {
if (fileType == common::CopyDescription::FileType::CSV) {
fileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
fileWriter = std::make_unique<kuzu::processor::CSVFileWriter>(
filePath, columnNames, columnTypes);
} else if (fileType == common::CopyDescription::FileType::PARQUET) {
fileWriter = std::make_unique<kuzu::processor::ParquetFileWriter>();
fileWriter = std::make_unique<kuzu::processor::ParquetFileWriter>(
filePath, columnNames, columnTypes);
} else {
throw common::NotImplementedException(
"CSVParquetWriterSharedState::CSVParquetWriterSharedState");
throw common::NotImplementedException("CopyToSharedState::CopyToSharedState");
}
}
std::unique_ptr<CSVParquetWriter>& getWriter() { return fileWriter; }
inline std::unique_ptr<FileWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<CSVParquetWriter> fileWriter;
std::unique_ptr<FileWriter> fileWriter;
};

class CopyTo : public Sink {
public:
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CSVParquetWriterSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
std::shared_ptr<CopyToSharedState> sharedState, std::vector<DataPos> vectorsToCopyPos,
const common::CopyDescription& copyDescription, uint32_t id,
const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
Expand All @@ -61,7 +63,7 @@ class CopyTo : public Sink {
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<CSVParquetWriterSharedState> sharedState;
std::shared_ptr<CopyToSharedState> sharedState;
};

} // namespace processor
Expand Down
16 changes: 8 additions & 8 deletions src/include/processor/operator/persistent/csv_file_writer.h
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
#pragma once

#include "processor/operator/persistent/csv_parquet_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter : public CSVParquetWriter {
class CSVFileWriter : public FileWriter {
public:
CSVFileWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;
using FileWriter::FileWriter;
void openFile() final;
void init() final;
inline void closeFile() final { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
void writeHeader(const std::vector<std::string>& columnNames);
void writeHeader();
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();
Expand Down
31 changes: 0 additions & 31 deletions src/include/processor/operator/persistent/csv_parquet_writer.h

This file was deleted.

27 changes: 27 additions & 0 deletions src/include/processor/operator/persistent/file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"
#include <utility>

namespace kuzu {
namespace processor {

// Abstract base class for writers that output value vectors to a file.
// It stores the destination path together with the column names and column
// types; concrete writers implement the pure virtual operations below.
class FileWriter {
public:
    // Takes ownership of the path and the column metadata. The arguments are
    // received by value and moved into the members, so a caller passing
    // lvalues pays for one copy and a caller passing rvalues pays for none.
    FileWriter(std::string filePath, std::vector<std::string> columnNames,
        std::vector<common::LogicalType> columnTypes)
        : filePath{std::move(filePath)}, columnNames{std::move(columnNames)},
          columnTypes{std::move(columnTypes)} {}
    // Virtual so that derived writers are destroyed correctly through a
    // FileWriter pointer.
    virtual ~FileWriter() = default;
    // Prepares the writer before any values are written.
    // NOTE(review): exact responsibilities are defined by each subclass.
    virtual void init() = 0;
    // Opens the output file identified by filePath.
    virtual void openFile() = 0;
    // Finishes writing and closes the output file.
    virtual void closeFile() = 0;
    // Writes the contents of the given vectors to the output.
    virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

protected:
    // Path of the file to write.
    std::string filePath;
    // Names of the columns being written.
    std::vector<std::string> columnNames;
    // Logical types of the columns being written.
    // NOTE(review): presumably parallel to columnNames — confirm.
    std::vector<common::LogicalType> columnTypes;
};

} // namespace processor
} // namespace kuzu
16 changes: 5 additions & 11 deletions src/include/processor/operator/persistent/parquet_column_writer.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

#include "processor/operator/persistent/csv_parquet_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
Expand All @@ -14,7 +14,7 @@ class ParquetColumnWriter {
: maxDefinitionLevels(maxDefinitionLevels), totalColumns(totalColumns){};

void writeColumn(
int kuzuColumn, common::ValueVector* vector, parquet::RowGroupWriter* rowWriter);
int column, common::ValueVector* vector, parquet::RowGroupWriter* rowGroupWriter);

int64_t estimatedRowBytes;

Expand Down Expand Up @@ -61,19 +61,13 @@ class ParquetColumnWriter {
// Properties for nested lists and structs
bool isListStarting;

int currentKuzuColumn;
int currentColumn;
int currentParquetColumn;
int totalColumns;

// define the writers
parquet::Int64Writer* int64Writer;
parquet::ByteArrayWriter* byteArrayWriter;
parquet::FixedLenByteArrayWriter* fixedLenByteArrayWriter;
parquet::BoolWriter* boolWriter;
parquet::RowGroupWriter* rowWriter;
parquet::Int32Writer* int32Writer;
parquet::DoubleWriter* doubleWriter;
parquet::FloatWriter* floatWriter;
parquet::RowGroupWriter* rowGroupWriter;
parquet::ColumnWriter* columnWriter;
};

} // namespace processor
Expand Down
34 changes: 22 additions & 12 deletions src/include/processor/operator/persistent/parquet_file_writer.h
Original file line number Diff line number Diff line change
@@ -1,33 +1,43 @@
#pragma once

#include "arrow/io/file.h"
#include "processor/operator/persistent/csv_parquet_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_column_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// ParquetWriter performs the following:
// openFile: opens the file and create a arrow::io::FileOutputStream object
// ParquetFileWriter performs the following:
// openFile: opens the file and creates an arrow::io::FileOutputStream object
// init:
// - generate the schema
// - calculate the max definition levels and number of primitive nodes
// - calculate the max definition levels and number of primitive nodes. The
// definition levels are used for nested types.
// - initialize parquetColumnWriter
// writeValues : take a vector of ValueVector and pass to parquetColumnWriter
class ParquetFileWriter : public CSVParquetWriter {
// More information about the encoding:
// https://parquet.apache.org/docs/file-format/data-pages/encodings
class ParquetFileWriter : public FileWriter {
public:
ParquetFileWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
void closeFile() override;
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;
using FileWriter::FileWriter;
void openFile() final;
void init() final;
void closeFile() final;
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
static std::shared_ptr<parquet::schema::Node> kuzuTypeToParquetType(int& parquetColumnsCount,
static std::shared_ptr<parquet::schema::Node> createParquetSchemaNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1);

static std::shared_ptr<parquet::schema::Node> createNestedNode(
int& parquetColumnsCount, std::string& columnName, const common::LogicalType& logicalType);

static std::shared_ptr<parquet::schema::Node> createPrimitiveNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition, int length);

void writeValue(common::LogicalTypeID type, void* value);

void flush();
Expand All @@ -37,7 +47,7 @@ class ParquetFileWriter : public CSVParquetWriter {

std::shared_ptr<ParquetColumnWriter> parquetColumnWriter;

parquet::RowGroupWriter* rowWriter;
parquet::RowGroupWriter* rowGroupWriter;

std::shared_ptr<parquet::ParquetFileWriter> fileWriter;

Expand Down
6 changes: 3 additions & 3 deletions src/processor/map/map_copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ std::unique_ptr<PhysicalOperator> PlanMapper::mapCopyTo(LogicalOperator* logical
for (auto& expression : childSchema->getExpressionsInScope()) {
vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression));
}
auto sharedState =
std::make_shared<CSVParquetWriterSharedState>(copy->getCopyDescription()->fileType);

auto sharedState = std::make_shared<CopyToSharedState>(copy->getCopyDescription()->fileType,
copy->getCopyDescription()->filePaths[0], copy->getCopyDescription()->columnNames,
copy->getCopyDescription()->columnTypes);
auto copyTo = std::make_unique<CopyTo>(std::make_unique<ResultSetDescriptor>(childSchema),
sharedState, std::move(vectorsToCopyPos), *copy->getCopyDescription(), getOperatorID(),
copy->getExpressionsForPrinting(), std::move(prevOperator));
Expand Down
3 changes: 0 additions & 3 deletions src/processor/operator/persistent/copy_to.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ namespace kuzu {
namespace processor {

void CopyTo::initGlobalStateInternal(ExecutionContext* context) {
sharedState->getWriter()->openFile(getCopyDescription().filePaths[0]);
sharedState->getWriter()->setColumns(
getCopyDescription().columnNames, getCopyDescription().columnTypes);
sharedState->getWriter()->init();
}

Expand Down
7 changes: 4 additions & 3 deletions src/processor/operator/persistent/csv_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ using namespace kuzu::common;
namespace kuzu {
namespace processor {

void CSVFileWriter::openFile(const std::string& filePath) {
void CSVFileWriter::openFile() {
fileInfo = FileUtils::openFile(filePath, O_WRONLY | O_CREAT | O_TRUNC);
}

void CSVFileWriter::init() {
writeHeader(getColumnNames());
openFile();
writeHeader();
}

void CSVFileWriter::writeHeader(const std::vector<std::string>& columnNames) {
void CSVFileWriter::writeHeader() {
if (columnNames.size() == 0) {
return;
}
Expand Down
Loading

0 comments on commit a30c208

Please sign in to comment.