Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Parquet filetype on COPY TO #1893

Merged
merged 7 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType());
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException(ExceptionMessage::validateCopyToCSVParquetExtensionsException());
}
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, nullptr /* parsing option */);
copyDescription->columnNames = columnNames;
copyDescription->columnTypes = columnTypes;
return std::make_unique<BoundCopyTo>(std::move(copyDescription), std::move(query));
}

Expand Down
13 changes: 13 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,19 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

bool LogicalTypeUtils::isNested(const LogicalType& dataType) {
switch (dataType.typeID) {
case LogicalTypeID::STRUCT:
case LogicalTypeID::VAR_LIST:
case LogicalTypeID::FIXED_LIST:
case LogicalTypeID::UNION:
case LogicalTypeID::MAP:
return true;
default:
return false;
}
}

std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
return std::vector<LogicalType>{LogicalType{LogicalTypeID::BOOL},
LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32},
Expand Down
7 changes: 6 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include "common/constants.h"
#include "common/types/types.h"

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -40,14 +41,16 @@ struct CopyDescription {
FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames},
columnTypes{other.columnTypes} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
Expand All @@ -67,6 +70,8 @@ struct CopyDescription {
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);
};

} // namespace common
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/exception/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct ExceptionMessage {
static inline std::string validateCopyCSVParquetByColumnException() {
return "Please use COPY FROM statement for copying csv and parquet files.";
}
static inline std::string validateCopyToCSVParquetExtensionsException() {
return "COPY TO currently only supports csv and parquet files.";
}
static std::string validateCopyNpyNotForRelTablesException(const std::string& tableName);
};

Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isNested(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
52 changes: 35 additions & 17 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,57 @@
#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class WriteCSVFileSharedState {
class CopyToSharedState {
public:
WriteCSVFileSharedState() {
csvFileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
};
std::unique_ptr<kuzu::processor::CSVFileWriter> csvFileWriter;
CopyToSharedState(common::CopyDescription::FileType fileType, std::string& filePath,
std::vector<std::string>& columnNames, std::vector<common::LogicalType>& columnTypes) {
if (fileType == common::CopyDescription::FileType::CSV) {
fileWriter = std::make_unique<kuzu::processor::CSVFileWriter>(
filePath, columnNames, columnTypes);
} else if (fileType == common::CopyDescription::FileType::PARQUET) {
fileWriter = std::make_unique<kuzu::processor::ParquetFileWriter>(
filePath, columnNames, columnTypes);
} else {
throw common::NotImplementedException("CopyToSharedState::CopyToSharedState");

Check warning on line 26 in src/include/processor/operator/persistent/copy_to.h

View check run for this annotation

Codecov / codecov/patch

src/include/processor/operator/persistent/copy_to.h#L26

Added line #L26 was not covered by tests
}
}
inline std::unique_ptr<FileWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<FileWriter> fileWriter;
};

class CopyTo : public PhysicalOperator {
class CopyTo : public Sink {
public:
CopyTo(std::shared_ptr<WriteCSVFileSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CopyToSharedState> sharedState, std::vector<DataPos> vectorsToCopyPos,
const common::CopyDescription& copyDescription, uint32_t id,
const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;
void executeInternal(ExecutionContext* context) final;

common::CopyDescription& getCopyDescription() { return copyDescription; }
void finalize(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription& getCopyDescription() { return copyDescription; }

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(
sharedState, vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<CopyTo>(resultSetDescriptor->copy(), sharedState, vectorsToCopyPos,
copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand All @@ -45,9 +62,10 @@

private:
void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<WriteCSVFileSharedState> sharedState;
std::shared_ptr<CopyToSharedState> sharedState;
};

} // namespace processor
Expand Down
19 changes: 10 additions & 9 deletions src/include/processor/operator/persistent/csv_file_writer.h
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/file_utils.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter {
class CSVFileWriter : public FileWriter {
public:
CSVFileWriter(){};
void open(const std::string& filePath);
void writeHeader(const std::vector<std::string>& columnNames);
void writeValues(std::vector<common::ValueVector*>& outputVectors);
using FileWriter::FileWriter;
void openFile() final;
void init() final;
inline void closeFile() final { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
void writeHeader();
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();

template<typename T>
void writeToBuffer(common::ValueVector* vector, int64_t pos, bool escapeStringValue = false);
void writeToBuffer(common::ValueVector* vector, bool escapeStringValue = false);

template<typename T>
void writeListToBuffer(common::ValueVector* vector, int64_t pos);
void writeListToBuffer(common::ValueVector* vector);

inline void writeToBuffer(const std::string& value) { buffer << value; }
inline void writeToBuffer(const char value) { buffer << value; }
Expand Down
27 changes: 27 additions & 0 deletions src/include/processor/operator/persistent/file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class FileWriter {
public:
FileWriter(std::string filePath, std::vector<std::string> columnNames,
std::vector<common::LogicalType> columnTypes)
: filePath{filePath}, columnNames{columnNames}, columnTypes{columnTypes} {}
virtual ~FileWriter(){};
virtual void init() = 0;
virtual void openFile() = 0;
virtual void closeFile() = 0;
virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

protected:
std::string filePath;
std::vector<std::string> columnNames;
std::vector<common::LogicalType> columnTypes;
};

} // namespace processor
} // namespace kuzu
76 changes: 76 additions & 0 deletions src/include/processor/operator/persistent/parquet_column_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "common/types/date_t.h"
#include "common/types/timestamp_t.h"
#include "processor/operator/persistent/file_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// Kuzu column represents a single column in Kuzu, which may contains multiple Parquet columns
// writeColumn will be called for each Kuzu column.
class ParquetColumnWriter {
public:
ParquetColumnWriter(int totalColumns, std::vector<int> maxDefinitionLevels)
: maxDefinitionLevels(maxDefinitionLevels), totalColumns(totalColumns){};

void writeColumn(
int column, common::ValueVector* vector, parquet::RowGroupWriter* rowGroupWriter);

int64_t estimatedRowBytes;

private:
void nextParquetColumn(common::LogicalTypeID logicalTypeID);
void writePrimitiveValue(common::LogicalTypeID logicalTypeID, uint8_t* value,
int16_t definitionLevel = 0, int16_t repetitionLevel = 0);

// ParquetBatch contains the information needed by low-level arrow API writeBatch:
// WriteBatch(int64_t num_values, const int16_t* def_levels,
// const int16_t* rep_levels, const T* values
struct ParquetColumn {
common::LogicalTypeID logicalTypeID;
std::vector<int16_t> repetitionLevels;
std::vector<int16_t> definitionLevels;
std::vector<uint8_t*> values;
};

void addToParquetColumns(uint8_t* value, common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

void extractList(const common::list_entry_t& list, const common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

void extractStruct(const common::struct_entry_t& val, const common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

void extractNested(uint8_t* value, const common::ValueVector* vector,
std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

inline void initNewColumn() { isListStarting = true; }

void writeLittleEndianUint32(uint8_t* buffer, uint32_t value);

// Extract dremel encoding levels
int getRepetitionLevel(int currentElementIdx, int parentElementIdx, int depth);

std::vector<int> maxDefinitionLevels;

// Properties for nested lists and structs
bool isListStarting;

int currentColumn = 0;
int currentParquetColumn = 0;
int totalColumns = 0;

// define the writers
parquet::RowGroupWriter* rowGroupWriter;
parquet::ColumnWriter* columnWriter;
};

} // namespace processor
} // namespace kuzu
58 changes: 58 additions & 0 deletions src/include/processor/operator/persistent/parquet_file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "arrow/io/file.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_column_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// ParquetFileWriter performs the following:
// openFile: opens the file and create an arrow::io::FileOutputStream object
// init:
// - generate the schema
// - calculate the max definition levels and number of primitive nodes. The
// definition levels are used for nested types.
// - initialize parquetColumnWriter
// writeValues : take a vector of ValueVector and pass to parquetColumnWriter
// More information about the encoding:
// https://parquet.apache.org/docs/file-format/data-pages/encodings
class ParquetFileWriter : public FileWriter {
public:
using FileWriter::FileWriter;
void openFile() final;
void init() final;
void closeFile() final;
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
static std::shared_ptr<parquet::schema::Node> createParquetSchemaNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1);

static std::shared_ptr<parquet::schema::Node> createNestedNode(
int& parquetColumnsCount, std::string& columnName, const common::LogicalType& logicalType);

static std::shared_ptr<parquet::schema::Node> createPrimitiveNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition, int length);

void writeValue(common::LogicalTypeID type, void* value);

void flush();

void generateSchema(
std::shared_ptr<parquet::schema::GroupNode>& schema, int& parquetColumnsCount);

std::shared_ptr<ParquetColumnWriter> parquetColumnWriter;

parquet::RowGroupWriter* rowGroupWriter;

std::shared_ptr<parquet::ParquetFileWriter> fileWriter;

std::shared_ptr<arrow::io::FileOutputStream> outFile;
};

} // namespace processor
} // namespace kuzu
Loading