Skip to content

Commit

Permalink
Support Parquet files on COPY TO
Browse files Browse the repository at this point in the history
  • Loading branch information
rfdavid committed Sep 5, 2023
1 parent 9526e5e commit 031529e
Show file tree
Hide file tree
Showing 29 changed files with 3,218 additions and 2,381 deletions.
1 change: 1 addition & 0 deletions dataset/copy-test/list-of-struct/copy.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
COPY T FROM "dataset/copy-test/list-of-struct/test.csv" (HEADER=true);
1 change: 1 addition & 0 deletions dataset/copy-test/list-of-struct/schema.cypher
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create node table T (id INT64, fields STRUCT(a INT64, b INT64)[], PRIMARY KEY (id));
4 changes: 4 additions & 0 deletions dataset/copy-test/list-of-struct/test.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
id,fields
0,"[{a:1,b:2},{a:3,b:4}]"
1,"[{a:5,b:6},{a:7,b:8},{a:9,b:10}]"
2,"[{a:11,b:12}]"
2 changes: 1 addition & 1 deletion src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ TO: ( 'T' | 't' ) ( 'O' | 'o' ) ;

kU_DataType
: oC_SymbolicName
| ( oC_SymbolicName kU_ListIdentifiers )
| kU_DataType kU_ListIdentifiers
| UNION SP? '(' SP? kU_PropertyDefinitions SP? ')'
| oC_SymbolicName SP? '(' SP? kU_PropertyDefinitions SP? ')'
| oC_SymbolicName SP? '(' SP? kU_DataType SP? ',' SP? kU_DataType SP? ')' ;
Expand Down
14 changes: 10 additions & 4 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
auto boundFilePath = copyToStatement.getFilePath();
auto fileType = bindFileType(boundFilePath);
std::vector<std::string> columnNames;
std::vector<LogicalType> columnTypes;
auto query = bindQuery(*copyToStatement.getRegularQuery());
auto columns = query->getStatementResult()->getColumns();
for (auto& column : columns) {
auto columnName = column->hasAlias() ? column->getAlias() : column->toString();
columnNames.push_back(columnName);
columnTypes.push_back(column->getDataType());
// auto d = column->getDataType();
// auto e = VarListType::getChildType(&d);
// auto f = "ok";
}
if (fileType != CopyDescription::FileType::CSV) {
throw BinderException("COPY TO currently only supports csv files.");
if (fileType != CopyDescription::FileType::CSV &&
fileType != CopyDescription::FileType::PARQUET) {
throw BinderException("COPY TO currently only supports csv and parquet files.");
}
return std::make_unique<BoundCopyTo>(
CopyDescription(std::vector<std::string>{boundFilePath}, fileType, columnNames),
return std::make_unique<BoundCopyTo>(CopyDescription(std::vector<std::string>{boundFilePath},
fileType, columnNames, columnTypes),
std::move(query));
}

Expand Down
11 changes: 7 additions & 4 deletions src/common/copier_config/copier_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ CopyDescription::CopyDescription(

// Copy To
CopyDescription::CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames} {}
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes)
: filePaths{filePaths}, fileType{fileType}, columnNames{columnNames}, columnTypes{columnTypes} {
}

CopyDescription::CopyDescription(const CopyDescription& copyDescription)
: filePaths{copyDescription.filePaths}, csvReaderConfig{nullptr},
fileType{copyDescription.fileType}, columnNames{copyDescription.columnNames} {
: filePaths{copyDescription.filePaths},
csvReaderConfig{nullptr}, fileType{copyDescription.fileType},
columnNames{copyDescription.columnNames}, columnTypes{copyDescription.columnTypes} {
if (copyDescription.csvReaderConfig != nullptr) {
this->csvReaderConfig = std::make_unique<CSVReaderConfig>(*copyDescription.csvReaderConfig);
}
Expand Down
13 changes: 13 additions & 0 deletions src/common/types/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,19 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) {
}
}

// Returns true unless the type ID is one of the nested kinds (STRUCT, VAR_LIST, FIXED_LIST,
// UNION or MAP). Every other type ID is reported as primitive.
bool LogicalTypeUtils::isPrimitive(const LogicalType& dataType) {
    const auto id = dataType.typeID;
    const bool isNested = id == LogicalTypeID::STRUCT || id == LogicalTypeID::VAR_LIST ||
                          id == LogicalTypeID::FIXED_LIST || id == LogicalTypeID::UNION ||
                          id == LogicalTypeID::MAP;
    return !isNested;
}

std::vector<LogicalType> LogicalTypeUtils::getAllValidComparableLogicalTypes() {
return std::vector<LogicalType>{LogicalType{LogicalTypeID::BOOL},
LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32},
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/copier_config/copier_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ struct CopyDescription {

// Copy To
CopyDescription(const std::vector<std::string>& filePaths, FileType fileType,
const std::vector<std::string>& columnNames);
const std::vector<std::string>& columnNames,
const std::vector<common::LogicalType>& columnTypes);

CopyDescription(const CopyDescription& copyDescription);

Expand All @@ -53,6 +54,7 @@ struct CopyDescription {

const std::vector<std::string> filePaths;
const std::vector<std::string> columnNames;
const std::vector<common::LogicalType> columnTypes;
std::unique_ptr<CSVReaderConfig> csvReaderConfig;
FileType fileType;
};
Expand Down
1 change: 1 addition & 0 deletions src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ class LogicalTypeUtils {
KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString);
static uint32_t getRowLayoutSize(const LogicalType& logicalType);
static bool isNumerical(const LogicalType& dataType);
static bool isPrimitive(const LogicalType& dataType);
static std::vector<LogicalType> getAllValidComparableLogicalTypes();
static std::vector<LogicalTypeID> getNumericalLogicalTypeIDs();
static std::vector<LogicalType> getAllValidLogicTypes();
Expand Down
2 changes: 2 additions & 0 deletions src/include/common/vector/value_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class ValueVector {

void resetAuxiliaryBuffer();

inline bool isPrimitiveDataType() { return LogicalTypeUtils::isPrimitive(dataType); }

// If there is still non-null values after discarding, return true. Otherwise, return false.
// For an unflat vector, its selection vector is also updated to the resultSelVector.
static bool discardNull(ValueVector& vector);
Expand Down
48 changes: 32 additions & 16 deletions src/include/processor/operator/copy_to/copy_to.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,54 @@

#include "common/copier_config/copier_config.h"
#include "common/task_system/task_scheduler.h"
#include "processor/operator/copy_to/csv_file_writer.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/operator/copy_to/csv_writer.h"
#include "processor/operator/copy_to/parquet_writer.h"
#include "processor/operator/physical_operator.h"
#include "processor/operator/sink.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class WriteCSVFileSharedState {
class CSVParquetWriterSharedState {
public:
WriteCSVFileSharedState() {
csvFileWriter = std::make_unique<kuzu::processor::CSVFileWriter>();
};
std::unique_ptr<kuzu::processor::CSVFileWriter> csvFileWriter;
// Creates the writer that matches the requested output format.
// Throws common::NotImplementedException for any file type other than CSV or PARQUET.
CSVParquetWriterSharedState(common::CopyDescription::FileType fileType) {
    if (fileType == common::CopyDescription::FileType::CSV) {
        fileWriter = std::make_unique<kuzu::processor::CSVWriter>();
    } else if (fileType == common::CopyDescription::FileType::PARQUET) {
        fileWriter = std::make_unique<kuzu::processor::ParquetWriter>();
    } else {
        // The exception must actually be thrown: merely constructing it discards the error
        // and leaves fileWriter null for callers of getWriter().
        throw common::NotImplementedException(
            "CSVParquetWriterSharedState::CSVParquetWriterSharedState");
    }
}
std::unique_ptr<CSVParquetWriter>& getWriter() { return fileWriter; }

private:
std::unique_ptr<CSVParquetWriter> fileWriter;
};

class CopyTo : public PhysicalOperator {
class CopyTo : public Sink {
public:
CopyTo(std::shared_ptr<WriteCSVFileSharedState> sharedState,
CopyTo(std::unique_ptr<ResultSetDescriptor> resultSetDescriptor,
std::shared_ptr<CSVParquetWriterSharedState> sharedState,
std::vector<DataPos> vectorsToCopyPos, const common::CopyDescription& copyDescription,
uint32_t id, const std::string& paramsString, std::unique_ptr<PhysicalOperator> child)
: PhysicalOperator{PhysicalOperatorType::COPY_TO, std::move(child), id, paramsString},
: Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO, std::move(child), id,
paramsString},
sharedState{std::move(sharedState)}, vectorsToCopyPos{std::move(vectorsToCopyPos)},
copyDescription{copyDescription} {}

bool getNextTuplesInternal(ExecutionContext* context) override;
void executeInternal(ExecutionContext* context) final;

common::CopyDescription& getCopyDescription() { return copyDescription; }
void finalize(ExecutionContext* context) final;

void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription& getCopyDescription() { return copyDescription; }

std::unique_ptr<PhysicalOperator> clone() override {
return make_unique<CopyTo>(
sharedState, vectorsToCopyPos, copyDescription, id, paramsString, children[0]->clone());
std::unique_ptr<PhysicalOperator> clone() final {
return make_unique<CopyTo>(resultSetDescriptor->copy(), sharedState, vectorsToCopyPos,
copyDescription, id, paramsString, children[0]->clone());
}

protected:
Expand All @@ -43,9 +58,10 @@ class CopyTo : public PhysicalOperator {

private:
void initGlobalStateInternal(ExecutionContext* context) override;
void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) override;
common::CopyDescription copyDescription;
std::vector<common::ValueVector*> outputVectors;
std::shared_ptr<WriteCSVFileSharedState> sharedState;
std::shared_ptr<CSVParquetWriterSharedState> sharedState;
};

} // namespace processor
Expand Down
31 changes: 31 additions & 0 deletions src/include/processor/operator/copy_to/csv_parquet_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

// Abstract interface shared by the COPY TO file writers (CSVWriter derives from it, and
// CSVParquetWriterSharedState holds either a CSVWriter or a ParquetWriter through it).
// It also stores the names and logical types of the columns being exported.
class CSVParquetWriter {
public:
    virtual ~CSVParquetWriter() = default;

    // Opens the output file at filePath.
    virtual void openFile(const std::string& filePath) = 0;
    // Format-specific initialization hook; what it does is up to each implementation.
    virtual void init() = 0;
    // Finishes writing; e.g. CSVWriter flushes its buffer here.
    virtual void closeFile() = 0;
    // Writes the values held in outputVectors to the file.
    virtual void writeValues(std::vector<common::ValueVector*>& outputVectors) = 0;

    // Stores copies of the column names and column types.
    void setColumns(const std::vector<std::string>& columnNames,
        const std::vector<common::LogicalType>& columnTypes) {
        this->columnNames = columnNames;
        this->columnTypes = columnTypes;
    }

    // Mutable access to the stored column names.
    std::vector<std::string>& getColumnNames() { return columnNames; }
    // Mutable access to the stored column types.
    std::vector<common::LogicalType>& getColumnTypes() { return columnTypes; }

private:
    std::vector<std::string> columnNames;
    std::vector<common::LogicalType> columnTypes;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -1,29 +1,30 @@
#pragma once

#include "common/copier_config/copier_config.h"
#include "common/file_utils.h"
#include "processor/operator/copy_to/csv_parquet_writer.h"
#include "processor/result/result_set.h"

namespace kuzu {
namespace processor {

class CSVFileWriter {
class CSVWriter : public CSVParquetWriter {
public:
CSVFileWriter(){};
void open(const std::string& filePath);
void writeHeader(const std::vector<std::string>& columnNames);
void writeValues(std::vector<common::ValueVector*>& outputVectors);
CSVWriter(){};
void openFile(const std::string& filePath) override;
void init() override;
inline void closeFile() override { flush(); }
void writeValues(std::vector<common::ValueVector*>& outputVectors) override;

private:
void writeHeader(const std::vector<std::string>& columnNames);
void escapeString(std::string& value);
void writeValue(common::ValueVector* vector);
void flush();

template<typename T>
void writeToBuffer(common::ValueVector* vector, int64_t pos, bool escapeStringValue = false);
void writeToBuffer(common::ValueVector* vector, bool escapeStringValue = false);

template<typename T>
void writeListToBuffer(common::ValueVector* vector, int64_t pos);
void writeListToBuffer(common::ValueVector* vector);

inline void writeToBuffer(const std::string& value) { buffer << value; }
inline void writeToBuffer(const char value) { buffer << value; }
Expand Down
84 changes: 84 additions & 0 deletions src/include/processor/operator/copy_to/parquet_column_writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#pragma once

#include <cstdint>
#include <map>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "processor/operator/copy_to/csv_parquet_writer.h"
#include <parquet/api/writer.h>

namespace kuzu {
namespace processor {

// Additional computed information for each column that will be used to write values.
// ParquetColumnWriter keeps these descriptors in a map keyed by an int
// (presumably the Kuzu column index -- TODO confirm against the code that builds the map).
// TODO (Rui): as other information is no longer needed, we can remove this struct
struct ParquetColumnDescriptor {
// Presumably the maximum Parquet definition level of the column (verify); defaults to 1.
int maxDefinitionLevel = 1;
};

// A "Kuzu column" is a single column in Kuzu, which may contain multiple Parquet columns.
// writeColumn will be called for each Kuzu column.
class ParquetColumnWriter {
public:
    // Stores the per-column descriptors. The by-value parameter is moved into the member so
    // that constructing the writer does not copy the map a second time.
    ParquetColumnWriter(std::unordered_map<int, ParquetColumnDescriptor> columnDescriptors)
        : columnDescriptors(std::move(columnDescriptors)) {}

    // Writes the contents of `vector` for Kuzu column `kuzuColumn` through `rowWriter`.
    void writeColumn(
        int kuzuColumn, common::ValueVector* vector, parquet::RowGroupWriter* rowWriter);

private:
    void nextParquetColumn(common::LogicalTypeID logicalTypeID);
    void writePrimitiveValue(common::LogicalTypeID logicalTypeID, uint8_t* value,
        int16_t definitionLevel = 0, int16_t repetitionLevel = 0);

    // ParquetColumn contains the information needed by the low-level arrow API WriteBatch:
    // WriteBatch(int64_t num_values, const int16_t* def_levels,
    //            const int16_t* rep_levels, const T* values)
    struct ParquetColumn {
        common::LogicalTypeID logicalTypeID;
        std::vector<int16_t> repetitionLevels;
        std::vector<int16_t> definitionLevels;
        std::vector<uint8_t*> values;
    };

    // The helpers below collect values into `parquetColumns`. The index/depth/field-name
    // parameters look like recursion state for nested lists and structs (TODO confirm
    // against the implementation file).
    void addToParquetColumns(uint8_t* value, common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractList(const common::list_entry_t& list, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractStruct(const common::struct_entry_t& val, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    void extractNested(uint8_t* value, const common::ValueVector* vector,
        std::map<std::string, ParquetColumn>& parquetColumns, int currentElementIdx = 0,
        int parentElementIdx = 0, int depth = 0, std::string parentStructFieldName = "");

    // Marks that the next list encountered starts a new column.
    inline void initNewColumn() { isListStarting = true; }

    void writeLittleEndianUint32(uint8_t* buffer, uint32_t value);

    // Extract dremel encoding levels
    int getRepetitionLevel(int currentElementIdx, int parentElementIdx, int depth);

    std::unordered_map<int, ParquetColumnDescriptor> columnDescriptors;

    // Properties for nested lists and structs.
    // Given a default so the flag is never read while indeterminate.
    bool isListStarting = false;

    int currentKuzuColumn = 0;

    // Non-owning handles to the parquet writers. They start out null so that a use before
    // assignment is a detectable null pointer rather than an indeterminate one.
    parquet::Int64Writer* int64Writer = nullptr;
    parquet::ByteArrayWriter* byteArrayWriter = nullptr;
    parquet::FixedLenByteArrayWriter* fixedLenByteArrayWriter = nullptr;
    parquet::BoolWriter* boolWriter = nullptr;
    parquet::RowGroupWriter* rowWriter = nullptr;
    parquet::Int32Writer* int32Writer = nullptr;
    parquet::DoubleWriter* doubleWriter = nullptr;
    parquet::FloatWriter* floatWriter = nullptr;
};

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit 031529e

Please sign in to comment.