Skip to content

Commit

Permalink
Support Parquet filetype on COPY TO (#1893)
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Sep 18, 2023
1 parent 314934a commit 9ff7a2a
Show file tree
Hide file tree
Showing 21 changed files with 1,910 additions and 65 deletions.
8 changes: 6 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,22 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType());
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException(ExceptionMessage::validateCopyToCSVParquetExtensionsException());
}
auto copyDescription = std::make_unique<CopyDescription>(
fileType, std::vector<std::string>{boundFilePath}, nullptr /* parsing option */);
copyDescription->columnNames = columnNames;
copyDescription->columnTypes = columnTypes;
return std::make_unique<BoundCopyTo>(std::move(copyDescription), std::move(query));
}

Expand Down
13 changes: 13 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,19 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

// Reports whether `dataType` is one of the nested logical types:
// STRUCT, VAR_LIST, FIXED_LIST, UNION or MAP. Every other type ID yields false.
bool LogicalTypeUtils::isNested(const LogicalType& dataType) {
    const auto id = dataType.typeID;
    return id == LogicalTypeID::STRUCT || id == LogicalTypeID::VAR_LIST ||
           id == LogicalTypeID::FIXED_LIST || id == LogicalTypeID::UNION ||
           id == LogicalTypeID::MAP;
}

std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
return std::vector<LogicalType>{LogicalType{LogicalTypeID::BOOL},
LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32},
Expand Down
7 changes: 6 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <vector>

#include "common/constants.h"
#include "common/types/types.h"

namespace kuzu {
namespace common {
Expand Down Expand Up @@ -40,14 +41,16 @@ struct CopyDescription {
FileType fileType;
std::vector<std::string> filePaths;
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;

CopyDescription(FileType fileType, const std::vector<std::string>& filePaths,
std::unique_ptr<CSVReaderConfig> csvReaderConfig)
: fileType{fileType}, filePaths{filePaths}, csvReaderConfig{std::move(csvReaderConfig)} {}

CopyDescription(const CopyDescription& other)
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames} {
: fileType{other.fileType}, filePaths{other.filePaths}, columnNames{other.columnNames},
columnTypes{other.columnTypes} {
if (other.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*other.csvReaderConfig);
}
Expand All @@ -67,6 +70,8 @@ struct CopyDescription {
{".npy", FileType::NPY}, {".ttl", FileType::TURTLE}};

static FileType getFileTypeFromExtension(const std::string& extension);

static std::string getFileTypeName(FileType fileType);
};

} // namespace common
Expand Down
3 changes: 3 additions & 0 deletions src/include/common/exception/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ struct ExceptionMessage {
// Message directing the user to the COPY FROM statement for csv and parquet files.
static inline std::string validateCopyCSVParquetByColumnException() {
    std::string message{"Please use COPY FROM statement for copying csv and parquet files."};
    return message;
}
// Message reported when COPY TO is given a file type other than csv or parquet.
static inline std::string validateCopyToCSVParquetExtensionsException() {
    std::string message{"COPY TO currently only supports csv and parquet files."};
    return message;
}
static std::string validateCopyNpyNotForRelTablesException(const std::string& tableName);
};

Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isNested(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
52 changes: 35 additions & 17 deletions src/include/processor/operator/persistent/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,57 @@
#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/persistent/csv_file_writer.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_file_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class WriteCSVFileSharedState {
class CopyToSharedState {
public:
WriteCSVFileSharedState() {
csvFileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
};
std::unique_ptr<kuzu::processor::CSVFileWriter> csvFileWriter;
CopyToSharedState(common::CopyDescription::FileType fileType, std::string& filePath,
std::vector<std::string>& columnNames, std::vector<common::LogicalType>& columnTypes) {
if (fileType == common::CopyDescription::FileType::CSV) {
fileWriter = std::make_unique<kuzu::processor::CSVFileWriter>(
filePath, columnNames, columnTypes);
} else if (fileType == common::CopyDescription::FileType::PARQUET) {
fileWriter = std::make_unique<kuzu::processor::ParquetFileWriter>(
filePath, columnNames, columnTypes);
} else {
throw common::NotImplementedException("CopyToSharedState::CopyToSharedState");
}
}
inline std::unique_ptr<FileWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<FileWriter> fileWriter;
};

class CopyTo : public PhysicalOperator {
class CopyTo : public Sink {
public:
CopyTo(std::shared_ptr<WriteCSVFileSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CopyToSharedState> sharedState, std::vector<DataPos> vectorsToCopyPos,
const common::CopyDescription& copyDescription, uint32_t id,
const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

inline bool canParallel() const final { return false; }

bool getNextTuplesInternal(ExecutionContext* context) override;
void executeInternal(ExecutionContext* context) final;

common::CopyDescription& getCopyDescription() { return copyDescription; }
void finalize(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription& getCopyDescription() { return copyDescription; }

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(
sharedState, vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<CopyTo>(resultSetDescriptor->copy(), sharedState, vectorsToCopyPos,
copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand All @@ -45,9 +62,10 @@ class CopyTo : public PhysicalOperator {

private:
void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<WriteCSVFileSharedState> sharedState;
std::shared_ptr<CopyToSharedState> sharedState;
};

} // namespace processor
Expand Down
19 changes: 10 additions & 9 deletions src/include/processor/operator/persistent/csv_file_writer.h
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/file_utils.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter {
class CSVFileWriter : public FileWriter {
public:
CSVFileWriter(){};
void open(const std::string& filePath);
void writeHeader(const std::vector<std::string>& columnNames);
void writeValues(std::vector<common::ValueVector*>& outputVectors);
using FileWriter::FileWriter;
void openFile() final;
void init() final;
inline void closeFile() final { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
void writeHeader();
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();

template<typename T>
void writeToBuffer(common::ValueVector* vector, int64_t pos, bool escapeStringValue = false);
void writeToBuffer(common::ValueVector* vector, bool escapeStringValue = false);

template<typename T>
void writeListToBuffer(common::ValueVector* vector, int64_t pos);
void writeListToBuffer(common::ValueVector* vector);

inline void writeToBuffer(const std::string& value) { buffer << value; }
inline void writeToBuffer(const char value) { buffer << value; }
Expand Down
27 changes: 27 additions & 0 deletions src/include/processor/operator/persistent/file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <utility>

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

// Abstract base class for the writers used by COPY TO (e.g. CSVFileWriter, ParquetFileWriter).
// It stores the destination path together with the names and logical types of the columns
// being exported; the concrete writers implement the virtual hooks declared below.
class FileWriter {
public:
    // The arguments are taken by value and moved into the members, so a caller passing
    // temporaries (or std::move-ing its own copies) pays for no additional copy.
    FileWriter(std::string filePath, std::vector<std::string> columnNames,
        std::vector<common::LogicalType> columnTypes)
        : filePath{std::move(filePath)}, columnNames{std::move(columnNames)},
          columnTypes{std::move(columnTypes)} {}
    // Virtual so that a writer can be destroyed through a FileWriter pointer.
    virtual ~FileWriter() = default;
    // Writer-specific set-up. NOTE(review): the call order relative to openFile() is decided
    // by the caller and is not visible from this header.
    virtual void init() = 0;
    // Opens the output located at `filePath`.
    virtual void openFile() = 0;
    // Finishes writing; implementations are expected to flush/close their output here.
    virtual void closeFile() = 0;
    // Writes the values held by `outputVectors` (one vector per output column).
    virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

protected:
    std::string filePath;                         // destination file path
    std::vector<std::string> columnNames;         // names of the exported columns
    std::vector<common::LogicalType> columnTypes; // logical types of the exported columns
};

} // namespace processor
} // namespace kuzu
76 changes: 76 additions & 0 deletions src/include/processor/operator/persistent/parquet_column_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#pragma once

#include "common/types/date_t.h"
#include "common/types/timestamp_t.h"
#include "processor/operator/persistent/file_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// A Kuzu column represents a single column in Kuzu, which may contain multiple Parquet columns.
// writeColumn will be called for each Kuzu column.
// Writes the values of one Kuzu column into the Parquet column(s) that represent it.
// A nested Kuzu column can span several Parquet columns, hence the separate bookkeeping of
// the current Kuzu column and the current Parquet column.
class ParquetColumnWriter {
public:
    // totalColumns: number of columns handled by this writer.
    // maxDefinitionLevels: maximum definition levels used when encoding nested types.
    //   Taken by value and moved into the member to avoid a second copy.
    ParquetColumnWriter(int totalColumns, std::vector<int> maxDefinitionLevels)
        : maxDefinitionLevels(std::move(maxDefinitionLevels)), totalColumns(totalColumns) {}

    // Writes `vector` as Kuzu column number `column` using `rowGroupWriter`.
    void writeColumn(
        int column, common::ValueVector* vector, parquet::RowGroupWriter* rowGroupWriter);

    // Running size estimate exposed to the owner of this writer. Initialized to 0 so that it
    // never holds an indeterminate value before the first write.
    int64_t estimatedRowBytes = 0;

private:
    void nextParquetColumn(common::LogicalTypeID logicalTypeID);
    void writePrimitiveValue(common::LogicalTypeID logicalTypeID, uint8_t* value,
        int16_t definitionLevel = 0, int16_t repetitionLevel = 0);

    // ParquetColumn contains the information needed by the low-level arrow API WriteBatch:
    // WriteBatch(int64_t num_values, const int16_t* def_levels,
    //            const int16_t* rep_levels, const T* values)
    struct ParquetColumn {
        common::LogicalTypeID logicalTypeID;
        std::vector<int16_t> repetitionLevels;
        std::vector<int16_t> definitionLevels;
        std::vector<uint8_t*> values;
    };

    // The helpers below collect values into `parquetColumns`, a map keyed by a string.
    // NOTE(review): the key is presumably derived from the struct field name path
    // (see parentStructFieldName) - confirm against the implementation.
    void addToParquetColumns(uint8_t* value, common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractList(const common::list_entry_t& list, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractStruct(const common::struct_entry_t& val, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractNested(uint8_t* value, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    // Marks the beginning of a new column by flagging that a list is starting.
    inline void initNewColumn() { isListStarting = true; }

    void writeLittleEndianUint32(uint8_t* buffer, uint32_t value);

    // Extract dremel encoding levels
    int getRepetitionLevel(int currentElementIdx, int parentElementIdx, int depth);

    std::vector<int> maxDefinitionLevels;

    // Properties for nested lists and structs
    bool isListStarting = false;

    int currentColumn = 0;
    int currentParquetColumn = 0;
    int totalColumns = 0;

    // Non-owning writer handles; null until assigned by the implementation.
    parquet::RowGroupWriter* rowGroupWriter = nullptr;
    parquet::ColumnWriter* columnWriter = nullptr;
};

} // namespace processor
} // namespace kuzu
58 changes: 58 additions & 0 deletions src/include/processor/operator/persistent/parquet_file_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include "arrow/io/file.h"
#include "processor/operator/persistent/file_writer.h"
#include "processor/operator/persistent/parquet_column_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// ParquetFileWriter performs the following:
// openFile: opens the file and creates an arrow::io::FileOutputStream object
// init:
// - generates the schema
// - calculates the max definition levels and number of primitive nodes. The
// definition levels are used for nested types.
// - initializes parquetColumnWriter
// writeValues: takes a vector of ValueVector and passes it to parquetColumnWriter
// More information about the encoding:
// https://parquet.apache.org/docs/file-format/data-pages/encodings
class ParquetFileWriter : public FileWriter {
public:
using FileWriter::FileWriter;
void openFile() final;
void init() final;
void closeFile() final;
void writeValues(std::vector<common::ValueVector*>& outputVectors) final;

private:
static std::shared_ptr<parquet::schema::Node> createParquetSchemaNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1);

static std::shared_ptr<parquet::schema::Node> createNestedNode(
int& parquetColumnsCount, std::string& columnName, const common::LogicalType& logicalType);

static std::shared_ptr<parquet::schema::Node> createPrimitiveNode(int& parquetColumnsCount,
std::string& columnName, const common::LogicalType& logicalType,
parquet::Repetition::type repetition, int length);

void writeValue(common::LogicalTypeID type, void* value);

void flush();

void generateSchema(
std::shared_ptr<parquet::schema::GroupNode>& schema, int& parquetColumnsCount);

std::shared_ptr<ParquetColumnWriter> parquetColumnWriter;

parquet::RowGroupWriter* rowGroupWriter;

std::shared_ptr<parquet::ParquetFileWriter> fileWriter;

std::shared_ptr<arrow::io::FileOutputStream> outFile;
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 9ff7a2a

Please sign in to comment.