From d634e8d9f18f3bd29518d5f2ff86e849c50fdf97 Mon Sep 17 00:00:00 2001 From: ziyi chen Date: Tue, 10 Oct 2023 16:44:08 -0400 Subject: [PATCH] Parquet writer rework --- src/include/common/constants.h | 16 + .../processor/operator/persistent/copy_to.h | 17 +- .../operator/persistent/copy_to_parquet.h | 84 +++++ .../operator/persistent/file_writer.h | 9 +- .../operator/persistent/parquet_file_writer.h | 58 ---- .../persistent/reader/parquet/thrift_tools.h | 1 + .../writer/parquet/basic_column_writer.h | 94 ++++++ .../writer/parquet/boolean_column_writer.h | 67 ++++ .../writer/parquet/buffered_serializer.h | 50 +++ .../persistent/writer/parquet/column_writer.h | 111 ++++++ .../writer/parquet/parquet_rle_bp_encoder.h | 43 +++ .../writer/parquet/parquet_writer.h | 88 +++++ .../writer/parquet/standard_column_writer.h | 104 ++++++ .../writer/parquet/string_column_writer.h | 144 ++++++++ .../writer/parquet/struct_column_writer.h | 44 +++ .../writer/parquet/var_list_column_writer.h | 45 +++ .../processor/operator/physical_operator.h | 1 + .../processor/result/factorized_table.h | 3 + src/optimizer/factorization_rewriter.cpp | 6 +- src/processor/map/map_copy_to.cpp | 57 +++- .../operator/persistent/CMakeLists.txt | 3 +- .../operator/persistent/copy_to_parquet.cpp | 40 +++ .../persistent/parquet_file_writer.cpp | 210 ------------ .../persistent/writer/parquet/CMakeLists.txt | 15 + .../writer/parquet/basic_column_writer.cpp | 315 ++++++++++++++++++ .../writer/parquet/boolean_column_writer.cpp | 43 +++ .../writer/parquet/buffered_serializer.cpp | 33 ++ .../writer/parquet/column_writer.cpp | 225 +++++++++++++ .../writer/parquet/parquet_rle_bp_encoder.cpp | 113 +++++++ .../writer/parquet/parquet_writer.cpp | 232 +++++++++++++ .../writer/parquet/string_column_writer.cpp | 208 ++++++++++++ .../writer/parquet/struct_column_writer.cpp | 99 ++++++ .../writer/parquet/var_list_column_writer.cpp | 106 ++++++ src/processor/result/factorized_table.cpp | 20 ++ test/test_files/copy/copy_to_big_results.test | 1 - test/test_files/copy/copy_to_nested.test | 84 ----- test/test_files/copy/copy_to_parquet.test | 77 +++-- 37 files changed, 2449 insertions(+), 417 deletions(-) create mode 100644 src/include/processor/operator/persistent/copy_to_parquet.h delete mode 100644 src/include/processor/operator/persistent/parquet_file_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/parquet_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/string_column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/struct_column_writer.h create mode 100644 src/include/processor/operator/persistent/writer/parquet/var_list_column_writer.h create mode 100644 src/processor/operator/persistent/copy_to_parquet.cpp delete mode 100644 src/processor/operator/persistent/parquet_file_writer.cpp create mode 100644 
src/processor/operator/persistent/writer/parquet/CMakeLists.txt create mode 100644 src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/column_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/parquet_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/string_column_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/struct_column_writer.cpp create mode 100644 src/processor/operator/persistent/writer/parquet/var_list_column_writer.cpp delete mode 100644 test/test_files/copy/copy_to_nested.test diff --git a/src/include/common/constants.h b/src/include/common/constants.h index f2cc666258..2c11e52635 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -184,5 +184,21 @@ struct OrderByConstants { static constexpr uint64_t MIN_LIMIT_RATIO_TO_REDUCE = 2; }; +struct ParquetConstants { + static constexpr uint64_t PARQUET_DEFINE_VALID = 65535; + static constexpr const char* PARQUET_MAGIC_WORDS = "PAR1"; + // We limit the uncompressed page size to 100MB. + // The max size in Parquet is 2GB, but we choose a more conservative limit. + static constexpr uint64_t MAX_UNCOMPRESSED_PAGE_SIZE = 100000000; + // Dictionary pages must be below 2GB. Unlike data pages, there's only one dictionary page. + // For this reason we go with a much higher, but still a conservative upper bound of 1GB. + static constexpr uint64_t MAX_UNCOMPRESSED_DICT_PAGE_SIZE = 1e9; + // The maximum size a key entry in an RLE page takes. + static constexpr uint64_t MAX_DICTIONARY_KEY_SIZE = sizeof(uint32_t); + // The size of encoding the string length. 
+ static constexpr uint64_t STRING_LENGTH_SIZE = sizeof(uint32_t); + static constexpr uint64_t MAX_STRING_STATISTICS_SIZE = 10000; +}; + } // namespace common } // namespace kuzu diff --git a/src/include/processor/operator/persistent/copy_to.h b/src/include/processor/operator/persistent/copy_to.h index 3cef5615f7..749b18cee0 100644 --- a/src/include/processor/operator/persistent/copy_to.h +++ b/src/include/processor/operator/persistent/copy_to.h @@ -4,7 +4,6 @@ #include "common/task_system/task_scheduler.h" #include "processor/operator/persistent/csv_file_writer.h" #include "processor/operator/persistent/file_writer.h" -#include "processor/operator/persistent/parquet_file_writer.h" #include "processor/operator/physical_operator.h" #include "processor/operator/sink.h" #include "processor/result/result_set.h" @@ -15,16 +14,11 @@ namespace processor { class CopyToSharedState { public: CopyToSharedState(common::FileType fileType, std::string& filePath, - std::vector& columnNames, std::vector& columnTypes) { - if (fileType == common::FileType::CSV) { - fileWriter = - std::make_unique(filePath, columnNames, columnTypes); - } else if (fileType == common::FileType::PARQUET) { - fileWriter = - std::make_unique(filePath, columnNames, columnTypes); - } else { - throw common::NotImplementedException("CopyToSharedState::CopyToSharedState"); - } + std::vector& columnNames, + std::vector> columnTypes) { + assert(fileType == common::FileType::CSV); + fileWriter = std::make_unique( + filePath, columnNames, std::move(columnTypes)); } inline std::unique_ptr& getWriter() { return fileWriter; } @@ -53,7 +47,6 @@ class CopyTo : public Sink { } protected: - std::string getOutputMsg(); std::vector vectorsToCopyPos; private: diff --git a/src/include/processor/operator/persistent/copy_to_parquet.h b/src/include/processor/operator/persistent/copy_to_parquet.h new file mode 100644 index 0000000000..f7f88a2d7f --- /dev/null +++ b/src/include/processor/operator/persistent/copy_to_parquet.h @@ -0,0 +1,84 @@ +#pragma once + +#include "parquet/parquet_types.h" +#include "processor/operator/persistent/writer/parquet/parquet_writer.h" +#include "processor/operator/sink.h" +#include "processor/result/factorized_table.h" +#include "processor/result/result_set.h" + +namespace kuzu { +namespace processor { + +class CopyToParquetSharedState { +public: + std::unique_ptr writer; +}; + +struct CopyToParquetInfo { + kuzu_parquet::format::CompressionCodec::type codec = + kuzu_parquet::format::CompressionCodec::SNAPPY; + std::unique_ptr tableSchema; + std::vector> types; + std::vector names; + std::vector dataPoses; + std::string fileName; + + CopyToParquetInfo(std::unique_ptr tableSchema, + std::vector> types, std::vector names, + std::vector dataPoses, std::string fileName) + : tableSchema{std::move(tableSchema)}, types{std::move(types)}, names{std::move(names)}, + dataPoses{std::move(dataPoses)}, fileName{std::move(fileName)} {} + + std::vector> copyTypes(); + + std::unique_ptr copy() { + return std::make_unique( + tableSchema->copy(), copyTypes(), names, dataPoses, fileName); + } +}; + +struct CopyToParquetLocalState { + std::unique_ptr ft; + std::vector vectorsToAppend; + storage::MemoryManager* mm; + + void init(CopyToParquetInfo* info, storage::MemoryManager* mm, ResultSet* resultSet); + + inline void append() { ft->append(vectorsToAppend); } +}; + +class CopyToParquet : public Sink { +public: + CopyToParquet(std::unique_ptr resultSetDescriptor, + std::unique_ptr info, + std::shared_ptr sharedState, + std::unique_ptr child, 
uint32_t id, const std::string& paramsString) + : Sink{std::move(resultSetDescriptor), PhysicalOperatorType::COPY_TO_PARQUET, + std::move(child), id, paramsString}, + info{std::move(info)}, localState{std::make_unique()}, + sharedState{std::move(sharedState)} {} + + inline void finalize(ExecutionContext* executionContext) final { + sharedState->writer->finalize(); + } + void initLocalStateInternal(ResultSet* resultSet, ExecutionContext* context) final; + void executeInternal(ExecutionContext* context) final; + std::unique_ptr clone() final { + return std::make_unique(resultSetDescriptor->copy(), info->copy(), + sharedState, children[0]->clone(), id, paramsString); + } + +private: + void initGlobalStateInternal(ExecutionContext* context) final { + sharedState->writer = std::make_unique( + info->fileName, info->copyTypes(), info->names, info->codec, context->memoryManager); + } + +private: + std::shared_ptr sharedState; + std::unique_ptr localState; + std::unique_ptr info; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/file_writer.h b/src/include/processor/operator/persistent/file_writer.h index df21b8786f..b67017bb7b 100644 --- a/src/include/processor/operator/persistent/file_writer.h +++ b/src/include/processor/operator/persistent/file_writer.h @@ -9,9 +9,10 @@ namespace processor { class FileWriter { public: FileWriter(std::string filePath, std::vector columnNames, - std::vector columnTypes) - : filePath{filePath}, columnNames{columnNames}, columnTypes{columnTypes} {} - virtual ~FileWriter(){}; + std::vector> columnTypes) + : filePath{std::move(filePath)}, columnNames{std::move(columnNames)}, columnTypes{std::move( + columnTypes)} {} + virtual ~FileWriter() = default; virtual void init() = 0; virtual void openFile() = 0; virtual void closeFile() = 0; @@ -20,7 +21,7 @@ class FileWriter { protected: std::string filePath; std::vector columnNames; - std::vector columnTypes; + std::vector> columnTypes; }; } // namespace processor diff --git a/src/include/processor/operator/persistent/parquet_file_writer.h b/src/include/processor/operator/persistent/parquet_file_writer.h deleted file mode 100644 index 65249e86d9..0000000000 --- a/src/include/processor/operator/persistent/parquet_file_writer.h +++ /dev/null @@ -1,58 +0,0 @@ -#pragma once - -#include "arrow/io/file.h" -#include "processor/operator/persistent/file_writer.h" -#include "processor/operator/persistent/parquet_column_writer.h" -#include - -namespace kuzu { -namespace processor { - -// ParquetFileWriter performs the following: -// openFile: opens the file and create an arrow::io::FileOutputStream object -// init: -// - generate the schema -// - calculate the max definition levels and number of primitive nodes. The -// definition levels are used for nested types. 
-// - initialize parquetColumnWriter -// writeValues : take a vector of ValueVector and pass to parquetColumnWriter -// More information about the encoding: -// https://parquet.apache.org/docs/file-format/data-pages/encodings -class ParquetFileWriter : public FileWriter { -public: - using FileWriter::FileWriter; - void openFile() final; - void init() final; - void closeFile() final; - void writeValues(std::vector& outputVectors) final; - -private: - static std::shared_ptr createParquetSchemaNode(int& parquetColumnsCount, - std::string& columnName, const common::LogicalType& logicalType, - parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1); - - static std::shared_ptr createNestedNode( - int& parquetColumnsCount, std::string& columnName, const common::LogicalType& logicalType); - - static std::shared_ptr createPrimitiveNode(int& parquetColumnsCount, - std::string& columnName, const common::LogicalType& logicalType, - parquet::Repetition::type repetition, int length); - - void writeValue(common::LogicalTypeID type, void* value); - - void flush(); - - void generateSchema( - std::shared_ptr& schema, int& parquetColumnsCount); - - std::shared_ptr parquetColumnWriter; - - parquet::RowGroupWriter* rowGroupWriter; - - std::shared_ptr fileWriter; - - std::shared_ptr outFile; -}; - -} // namespace processor -} // namespace kuzu diff --git a/src/include/processor/operator/persistent/reader/parquet/thrift_tools.h b/src/include/processor/operator/persistent/reader/parquet/thrift_tools.h index eb0b4a7f14..df35438d12 100644 --- a/src/include/processor/operator/persistent/reader/parquet/thrift_tools.h +++ b/src/include/processor/operator/persistent/reader/parquet/thrift_tools.h @@ -1,6 +1,7 @@ #pragma once #include +#include "common/file_utils.h" #include "thrift/protocol/TCompactProtocol.h" #include "thrift/transport/TBufferTransports.h" diff --git a/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h new file mode 100644 index 0000000000..fdb6a94346 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h @@ -0,0 +1,94 @@ +#pragma once + +#include "parquet/parquet_types.h" +#include "processor/operator/persistent/writer/parquet/column_writer.h" + +namespace kuzu { +namespace processor { + +class BasicColumnWriterState : public ColumnWriterState { +public: + BasicColumnWriterState(kuzu_parquet::format::RowGroup& rowGroup, uint64_t colIdx) + : rowGroup{rowGroup}, colIdx{colIdx} { + pageInfo.emplace_back(); + } + + kuzu_parquet::format::RowGroup& rowGroup; + uint64_t colIdx; + std::vector pageInfo; + std::vector writeInfo; + std::unique_ptr statsState; + uint64_t currentPage = 0; +}; + +class BasicColumnWriter : public ColumnWriter { +public: + BasicColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, + std::vector schemaPath, uint64_t maxRepeat, uint64_t maxDefine, + bool canHaveNulls) + : ColumnWriter( + writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {} + +public: + std::unique_ptr initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) override; + void prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector, + uint64_t count) override; + void beginWrite(ColumnWriterState& state) override; + void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override; + void finalizeWrite(ColumnWriterState& state) override; + 
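+    // Write sequence for a row group, as driven through the methods above:
+    // initializeWriteState() creates the per-column state and registers this column with the
+    // row group (registerToRowGroup); prepare() computes repetition/definition levels and plans
+    // page boundaries; beginWrite() sets up the page headers and buffers; write() serializes
+    // values into the current page, moving to the next page when it is full; finalizeWrite()
+    // flushes the remaining pages and fills in the column chunk metadata.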
+protected: + void writeLevels(BufferedSerializer& bufferedSerializer, const std::vector& levels, + uint64_t maxValue, uint64_t startOffset, uint64_t count); + + virtual kuzu_parquet::format::Encoding::type getEncoding(BasicColumnWriterState& state) { + return kuzu_parquet::format::Encoding::PLAIN; + } + + void nextPage(BasicColumnWriterState& state); + void flushPage(BasicColumnWriterState& state); + + // Initializes the state used to track statistics during writing. Only used for scalar types. + virtual std::unique_ptr initializeStatsState() { + return std::make_unique(); + } + + // Initialize the writer for a specific page. Only used for scalar types. + virtual std::unique_ptr initializePageState( + BasicColumnWriterState& state) { + return nullptr; + } + + // Flushes the writer for a specific page. Only used for scalar types. + virtual void flushPageState( + BufferedSerializer& bufferedSerializer, ColumnWriterPageState* state) {} + + // Retrieves the row size of a vector at the specified location. Only used for scalar types. + virtual uint64_t getRowSize( + common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) { + throw common::NotImplementedException{"BasicColumnWriter::getRowSize"}; + } + // Writes a (subset of a) vector to the specified serializer. Only used for scalar types. + virtual void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats, + ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, + uint64_t chunkEnd) = 0; + + virtual bool hasDictionary(BasicColumnWriterState& writerState) { return false; } + // The number of elements in the dictionary. + virtual uint64_t dictionarySize(BasicColumnWriterState& writerState) { + throw common::NotImplementedException{"BasicColumnWriter::dictionarySize"}; + } + void writeDictionary(BasicColumnWriterState& state, + std::unique_ptr bufferedSerializer, uint64_t rowCount); + virtual void flushDictionary(BasicColumnWriterState& state, ColumnWriterStatistics* stats) { + throw common::NotImplementedException{"BasicColumnWriter::flushDictionary"}; + } + + void setParquetStatistics( + BasicColumnWriterState& state, kuzu_parquet::format::ColumnChunk& column); + void registerToRowGroup(kuzu_parquet::format::RowGroup& rowGroup); +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h new file mode 100644 index 0000000000..73942345c0 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h @@ -0,0 +1,67 @@ +#pragma once + +#include "processor/operator/persistent/writer/parquet/basic_column_writer.h" + +namespace kuzu { +namespace processor { + +class BooleanStatisticsState : public ColumnWriterStatistics { +public: + BooleanStatisticsState() : min{true}, max{false} {} + + bool min; + bool max; + +public: + bool hasStats() { return !(min && !max); } + + std::string getMin() override { return getMinValue(); } + std::string getMax() override { return getMaxValue(); } + std::string getMinValue() override { + return hasStats() ? std::string(reinterpret_cast(&min), sizeof(bool)) : + std::string(); + } + std::string getMaxValue() override { + return hasStats() ? 
std::string(reinterpret_cast(&max), sizeof(bool)) : + std::string(); + } +}; + +class BooleanWriterPageState : public ColumnWriterPageState { +public: + uint8_t byte = 0; + uint8_t bytePos = 0; +}; + +class BooleanColumnWriter : public BasicColumnWriter { +public: + BooleanColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, + std::vector schemaPath, uint64_t maxRepeat, uint64_t maxDefine, + bool canHaveNulls) + : BasicColumnWriter( + writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {} + + inline std::unique_ptr initializeStatsState() override { + return std::make_unique(); + } + + inline uint64_t getRowSize( + common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) override { + return sizeof(bool); + } + + inline std::unique_ptr initializePageState( + BasicColumnWriterState& state) override { + return std::make_unique(); + } + + void writeVector(BufferedSerializer& bufferedSerializer, + ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, + common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override; + + void flushPageState( + BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) override; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h b/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h new file mode 100644 index 0000000000..96abdc46bf --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h @@ -0,0 +1,50 @@ +#pragma once + +#include + +namespace kuzu { +namespace processor { + +// TODO(Ziyi): Move this to constants.h once we have a unified serializer design. +static constexpr uint64_t SERIALIZER_DEFAULT_SIZE = 1024; + +struct BinaryData { + std::unique_ptr data; + uint64_t size; +}; + +class BufferedSerializer { +public: + // Serializes to a buffer allocated by the serializer, will expand when + // writing past the initial threshold. + explicit BufferedSerializer(uint64_t maximum_size = SERIALIZER_DEFAULT_SIZE); + // Serializes to a provided (owned) data pointer. + BufferedSerializer(std::unique_ptr data, uint64_t size); + + // Retrieves the data after the writing has been completed. 
+    inline BinaryData getData() { return std::move(blob); }
+
+    inline uint64_t getSize() { return blob.size; }
+
+    inline uint8_t* getBlobData() { return blob.data.get(); }
+
+    inline void reset() { blob.size = 0; }
+
+    template<typename T>
+    void write(T element) {
+        static_assert(
+            std::is_trivially_destructible<T>(), "Write element must be trivially destructible");
+        writeData(reinterpret_cast<const uint8_t*>(&element), sizeof(T));
+    }
+
+    void writeData(const uint8_t* buffer, uint64_t len);
+
+private:
+    uint64_t maximumSize;
+    uint8_t* data;
+
+    BinaryData blob;
+};
+
+} // namespace processor
+} // namespace kuzu
diff --git a/src/include/processor/operator/persistent/writer/parquet/column_writer.h b/src/include/processor/operator/persistent/writer/parquet/column_writer.h
new file mode 100644
index 0000000000..2827a9b20c
--- /dev/null
+++ b/src/include/processor/operator/persistent/writer/parquet/column_writer.h
@@ -0,0 +1,111 @@
+#pragma once
+
+#include "buffered_serializer.h"
+#include "common/exception/not_implemented.h"
+#include "common/types/types.h"
+#include "common/vector/value_vector.h"
+#include "parquet/parquet_types.h"
+
+namespace kuzu {
+namespace processor {
+class ParquetWriter;
+
+struct PageInformation {
+    uint64_t offset = 0;
+    uint64_t rowCount = 0;
+    uint64_t emptyCount = 0;
+    uint64_t estimatedPageSize = 0;
+};
+
+class ColumnWriterPageState {
+public:
+    virtual ~ColumnWriterPageState() = default;
+};
+
+struct PageWriteInformation {
+    kuzu_parquet::format::PageHeader pageHeader;
+    std::unique_ptr<BufferedSerializer> bufferWriter;
+    std::unique_ptr<ColumnWriterPageState> pageState;
+    uint64_t writePageIdx = 0;
+    uint64_t writeCount = 0;
+    uint64_t maxWriteCount = 0;
+    size_t compressedSize;
+    uint8_t* compressedData;
+    std::unique_ptr<uint8_t[]> compressedBuf;
+};
+
+class ColumnWriterState {
+public:
+    virtual ~ColumnWriterState() = default;
+
+    std::vector<uint16_t> definitionLevels;
+    std::vector<uint16_t> repetitionLevels;
+    std::vector<bool> isEmpty;
+};
+
+class ColumnWriterStatistics {
+public:
+    virtual ~ColumnWriterStatistics() = default;
+
+    virtual std::string getMin() { return {}; }
+    virtual std::string getMax() { return {}; }
+    virtual std::string getMinValue() { return {}; }
+    virtual std::string getMaxValue() { return {}; }
+};
+
+class ColumnWriter {
+public:
+    ColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, std::vector<std::string> schemaPath,
+        uint64_t maxRepeat, uint64_t maxDefine, bool canHaveNulls);
+    virtual ~ColumnWriter() = default;
+
+    // Create the column writer for a specific type recursively.
+    // TODO(Ziyi): We currently don't have statistics to indicate whether a column
+    // has null values or not, so canHaveNullsToCreate is always true.
+    static std::unique_ptr<ColumnWriter> createWriterRecursive(
+        std::vector<kuzu_parquet::format::SchemaElement>& schemas, ParquetWriter& writer,
+        common::LogicalType* type, const std::string& name,
+        std::vector<std::string> schemaPathToCreate, storage::MemoryManager* mm,
+        uint64_t maxRepeatToCreate = 0, uint64_t maxDefineToCreate = 1,
+        bool canHaveNullsToCreate = true);
+
+    virtual std::unique_ptr<ColumnWriterState> initializeWriteState(
+        kuzu_parquet::format::RowGroup& rowGroup) = 0;
+    // Indicates whether the writer needs to analyze the data before preparing it.
+    virtual bool hasAnalyze() { return false; }
+    virtual void analyze(ColumnWriterState& state, ColumnWriterState* parent,
+        common::ValueVector* vector, uint64_t count) {
+        throw common::NotImplementedException{"ColumnWriter::analyze"};
+    }
+    // Called after all data has been passed to Analyze.
+ virtual void finalizeAnalyze(ColumnWriterState& state) { + throw common::NotImplementedException{"ColumnWriter::finalizeAnalyze"}; + } + virtual void prepare(ColumnWriterState& state, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) = 0; + virtual void beginWrite(ColumnWriterState& state) = 0; + virtual void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) = 0; + virtual void finalizeWrite(ColumnWriterState& state) = 0; + inline uint64_t getVectorPos(common::ValueVector* vector, uint64_t idx) { + return (vector->state == nullptr || !vector->state->isFlat()) ? idx : 0; + } + + ParquetWriter& writer; + uint64_t schemaIdx; + std::vector schemaPath; + uint64_t maxRepeat; + uint64_t maxDefine; + bool canHaveNulls; + // collected stats + uint64_t nullCount; + +protected: + void handleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count, uint16_t defineValue, uint16_t nullValue); + void handleRepeatLevels(ColumnWriterState& stateToHandle, ColumnWriterState* parent); + void compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, + uint8_t*& compressedData, std::unique_ptr& compressedBuf); +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h new file mode 100644 index 0000000000..379a2d10dc --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h @@ -0,0 +1,43 @@ +#pragma once + +#include "buffered_serializer.h" + +namespace kuzu { +namespace processor { + +class RleBpEncoder { +public: + RleBpEncoder(uint32_t bitWidth); + +public: + // NOTE: Prepare is only required if a byte count is required BEFORE writing + // This is the case with e.g. writing repetition/definition levels + // If GetByteCount() is not required, prepare can be safely skipped. + void beginPrepare(uint32_t firstValue); + void prepareValue(uint32_t value); + void finishPrepare(); + + void beginWrite(BufferedSerializer& writer, uint32_t first_value); + void writeValue(BufferedSerializer& writer, uint32_t value); + void finishWrite(BufferedSerializer& writer); + + uint64_t getByteCount(); + + static uint8_t getVarintSize(uint32_t val); + +private: + //! meta information + uint32_t byteWidth; + //! 
RLE run information + uint64_t byteCount; + uint64_t runCount; + uint64_t currentRunCount; + uint32_t lastValue; + +private: + void finishRun(); + void writeRun(BufferedSerializer& bufferedSerializer); +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/parquet_writer.h b/src/include/processor/operator/persistent/writer/parquet/parquet_writer.h new file mode 100644 index 0000000000..36c018d462 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/parquet_writer.h @@ -0,0 +1,88 @@ +#pragma once + +#include "common/data_chunk/data_chunk.h" +#include "common/file_utils.h" +#include "common/types/types.h" +#include "parquet/parquet_types.h" +#include "processor/operator/persistent/writer/parquet/column_writer.h" +#include "processor/result/factorized_table.h" +#include "thrift/protocol/TProtocol.h" + +namespace kuzu { +namespace processor { + +class ParquetWriterTransport : public kuzu_apache::thrift::protocol::TTransport { +public: + explicit ParquetWriterTransport(common::FileInfo* fileInfo, common::offset_t& offset) + : fileInfo{fileInfo}, offset{offset} {} + + inline bool isOpen() const override { return true; } + + void open() override {} + + void close() override {} + + inline void write_virt(const uint8_t* buf, uint32_t len) override { + common::FileUtils::writeToFile(fileInfo, buf, len, offset); + offset += len; + } + +private: + common::FileInfo* fileInfo; + common::offset_t& offset; +}; + +struct PreparedRowGroup { + kuzu_parquet::format::RowGroup rowGroup; + std::vector> states; +}; + +class ParquetWriter { +public: + ParquetWriter(std::string fileName, std::vector> types, + std::vector names, kuzu_parquet::format::CompressionCodec::type codec, + storage::MemoryManager* mm); + + inline common::offset_t getOffset() const { return fileOffset; } + inline void write(const uint8_t* buf, uint32_t len) { + common::FileUtils::writeToFile(fileInfo.get(), buf, len, fileOffset); + fileOffset += len; + } + inline kuzu_parquet::format::CompressionCodec::type getCodec() { return codec; } + inline kuzu_apache::thrift::protocol::TProtocol* getProtocol() { return protocol.get(); } + inline kuzu_parquet::format::Type::type getParquetType(uint64_t schemaIdx) { + return fileMetaData.schema[schemaIdx].type; + } + void flush(FactorizedTable& ft); + void finalize(); + static kuzu_parquet::format::Type::type convertToParquetType(common::LogicalType* type); + static void setSchemaProperties( + common::LogicalType* type, kuzu_parquet::format::SchemaElement& schemaElement); + +private: + void prepareRowGroup(FactorizedTable& ft, PreparedRowGroup& result); + void flushRowGroup(PreparedRowGroup& rowGroup); + void readFromFT(FactorizedTable& ft, std::vector vectorsToRead, + uint64_t& numTuplesRead); + inline uint64_t getNumTuples(common::DataChunk* unflatChunk) { + return unflatChunk->getNumValueVectors() != 0 ? 
+ unflatChunk->state->selVector->selectedSize : + 1; + } + +private: + std::string fileName; + std::vector> types; + std::vector columnNames; + kuzu_parquet::format::CompressionCodec::type codec; + std::unique_ptr fileInfo; + std::shared_ptr protocol; + kuzu_parquet::format::FileMetaData fileMetaData; + std::mutex lock; + std::vector> columnWriters; + common::offset_t fileOffset; + storage::MemoryManager* mm; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h new file mode 100644 index 0000000000..f7461cfaf7 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h @@ -0,0 +1,104 @@ +#pragma once + +#include "basic_column_writer.h" +#include "function/cast/numeric_limits.h" +#include "function/comparison/comparison_functions.h" + +namespace kuzu { +namespace processor { + +template +class NumericStatisticsState : public ColumnWriterStatistics { +public: + NumericStatisticsState() + : min(function::NumericLimits::maximum()), max(function::NumericLimits::minimum()) {} + + T min; + T max; + +public: + bool hasStats() { return min <= max; } + + std::string getMin() override { + return function::NumericLimits::isSigned() ? getMinValue() : std::string(); + } + std::string getMax() override { + return function::NumericLimits::isSigned() ? getMaxValue() : std::string(); + } + std::string getMinValue() override { + return hasStats() ? std::string((char*)&min, sizeof(T)) : std::string(); + } + std::string getMaxValue() override { + return hasStats() ? std::string((char*)&max, sizeof(T)) : std::string(); + } +}; + +struct BaseParquetOperator { + template + inline static std::unique_ptr initializeStats() { + return std::make_unique>(); + } + + template + static void handleStats(ColumnWriterStatistics* stats, SRC sourceValue, TGT targetValue) { + auto& numericStats = (NumericStatisticsState&)*stats; + uint8_t result; + function::LessThan::operation(targetValue, numericStats.min, result, + nullptr /* leftVector */, nullptr /* rightVector */); + if (result != 0) { + numericStats.min = targetValue; + } + function::GreaterThan::operation(targetValue, numericStats.max, result, + nullptr /* leftVector */, nullptr /* rightVector */); + if (result != 0) { + numericStats.max = targetValue; + } + } +}; + +struct ParquetCastOperator : public BaseParquetOperator { + template + static TGT Operation(SRC input) { + return TGT(input); + } +}; + +template +class StandardColumnWriter : public BasicColumnWriter { +public: + StandardColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, + std::vector schemaPath, uint64_t maxRepeat, uint64_t maxDefine, + bool canHaveNulls) + : BasicColumnWriter( + writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {} + + inline std::unique_ptr initializeStatsState() override { + return OP::template initializeStats(); + } + + void templatedWritePlain(common::ValueVector* vector, ColumnWriterStatistics* stats, + uint64_t chunkStart, uint64_t chunkEnd, BufferedSerializer& ser) { + for (auto r = chunkStart; r < chunkEnd; r++) { + auto pos = getVectorPos(vector, r); + if (!vector->isNull(pos)) { + TGT targetValue = OP::template Operation(vector->getValue(pos)); + OP::template handleStats(stats, vector->getValue(pos), targetValue); + ser.write(targetValue); + } + } + } + + inline void writeVector(BufferedSerializer& bufferedSerializer, 
ColumnWriterStatistics* stats, + ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, + uint64_t chunkEnd) override { + templatedWritePlain(vector, stats, chunkStart, chunkEnd, bufferedSerializer); + } + + inline uint64_t getRowSize( + common::ValueVector* vector, uint64_t index, BasicColumnWriterState& state) override { + return sizeof(TGT); + } +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h new file mode 100644 index 0000000000..74c4fa42b8 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h @@ -0,0 +1,144 @@ +#pragma once + +#include "parquet/parquet_types.h" +#include "parquet_rle_bp_encoder.h" +#include "processor/operator/persistent/reader/parquet/parquet_rle_bp_decoder.h" +#include "processor/operator/persistent/writer/parquet/basic_column_writer.h" + +namespace kuzu { +namespace processor { + +struct StringHash { + std::size_t operator()(const common::ku_string_t& k) const; +}; + +struct StringEquality { + bool operator()(const common::ku_string_t& a, const common::ku_string_t& b) const; +}; + +template +using string_map_t = std::unordered_map; + +class StringStatisticsState : public ColumnWriterStatistics { +public: + StringStatisticsState() : hasStats(false), valuesTooBig(false), min(), max() {} + + bool hasStats; + bool valuesTooBig; + std::string min; + std::string max; + +public: + bool hasValidStats() { return hasStats; } + + void update(const common::ku_string_t& val); + + std::string getMin() override { return getMinValue(); } + std::string getMax() override { return getMaxValue(); } + std::string getMinValue() override { return hasValidStats() ? min : std::string(); } + std::string getMaxValue() override { return hasValidStats() ? max : std::string(); } +}; + +class StringColumnWriterState : public BasicColumnWriterState { +public: + StringColumnWriterState( + kuzu_parquet::format::RowGroup& rowGroup, uint64_t colIdx, storage::MemoryManager* mm) + : BasicColumnWriterState{rowGroup, colIdx}, + overflowBuffer{std::make_unique(mm)} {} + + // Analysis state. + uint64_t estimatedDictPageSize = 0; + uint64_t estimatedRlePagesSize = 0; + uint64_t estimatedPlainSize = 0; + + // Dictionary and accompanying string heap. + string_map_t dictionary; + std::unique_ptr overflowBuffer; + // key_bit_width== 0 signifies the chunk is written in plain encoding + uint32_t keyBitWidth; + + bool isDictionaryEncoded() { return keyBitWidth != 0; } +}; + +class StringWriterPageState : public ColumnWriterPageState { +public: + explicit StringWriterPageState(uint32_t bitWidth, const string_map_t& values) + : bitWidth(bitWidth), dictionary(values), encoder(bitWidth), writtenValue(false) { + assert(isDictionaryEncoded() || (bitWidth == 0 && dictionary.empty())); + } + + inline bool isDictionaryEncoded() { return bitWidth != 0; } + // If 0, we're writing a plain page. 
+ uint32_t bitWidth; + const string_map_t& dictionary; + RleBpEncoder encoder; + bool writtenValue; +}; + +class StringColumnWriter : public BasicColumnWriter { +public: + StringColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, + std::vector schemaPath, uint64_t maxRepeat, uint64_t maxDefine, + bool canHaveNulls, storage::MemoryManager* mm) + : BasicColumnWriter( + writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls), + mm{mm} {} + +public: + inline std::unique_ptr initializeStatsState() override { + return std::make_unique(); + } + + std::unique_ptr initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) override; + + inline bool hasAnalyze() override { return true; } + + inline std::unique_ptr initializePageState( + BasicColumnWriterState& state_p) override { + auto& state = reinterpret_cast(state_p); + return std::make_unique(state.keyBitWidth, state.dictionary); + } + + inline kuzu_parquet::format::Encoding::type getEncoding( + BasicColumnWriterState& writerState) override { + auto& state = reinterpret_cast(writerState); + return state.isDictionaryEncoded() ? kuzu_parquet::format::Encoding::RLE_DICTIONARY : + kuzu_parquet::format::Encoding::PLAIN; + } + + inline bool hasDictionary(BasicColumnWriterState& writerState) override { + auto& state = reinterpret_cast(writerState); + return state.isDictionaryEncoded(); + } + + inline uint64_t dictionarySize(BasicColumnWriterState& writerState) override { + auto& state = reinterpret_cast(writerState); + assert(state.isDictionaryEncoded()); + return state.dictionary.size(); + } + + void analyze(ColumnWriterState& writerState, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) override; + + void finalizeAnalyze(ColumnWriterState& writerState) override; + + void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* statsToWrite, + ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, + uint64_t chunkEnd) override; + + void flushPageState( + BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override; + + void flushDictionary( + BasicColumnWriterState& writerState, ColumnWriterStatistics* writerStats) override; + + uint64_t getRowSize( + common::ValueVector* vector, uint64_t index, BasicColumnWriterState& writerState) override; + +private: + storage::MemoryManager* mm; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/struct_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/struct_column_writer.h new file mode 100644 index 0000000000..0811e64621 --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/struct_column_writer.h @@ -0,0 +1,44 @@ +#pragma once + +#include "processor/operator/persistent/writer/parquet/column_writer.h" + +namespace kuzu { +namespace processor { + +class StructColumnWriter : public ColumnWriter { +public: + StructColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, std::vector schema, + uint64_t maxRepeat, uint64_t maxDefine, + std::vector> childWriter, bool canHaveNull) + : ColumnWriter{writer, schemaIdx, std::move(schema), maxRepeat, maxDefine, canHaveNull}, + childWriters{std::move(childWriter)} {} + + std::vector> childWriters; + +public: + std::unique_ptr initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) override; + bool hasAnalyze() override; + void analyze(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* 
vector, + uint64_t count) override; + void finalizeAnalyze(ColumnWriterState& state) override; + void prepare(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector, + uint64_t count) override; + + void beginWrite(ColumnWriterState& state) override; + void write(ColumnWriterState& state, common::ValueVector* vector, uint64_t count) override; + void finalizeWrite(ColumnWriterState& state) override; +}; + +class StructColumnWriterState : public ColumnWriterState { +public: + StructColumnWriterState(kuzu_parquet::format::RowGroup& rowGroup, uint64_t colIdx) + : rowGroup(rowGroup), colIdx(colIdx) {} + + kuzu_parquet::format::RowGroup& rowGroup; + uint64_t colIdx; + std::vector> childStates; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/var_list_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/var_list_column_writer.h new file mode 100644 index 0000000000..30a5d1f08f --- /dev/null +++ b/src/include/processor/operator/persistent/writer/parquet/var_list_column_writer.h @@ -0,0 +1,45 @@ +#pragma once + +#include "processor/operator/persistent/writer/parquet/column_writer.h" + +namespace kuzu { +namespace processor { + +class VarListColumnWriter : public ColumnWriter { +public: + VarListColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, std::vector schema, + uint64_t maxRepeat, uint64_t maxDefine, std::unique_ptr childWriter, + bool canHaveNulls) + : ColumnWriter(writer, schemaIdx, std::move(schema), maxRepeat, maxDefine, canHaveNulls), + childWriter(std::move(childWriter)) {} + + std::unique_ptr initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) override; + bool hasAnalyze() override; + void analyze(ColumnWriterState& writerState, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) override; + void finalizeAnalyze(ColumnWriterState& writerState) override; + void prepare(ColumnWriterState& writerState, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) override; + void beginWrite(ColumnWriterState& state) override; + void write( + ColumnWriterState& writerState, common::ValueVector* vector, uint64_t count) override; + void finalizeWrite(ColumnWriterState& writerState) override; + +private: + std::unique_ptr childWriter; +}; + +class ListColumnWriterState : public ColumnWriterState { +public: + ListColumnWriterState(kuzu_parquet::format::RowGroup& rowGroup, uint64_t colIdx) + : rowGroup{rowGroup}, colIdx{colIdx} {} + + kuzu_parquet::format::RowGroup& rowGroup; + uint64_t colIdx; + std::unique_ptr childState; + uint64_t parentIdx = 0; +}; + +} // namespace processor +} // namespace kuzu diff --git a/src/include/processor/operator/physical_operator.h b/src/include/processor/operator/physical_operator.h index ed0db8cff2..2175059791 100644 --- a/src/include/processor/operator/physical_operator.h +++ b/src/include/processor/operator/physical_operator.h @@ -21,6 +21,7 @@ enum class PhysicalOperatorType : uint8_t { COPY_REL_COLUMNS, COPY_REL_LISTS, COPY_TO, + COPY_TO_PARQUET, CREATE_NODE_TABLE, CREATE_REL_TABLE, CREATE_RDF_GRAPH, diff --git a/src/include/processor/result/factorized_table.h b/src/include/processor/result/factorized_table.h index 42cf0deefb..dcb0782938 100644 --- a/src/include/processor/result/factorized_table.h +++ b/src/include/processor/result/factorized_table.h @@ -161,6 +161,9 @@ class FactorizedTableSchema { return std::make_unique(*this); } + uint64_t getNumFlatColumns() const; + uint64_t 
getNumUnflatColumns() const; + private: std::vector> columns; uint32_t numBytesForDataPerTuple = 0; diff --git a/src/optimizer/factorization_rewriter.cpp b/src/optimizer/factorization_rewriter.cpp index 006242784b..129f974e5c 100644 --- a/src/optimizer/factorization_rewriter.cpp +++ b/src/optimizer/factorization_rewriter.cpp @@ -200,8 +200,10 @@ void FactorizationRewriter::visitMerge(planner::LogicalOperator* op) { void FactorizationRewriter::visitCopyTo(planner::LogicalOperator* op) { auto copyTo = (LogicalCopyTo*)op; - auto groupsPosToFlatten = copyTo->getGroupsPosToFlatten(); - copyTo->setChild(0, appendFlattens(copyTo->getChild(0), groupsPosToFlatten)); + if (copyTo->getConfig()->fileType == common::FileType::CSV) { + auto groupsPosToFlatten = copyTo->getGroupsPosToFlatten(); + copyTo->setChild(0, appendFlattens(copyTo->getChild(0), groupsPosToFlatten)); + } } std::shared_ptr FactorizationRewriter::appendFlattens( diff --git a/src/processor/map/map_copy_to.cpp b/src/processor/map/map_copy_to.cpp index 31afc0c199..14769569f6 100644 --- a/src/processor/map/map_copy_to.cpp +++ b/src/processor/map/map_copy_to.cpp @@ -1,5 +1,6 @@ #include "planner/operator/persistent/logical_copy_to.h" #include "processor/operator/persistent/copy_to.h" +#include "processor/operator/persistent/copy_to_parquet.h" #include "processor/plan_mapper.h" using namespace kuzu::common; @@ -12,9 +13,11 @@ namespace processor { std::unique_ptr PlanMapper::mapCopyTo(LogicalOperator* logicalOperator) { auto copy = (LogicalCopyTo*)logicalOperator; auto config = copy->getConfig(); - std::vector columnsTypes; + std::vector> columnsTypes; + std::vector columnNames; + columnsTypes.reserve(config->columnTypes.size()); for (auto& type : config->columnTypes) { - columnsTypes.push_back(*type); + columnsTypes.push_back(type->copy()); } auto childSchema = logicalOperator->getChild(0)->getSchema(); auto prevOperator = mapOperator(logicalOperator->getChild(0).get()); @@ -22,16 +25,46 @@ std::unique_ptr PlanMapper::mapCopyTo(LogicalOperator* logical for (auto& expression : childSchema->getExpressionsInScope()) { vectorsToCopyPos.emplace_back(childSchema->getExpressionPos(*expression)); } - auto sharedState = std::make_shared( - config->fileType, config->filePaths[0], config->columnNames, columnsTypes); - auto copyTo = std::make_unique(std::make_unique(childSchema), - sharedState, std::move(vectorsToCopyPos), getOperatorID(), - copy->getExpressionsForPrinting(), std::move(prevOperator)); - std::shared_ptr fTable; - auto ftTableSchema = std::make_unique(); - fTable = std::make_shared(memoryManager, std::move(ftTableSchema)); - return createFactorizedTableScan(binder::expression_vector{}, std::vector{}, - childSchema, fTable, 0, std::move(copyTo)); + if (copy->getConfig()->fileType == common::FileType::PARQUET) { + auto copyToSchema = std::make_unique(); + auto copyToExpressions = childSchema->getExpressionsInScope(); + for (auto& copyToExpression : copyToExpressions) { + auto [dataChunkPos, vectorPos] = childSchema->getExpressionPos(*copyToExpression); + std::unique_ptr columnSchema; + if (!childSchema->getGroup(dataChunkPos)->isFlat()) { + // payload is unFlat and not in the same group as keys + columnSchema = std::make_unique( + true /* isUnFlat */, dataChunkPos, sizeof(overflow_value_t)); + } else { + columnSchema = std::make_unique(false /* isUnFlat */, dataChunkPos, + LogicalTypeUtils::getRowLayoutSize(copyToExpression->getDataType())); + } + copyToSchema->appendColumn(std::move(columnSchema)); + } + + auto copyToParquetInfo = 
std::make_unique(std::move(copyToSchema), + std::move(columnsTypes), config->columnNames, vectorsToCopyPos, config->filePaths[0]); + auto copyTo = + std::make_unique(std::make_unique(childSchema), + std::move(copyToParquetInfo), std::make_shared(), + std::move(prevOperator), getOperatorID(), copy->getExpressionsForPrinting()); + std::shared_ptr fTable; + auto ftTableSchema = std::make_unique(); + fTable = std::make_shared(memoryManager, std::move(ftTableSchema)); + return createFactorizedTableScan(binder::expression_vector{}, std::vector{}, + childSchema, fTable, 0, std::move(copyTo)); + } else { + auto sharedState = std::make_shared( + config->fileType, config->filePaths[0], config->columnNames, std::move(columnsTypes)); + auto copyTo = std::make_unique(std::make_unique(childSchema), + sharedState, std::move(vectorsToCopyPos), getOperatorID(), + copy->getExpressionsForPrinting(), std::move(prevOperator)); + std::shared_ptr fTable; + auto ftTableSchema = std::make_unique(); + fTable = std::make_shared(memoryManager, std::move(ftTableSchema)); + return createFactorizedTableScan(binder::expression_vector{}, std::vector{}, + childSchema, fTable, 0, std::move(copyTo)); + } } } // namespace processor diff --git a/src/processor/operator/persistent/CMakeLists.txt b/src/processor/operator/persistent/CMakeLists.txt index 5d70c9227b..c1a146c328 100644 --- a/src/processor/operator/persistent/CMakeLists.txt +++ b/src/processor/operator/persistent/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(reader/parquet) add_subdirectory(reader/csv) add_subdirectory(reader/rdf) add_subdirectory(reader/npy) +add_subdirectory(writer/parquet) add_library(kuzu_processor_operator_persistent OBJECT @@ -10,6 +11,7 @@ add_library(kuzu_processor_operator_persistent copy_rel_columns.cpp copy_rel_lists.cpp copy_to.cpp + copy_to_parquet.cpp csv_file_writer.cpp delete.cpp delete_executor.cpp @@ -17,7 +19,6 @@ add_library(kuzu_processor_operator_persistent insert_executor.cpp merge.cpp parquet_column_writer.cpp - parquet_file_writer.cpp reader.cpp reader_functions.cpp reader_state.cpp diff --git a/src/processor/operator/persistent/copy_to_parquet.cpp b/src/processor/operator/persistent/copy_to_parquet.cpp new file mode 100644 index 0000000000..9b821e9d75 --- /dev/null +++ b/src/processor/operator/persistent/copy_to_parquet.cpp @@ -0,0 +1,40 @@ +#include "processor/operator/persistent/copy_to_parquet.h" + +namespace kuzu { +namespace processor { + +void CopyToParquetLocalState::init( + CopyToParquetInfo* info, storage::MemoryManager* mm, ResultSet* resultSet) { + ft = std::make_unique(mm, std::move(info->tableSchema)); + vectorsToAppend.reserve(info->dataPoses.size()); + for (auto& pos : info->dataPoses) { + vectorsToAppend.push_back(resultSet->getValueVector(pos).get()); + } + this->mm = mm; +} + +std::vector> CopyToParquetInfo::copyTypes() { + std::vector> copiedTypes; + for (auto& type : types) { + copiedTypes.push_back(type->copy()); + } + return copiedTypes; +} + +void CopyToParquet::initLocalStateInternal( + kuzu::processor::ResultSet* resultSet, kuzu::processor::ExecutionContext* context) { + localState->init(info.get(), context->memoryManager, resultSet); +} + +void CopyToParquet::executeInternal(kuzu::processor::ExecutionContext* context) { + while (children[0]->getNextTuple(context)) { + localState->append(); + if (localState->ft->getTotalNumFlatTuples() > common::StorageConstants::NODE_GROUP_SIZE) { + sharedState->writer->flush(*localState->ft); + } + } + sharedState->writer->flush(*localState->ft); +} + +} // 
namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/parquet_file_writer.cpp b/src/processor/operator/persistent/parquet_file_writer.cpp deleted file mode 100644 index ba13cc032d..0000000000 --- a/src/processor/operator/persistent/parquet_file_writer.cpp +++ /dev/null @@ -1,210 +0,0 @@ -#include "processor/operator/persistent/parquet_file_writer.h" - -using namespace kuzu::common; - -namespace kuzu { -namespace processor { - -void ParquetFileWriter::openFile() { - auto result = arrow::io::FileOutputStream::Open(filePath); - if (!result.ok()) { - throw RuntimeException(result.status().ToString()); - } - outFile = *result; -} - -void ParquetFileWriter::init() { - std::shared_ptr schema; - std::vector maxDefinitionLevels; - openFile(); - // A nested Kuzu column may be represented by multiple parquet columns - // so we need this information for writing, that is generated inside createParquetSchemaNode - int parquetColumnsCount = 0; - generateSchema(schema, parquetColumnsCount); - parquet::WriterProperties::Builder builder; - builder.compression(parquet::Compression::SNAPPY); - std::shared_ptr props = builder.build(); - fileWriter = parquet::ParquetFileWriter::Open(outFile, schema, props); - const parquet::SchemaDescriptor* schemaDescriptor = fileWriter->schema(); - for (auto i = 0u; i < columnNames.size(); ++i) { - const parquet::ColumnDescriptor* colDesc = schemaDescriptor->Column(i); - maxDefinitionLevels.push_back(colDesc->max_definition_level()); - } - parquetColumnWriter = - std::make_shared(parquetColumnsCount, maxDefinitionLevels); - rowGroupWriter = fileWriter->AppendBufferedRowGroup(); -} - -void ParquetFileWriter::generateSchema( - std::shared_ptr& schema, int& parquetColumnsCount) { - parquet::schema::NodeVector fields; - for (auto i = 0u; i < columnNames.size(); ++i) { - fields.push_back( - createParquetSchemaNode(parquetColumnsCount, columnNames[i], columnTypes[i])); - } - schema = std::static_pointer_cast( - parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); -} - -// Return a parquet::schema::Node, which contains the schema for a single Kuzu -// column. The node can be a primitive node or a group node. -// A primitive node schema is like: -// -// required group field_id=-1 schema { -// required binary field_id=-1 single string (String); -// } -// -// Parquet Physical Types: -// -// BOOLEAN: 1 bit boolean -// INT32: 32 bit signed ints -// INT64: 64 bit signed ints -// INT96: 96 bit signed ints -// FLOAT: IEEE 32-bit floating point values -// DOUBLE: IEEE 64-bit floating point values -// BYTE_ARRAY: arbitrarily long byte arrays. 
-// https://github.com/apache/parquet-cpp/blob/master/src/parquet/column_writer.h - -std::shared_ptr ParquetFileWriter::createPrimitiveNode( - int& parquetColumnsCount, std::string& columnName, const LogicalType& logicalType, - parquet::Repetition::type repetition, int length) { - parquet::Type::type parquetType; - parquet::ConvertedType::type convertedType = parquet::ConvertedType::NONE; - parquetColumnsCount++; - switch (logicalType.getLogicalTypeID()) { - case LogicalTypeID::BOOL: { - parquetType = parquet::Type::BOOLEAN; - } break; - case LogicalTypeID::INTERNAL_ID: - case LogicalTypeID::STRING: { - parquetType = parquet::Type::BYTE_ARRAY; - convertedType = parquet::ConvertedType::UTF8; - } break; - case LogicalTypeID::INT64: { - parquetType = parquet::Type::INT64; - convertedType = parquet::ConvertedType::INT_64; - } break; - case LogicalTypeID::INT16: { - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::INT_16; - } break; - case LogicalTypeID::INT32: { - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::INT_32; - } break; - case LogicalTypeID::FLOAT: { - parquetType = parquet::Type::FLOAT; - } break; - case LogicalTypeID::DOUBLE: { - parquetType = parquet::Type::DOUBLE; - } break; - case LogicalTypeID::DATE: { - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::DATE; - } break; - case LogicalTypeID::TIMESTAMP: { - parquetType = parquet::Type::INT64; - convertedType = parquet::ConvertedType::TIMESTAMP_MICROS; - } break; - case LogicalTypeID::INTERVAL: { - parquetType = parquet::Type::FIXED_LEN_BYTE_ARRAY; - convertedType = parquet::ConvertedType::INTERVAL; - length = 12; - } break; - default: - throw NotImplementedException("ParquetFileWriter::createPrimitiveNode"); - } - return parquet::schema::PrimitiveNode::Make( - columnName, repetition, parquetType, convertedType, length); -} - -std::shared_ptr ParquetFileWriter::createNestedNode( - int& parquetColumnsCount, std::string& columnName, const LogicalType& logicalType) { - switch (logicalType.getLogicalTypeID()) { - case LogicalTypeID::STRUCT: { - auto structType = StructType::getFieldTypes(&logicalType); - auto structNames = StructType::getFieldNames(&logicalType); - std::vector> nodes; - for (auto i = 0u; i < structType.size(); ++i) { - nodes.push_back(createParquetSchemaNode(parquetColumnsCount, structNames[i], - *structType[i], parquet::Repetition::OPTIONAL)); - } - auto groupNode = std::static_pointer_cast( - parquet::schema::GroupNode::Make(columnName, parquet::Repetition::OPTIONAL, nodes)); - return groupNode; - } - // A list is represented by the following schema: - // required group field_id=-1 schema { - // optional group field_id=-1 list (List) { - // repeated group field_id=-1 { - // optional int64 field_id=-1 item; - // } - // } - // } - // - // A nested list is encapsulated by an optional + repeated groups: - // required group field_id=-1 schema { - // optional group field_id=-1 list (List) { - // repeated group field_id=-1 { - // optional group field_id=-1 (List) { - // repeated group field_id=-1 { - // optional int64 field_id=-1 item; - // } - // } - // } - // } - // } - case LogicalTypeID::FIXED_LIST: - case LogicalTypeID::VAR_LIST: { - auto childLogicalType = VarListType::getChildType(&logicalType); - auto childNode = createParquetSchemaNode( - parquetColumnsCount, columnName, *childLogicalType, parquet::Repetition::OPTIONAL); - auto repeatedGroup = - parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, 
{childNode}); - auto optional = parquet::schema::GroupNode::Make(columnName, parquet::Repetition::OPTIONAL, - {repeatedGroup}, parquet::LogicalType::List()); - return optional; - } - default: - throw NotImplementedException("ParquetFileWriter::createParquetSchemaNode"); - } -} - -std::shared_ptr ParquetFileWriter::createParquetSchemaNode( - int& parquetColumnsCount, std::string& columnName, const LogicalType& logicalType, - parquet::Repetition::type repetition, int length) { - if (LogicalTypeUtils::isNested(logicalType)) { - return createNestedNode(parquetColumnsCount, columnName, logicalType); - } else { - return createPrimitiveNode( - parquetColumnsCount, columnName, logicalType, repetition, length); - } -} - -void ParquetFileWriter::writeValues(std::vector& outputVectors) { - if (outputVectors.size() == 0) { - return; - } - for (auto i = 0u; i < outputVectors.size(); i++) { - assert(outputVectors[i]->state->isFlat()); - parquetColumnWriter->writeColumn(i, outputVectors[i], rowGroupWriter); - } - if ((rowGroupWriter->total_bytes_written() + rowGroupWriter->total_compressed_bytes() + - parquetColumnWriter->estimatedRowBytes) > StorageConstants::NODE_GROUP_SIZE) { - rowGroupWriter->Close(); - rowGroupWriter = fileWriter->AppendBufferedRowGroup(); - parquetColumnWriter->estimatedRowBytes = 0; - } -} - -void ParquetFileWriter::closeFile() { - rowGroupWriter->Close(); - fileWriter->Close(); - auto status = outFile->Close(); - if (!status.ok()) { - throw RuntimeException("Error closing file in ParquetFileWriter."); - } -} - -} // namespace processor -} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/CMakeLists.txt b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt new file mode 100644 index 0000000000..fb8740652c --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt @@ -0,0 +1,15 @@ +add_library(kuzu_processor_operator_parquet_writer + OBJECT + basic_column_writer.cpp + boolean_column_writer.cpp + buffered_serializer.cpp + column_writer.cpp + struct_column_writer.cpp + string_column_writer.cpp + var_list_column_writer.cpp + parquet_writer.cpp + parquet_rle_bp_encoder.cpp) + +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp new file mode 100644 index 0000000000..7d1decbb91 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp @@ -0,0 +1,315 @@ +#include "processor/operator/persistent/writer/parquet/basic_column_writer.h" + +#include "function/cast/numeric_limits.h" +#include "processor/operator/persistent/reader/parquet/parquet_rle_bp_decoder.h" +#include "processor/operator/persistent/writer//parquet/parquet_rle_bp_encoder.h" +#include "processor/operator/persistent/writer/parquet/parquet_writer.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu_parquet::format; +using namespace kuzu::common; + +std::unique_ptr BasicColumnWriter::initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) { + auto result = std::make_unique(rowGroup, rowGroup.columns.size()); + registerToRowGroup(rowGroup); + return std::move(result); +} + +void BasicColumnWriter::prepare(ColumnWriterState& stateToPrepare, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(stateToPrepare); + auto& colChunk = state.rowGroup.columns[state.colIdx]; + + uint64_t 
start = 0; + auto vcount = parent ? parent->definitionLevels.size() - state.definitionLevels.size() : count; + auto parentIdx = state.definitionLevels.size(); + handleRepeatLevels(state, parent); + handleDefineLevels(state, parent, vector, count, maxDefine, maxDefine - 1); + + auto vectorIdx = 0u; + for (auto i = start; i < vcount; i++) { + auto& pageInfo = state.pageInfo.back(); + pageInfo.rowCount++; + colChunk.meta_data.num_values++; + if (parent && !parent->isEmpty.empty() && parent->isEmpty[parentIdx + i]) { + pageInfo.emptyCount++; + continue; + } + if (!vector->isNull(vectorIdx)) { + pageInfo.estimatedPageSize += getRowSize(vector, vectorIdx, state); + if (pageInfo.estimatedPageSize >= ParquetConstants::MAX_UNCOMPRESSED_PAGE_SIZE) { + PageInformation newInfo; + newInfo.offset = pageInfo.offset + pageInfo.rowCount; + state.pageInfo.push_back(newInfo); + } + } + vectorIdx++; + } +} + +void BasicColumnWriter::beginWrite(ColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + + // Set up the page write info. + state.statsState = initializeStatsState(); + for (auto pageIdx = 0u; pageIdx < state.pageInfo.size(); pageIdx++) { + auto& pageInfo = state.pageInfo[pageIdx]; + if (pageInfo.rowCount == 0) { + assert(pageIdx + 1 == state.pageInfo.size()); + state.pageInfo.erase(state.pageInfo.begin() + pageIdx); + break; + } + PageWriteInformation writeInfo; + // Set up the header. + auto& hdr = writeInfo.pageHeader; + hdr.compressed_page_size = 0; + hdr.uncompressed_page_size = 0; + hdr.type = PageType::DATA_PAGE; + hdr.__isset.data_page_header = true; + + hdr.data_page_header.num_values = pageInfo.rowCount; + hdr.data_page_header.encoding = getEncoding(state); + hdr.data_page_header.definition_level_encoding = Encoding::RLE; + hdr.data_page_header.repetition_level_encoding = Encoding::RLE; + + writeInfo.bufferWriter = std::make_unique(); + writeInfo.writeCount = pageInfo.emptyCount; + writeInfo.maxWriteCount = pageInfo.rowCount; + writeInfo.pageState = initializePageState(state); + + writeInfo.compressedSize = 0; + writeInfo.compressedData = nullptr; + + state.writeInfo.push_back(std::move(writeInfo)); + } + + nextPage(state); +} + +void BasicColumnWriter::write( + ColumnWriterState& writerState, common::ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(writerState); + + uint64_t remaining = count; + uint64_t offset = 0; + while (remaining > 0) { + auto& writeInfo = state.writeInfo[state.currentPage - 1]; + assert(writeInfo.bufferWriter != nullptr); + auto writeCount = + std::min(remaining, writeInfo.maxWriteCount - writeInfo.writeCount); + + writeVector(*writeInfo.bufferWriter, state.statsState.get(), writeInfo.pageState.get(), + vector, offset, offset + writeCount); + + writeInfo.writeCount += writeCount; + if (writeInfo.writeCount == writeInfo.maxWriteCount) { + nextPage(state); + } + offset += writeCount; + remaining -= writeCount; + } +} + +void BasicColumnWriter::finalizeWrite(ColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + auto& columnChunk = state.rowGroup.columns[state.colIdx]; + + // Flush the last page (if any remains). + flushPage(state); + + auto startOffset = writer.getOffset(); + auto pageOffset = startOffset; + // Flush the dictionary. 
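+    // If a dictionary was built for this column, flush it first and record its page offset in the column chunk metadata.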
+ if (hasDictionary(state)) { + columnChunk.meta_data.statistics.distinct_count = dictionarySize(state); + columnChunk.meta_data.statistics.__isset.distinct_count = true; + columnChunk.meta_data.dictionary_page_offset = pageOffset; + columnChunk.meta_data.__isset.dictionary_page_offset = true; + flushDictionary(state, state.statsState.get()); + pageOffset += state.writeInfo[0].compressedSize; + } + + // Record the start position of the pages for this column. + columnChunk.meta_data.data_page_offset = pageOffset; + setParquetStatistics(state, columnChunk); + + // write the individual pages to disk + uint64_t totalUncompressedSize = 0; + for (auto& write_info : state.writeInfo) { + assert(write_info.pageHeader.uncompressed_page_size > 0); + auto header_start_offset = writer.getOffset(); + write_info.pageHeader.write(writer.getProtocol()); + // total uncompressed size in the column chunk includes the header size (!) + totalUncompressedSize += writer.getOffset() - header_start_offset; + totalUncompressedSize += write_info.pageHeader.uncompressed_page_size; + writer.write(write_info.compressedData, write_info.compressedSize); + } + columnChunk.meta_data.total_compressed_size = writer.getOffset() - startOffset; + columnChunk.meta_data.total_uncompressed_size = totalUncompressedSize; +} + +void BasicColumnWriter::writeLevels(BufferedSerializer& bufferedSerializer, + const std::vector& levels, uint64_t maxValue, uint64_t startOffset, uint64_t count) { + if (levels.empty() || count == 0) { + return; + } + + // Write the levels using the RLE-BP encoding. + auto bitWidth = RleBpDecoder::ComputeBitWidth((maxValue)); + RleBpEncoder rleEncoder(bitWidth); + + rleEncoder.beginPrepare(levels[startOffset]); + for (auto i = startOffset + 1; i < startOffset + count; i++) { + rleEncoder.prepareValue(levels[i]); + } + rleEncoder.finishPrepare(); + + // Start off by writing the byte count as a uint32_t. + bufferedSerializer.write(rleEncoder.getByteCount()); + rleEncoder.beginWrite(bufferedSerializer, levels[startOffset]); + for (auto i = startOffset + 1; i < startOffset + count; i++) { + rleEncoder.writeValue(bufferedSerializer, levels[i]); + } + rleEncoder.finishWrite(bufferedSerializer); +} + +void BasicColumnWriter::nextPage(BasicColumnWriterState& state) { + if (state.currentPage > 0) { + // Need to flush the current page. 
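+        // Finish the previous page: flush its page state, compress it, and fill in the page header sizes.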
+        flushPage(state);
+    }
+    if (state.currentPage >= state.writeInfo.size()) {
+        state.currentPage = state.writeInfo.size() + 1;
+        return;
+    }
+    auto& pageInfo = state.pageInfo[state.currentPage];
+    auto& writeInfo = state.writeInfo[state.currentPage];
+    state.currentPage++;
+
+    // Write the repetition levels.
+    writeLevels(*writeInfo.bufferWriter, state.repetitionLevels, maxRepeat, pageInfo.offset,
+        pageInfo.rowCount);
+
+    // Write the definition levels.
+    writeLevels(*writeInfo.bufferWriter, state.definitionLevels, maxDefine, pageInfo.offset,
+        pageInfo.rowCount);
+}
+
+void BasicColumnWriter::flushPage(BasicColumnWriterState& state) {
+    assert(state.currentPage > 0);
+    if (state.currentPage > state.writeInfo.size()) {
+        return;
+    }
+
+    // Flush the page state and compress the page body.
+    auto& writeInfo = state.writeInfo[state.currentPage - 1];
+    auto& bufferedWriter = *writeInfo.bufferWriter;
+    auto& hdr = writeInfo.pageHeader;
+
+    flushPageState(bufferedWriter, writeInfo.pageState.get());
+
+    // Now that we have finished writing the data we know the uncompressed size.
+    if (bufferedWriter.getSize() > uint64_t(function::NumericLimits<int32_t>::maximum())) {
+        throw common::RuntimeException{common::StringUtils::string_format(
+            "Parquet writer: {} uncompressed page size out of range for type integer",
+            bufferedWriter.getSize())};
+    }
+    hdr.uncompressed_page_size = bufferedWriter.getSize();
+
+    // Compress the data.
+    compressPage(bufferedWriter, writeInfo.compressedSize, writeInfo.compressedData,
+        writeInfo.compressedBuf);
+    hdr.compressed_page_size = writeInfo.compressedSize;
+    assert(hdr.uncompressed_page_size > 0);
+    assert(hdr.compressed_page_size > 0);
+
+    if (writeInfo.compressedBuf) {
+        // If the data has been compressed, we no longer need the uncompressed buffer.
+        assert(writeInfo.compressedBuf.get() == writeInfo.compressedData);
+        writeInfo.bufferWriter.reset();
+    }
+}
+
+void BasicColumnWriter::writeDictionary(BasicColumnWriterState& state,
+    std::unique_ptr<BufferedSerializer> bufferedSerializer, uint64_t rowCount) {
+    assert(bufferedSerializer);
+    assert(bufferedSerializer->getSize() > 0);
+
+    // Set up the dictionary page header.
+    PageWriteInformation writeInfo;
+    auto& hdr = writeInfo.pageHeader;
+    hdr.uncompressed_page_size = bufferedSerializer->getSize();
+    hdr.type = PageType::DICTIONARY_PAGE;
+    hdr.__isset.dictionary_page_header = true;
+
+    hdr.dictionary_page_header.encoding = Encoding::PLAIN;
+    hdr.dictionary_page_header.is_sorted = false;
+    hdr.dictionary_page_header.num_values = rowCount;
+
+    writeInfo.bufferWriter = std::move(bufferedSerializer);
+    writeInfo.writeCount = 0;
+    writeInfo.maxWriteCount = 0;
+
+    // Compress the contents of the dictionary page.
+    compressPage(*writeInfo.bufferWriter, writeInfo.compressedSize, writeInfo.compressedData,
+        writeInfo.compressedBuf);
+    hdr.compressed_page_size = writeInfo.compressedSize;
+
+    // Insert the dictionary page as the first page to write for this column.
+    state.writeInfo.insert(state.writeInfo.begin(), std::move(writeInfo));
+}
+
+void BasicColumnWriter::setParquetStatistics(
+    BasicColumnWriterState& state, kuzu_parquet::format::ColumnChunk& column) {
+    if (maxRepeat == 0) {
+        column.meta_data.statistics.null_count = nullCount;
+        column.meta_data.statistics.__isset.null_count = true;
+        column.meta_data.__isset.statistics = true;
+    }
+    // Set min/max/min_value/max_value.
+    auto min = state.statsState->getMin();
+    if (!min.empty()) {
+        column.meta_data.statistics.min = std::move(min);
+
column.meta_data.statistics.__isset.min = true; + column.meta_data.__isset.statistics = true; + } + auto max = state.statsState->getMax(); + if (!max.empty()) { + column.meta_data.statistics.max = std::move(max); + column.meta_data.statistics.__isset.max = true; + column.meta_data.__isset.statistics = true; + } + auto min_value = state.statsState->getMinValue(); + if (!min_value.empty()) { + column.meta_data.statistics.min_value = std::move(min_value); + column.meta_data.statistics.__isset.min_value = true; + column.meta_data.__isset.statistics = true; + } + auto max_value = state.statsState->getMaxValue(); + if (!max_value.empty()) { + column.meta_data.statistics.max_value = std::move(max_value); + column.meta_data.statistics.__isset.max_value = true; + column.meta_data.__isset.statistics = true; + } + for (const auto& write_info : state.writeInfo) { + column.meta_data.encodings.push_back(write_info.pageHeader.data_page_header.encoding); + } +} + +void BasicColumnWriter::registerToRowGroup(kuzu_parquet::format::RowGroup& rowGroup) { + ColumnChunk column_chunk; + column_chunk.__isset.meta_data = true; + column_chunk.meta_data.codec = writer.getCodec(); + column_chunk.meta_data.path_in_schema = schemaPath; + column_chunk.meta_data.num_values = 0; + column_chunk.meta_data.type = writer.getParquetType(schemaIdx); + rowGroup.columns.push_back(std::move(column_chunk)); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp new file mode 100644 index 0000000000..fcf383db93 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp @@ -0,0 +1,43 @@ +#include "processor/operator/persistent/writer/parquet/boolean_column_writer.h" + +namespace kuzu { +namespace processor { + +void BooleanColumnWriter::writeVector(BufferedSerializer& temp_writer, + ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, + common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { + auto stats = reinterpret_cast(writerStatistics); + auto state = reinterpret_cast(writerPageState); + for (auto r = chunkStart; r < chunkEnd; r++) { + auto pos = getVectorPos(vector, r); + if (!vector->isNull(pos)) { + // only encode if non-null + if (vector->getValue(pos)) { + stats->max = true; + state->byte |= 1 << state->bytePos; + } else { + stats->min = false; + } + state->bytePos++; + + if (state->bytePos == 8) { + temp_writer.write(state->byte); + state->byte = 0; + state->bytePos = 0; + } + } + } +} + +void BooleanColumnWriter::flushPageState( + BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) { + auto state = reinterpret_cast(writerPageState); + if (state->bytePos > 0) { + temp_writer.write(state->byte); + state->byte = 0; + state->bytePos = 0; + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp b/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp new file mode 100644 index 0000000000..7bf87a79eb --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp @@ -0,0 +1,33 @@ +#include "processor/operator/persistent/writer/parquet/buffered_serializer.h" + +#include + +namespace kuzu { +namespace processor { + +BufferedSerializer::BufferedSerializer(uint64_t maximum_size) + : BufferedSerializer(std::make_unique(maximum_size), 
maximum_size) {} + +BufferedSerializer::BufferedSerializer(std::unique_ptr data, uint64_t size) + : maximumSize(size), data(data.get()) { + blob.size = 0; + blob.data = std::move(data); +} + +void BufferedSerializer::writeData(const uint8_t* buffer, uint64_t len) { + if (blob.size + len >= maximumSize) { + do { + maximumSize *= 2; + } while (blob.size + len > maximumSize); + auto new_data = std::make_unique(maximumSize); + memcpy(new_data.get(), data, blob.size); + data = new_data.get(); + blob.data = std::move(new_data); + } + + memcpy(data + blob.size, buffer, len); + blob.size += len; +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/column_writer.cpp b/src/processor/operator/persistent/writer/parquet/column_writer.cpp new file mode 100644 index 0000000000..b67ad61713 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/column_writer.cpp @@ -0,0 +1,225 @@ +#include "processor/operator/persistent/writer/parquet/column_writer.h" + +#include "common/exception/not_implemented.h" +#include "common/string_utils.h" +#include "function/cast/numeric_limits.h" +#include "processor/operator/persistent/writer/parquet/boolean_column_writer.h" +#include "processor/operator/persistent/writer/parquet/parquet_writer.h" +#include "processor/operator/persistent/writer/parquet/standard_column_writer.h" +#include "processor/operator/persistent/writer/parquet/string_column_writer.h" +#include "processor/operator/persistent/writer/parquet/struct_column_writer.h" +#include "processor/operator/persistent/writer/parquet/var_list_column_writer.h" +#include "snappy/snappy.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu_parquet::format; +using namespace kuzu::common; + +ColumnWriter::ColumnWriter(ParquetWriter& writer, uint64_t schemaIdx, + std::vector schemaPath, uint64_t maxRepeat, uint64_t maxDefine, bool canHaveNulls) + : writer{writer}, schemaIdx{schemaIdx}, schemaPath{std::move(schemaPath)}, maxRepeat{maxRepeat}, + maxDefine{maxDefine}, canHaveNulls{canHaveNulls}, nullCount{0} {} + +std::unique_ptr ColumnWriter::createWriterRecursive( + std::vector& schemas, ParquetWriter& writer, + LogicalType* type, const std::string& name, std::vector schemaPathToCreate, + storage::MemoryManager* mm, uint64_t maxRepeatToCreate, uint64_t maxDefineToCreate, + bool canHaveNullsToCreate) { + auto nullType = + canHaveNullsToCreate ? 
FieldRepetitionType::OPTIONAL : FieldRepetitionType::REQUIRED; + if (!canHaveNullsToCreate) { + maxDefineToCreate--; + } + auto schemaIdx = schemas.size(); + switch (type->getLogicalTypeID()) { + case LogicalTypeID::STRUCT: { + auto fields = StructType::getFields(type); + // set up the schema element for this struct + kuzu_parquet::format::SchemaElement schema_element; + schema_element.repetition_type = nullType; + schema_element.num_children = fields.size(); + schema_element.__isset.num_children = true; + schema_element.__isset.type = false; + schema_element.__isset.repetition_type = true; + schema_element.name = name; + schemas.push_back(std::move(schema_element)); + schemaPathToCreate.push_back(name); + + // construct the child types recursively + std::vector> childWriters; + childWriters.reserve(fields.size()); + for (auto& field : fields) { + childWriters.push_back( + createWriterRecursive(schemas, writer, field->getType(), field->getName(), + schemaPathToCreate, mm, maxRepeatToCreate, maxDefineToCreate + 1)); + } + return std::make_unique(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + std::move(childWriters), canHaveNullsToCreate); + } + case LogicalTypeID::VAR_LIST: { + auto childType = VarListType::getChildType(type); + // set up the two schema elements for the list + // for some reason we only set the converted type in the OPTIONAL element + // first an OPTIONAL element + kuzu_parquet::format::SchemaElement optional_element; + optional_element.repetition_type = nullType; + optional_element.num_children = 1; + optional_element.converted_type = ConvertedType::LIST; + optional_element.__isset.num_children = true; + optional_element.__isset.type = false; + optional_element.__isset.repetition_type = true; + optional_element.__isset.converted_type = true; + optional_element.name = name; + schemas.push_back(std::move(optional_element)); + schemaPathToCreate.push_back(name); + + // then a REPEATED element + kuzu_parquet::format::SchemaElement repeated_element; + repeated_element.repetition_type = FieldRepetitionType::REPEATED; + repeated_element.num_children = 1; + repeated_element.__isset.num_children = true; + repeated_element.__isset.type = false; + repeated_element.__isset.repetition_type = true; + repeated_element.name = "list"; + schemas.push_back(std::move(repeated_element)); + schemaPathToCreate.emplace_back("list"); + + auto child_writer = createWriterRecursive(schemas, writer, childType, "element", + schemaPathToCreate, mm, maxRepeatToCreate + 1, maxDefineToCreate + 2); + return std::make_unique(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + std::move(child_writer), canHaveNullsToCreate); + } + default: { + SchemaElement schemaElement; + schemaElement.type = ParquetWriter::convertToParquetType(type); + schemaElement.repetition_type = nullType; + schemaElement.__isset.num_children = false; + schemaElement.__isset.type = true; + schemaElement.__isset.repetition_type = true; + schemaElement.name = name; + ParquetWriter::setSchemaProperties(type, schemaElement); + schemas.push_back(std::move(schemaElement)); + schemaPathToCreate.push_back(name); + + switch (type->getLogicalTypeID()) { + case LogicalTypeID::BOOL: + return std::make_unique(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate); + case LogicalTypeID::INT16: + return std::make_unique>(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, 
+ canHaveNullsToCreate); + case LogicalTypeID::INT32: + case LogicalTypeID::DATE: + return std::make_unique>(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate); + case LogicalTypeID::INT64: + return std::make_unique>(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate); + case LogicalTypeID::FLOAT: + return std::make_unique>(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate); + case LogicalTypeID::DOUBLE: + return std::make_unique>(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate); + case LogicalTypeID::BLOB: + case LogicalTypeID::STRING: + return std::make_unique(writer, schemaIdx, + std::move(schemaPathToCreate), maxRepeatToCreate, maxDefineToCreate, + canHaveNullsToCreate, mm); + default: + throw NotImplementedException("ParquetWriter::convertToParquetType"); + } + } + } +} + +void ColumnWriter::handleRepeatLevels(ColumnWriterState& stateToHandle, ColumnWriterState* parent) { + if (!parent) { + // no repeat levels without a parent node + return; + } + while (stateToHandle.repetitionLevels.size() < parent->repetitionLevels.size()) { + stateToHandle.repetitionLevels.push_back( + parent->repetitionLevels[stateToHandle.repetitionLevels.size()]); + } +} + +void ColumnWriter::handleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count, uint16_t defineValue, uint16_t nullValue) { + if (parent) { + // parent node: inherit definition level from the parent + uint64_t vectorIdx = 0; + while (state.definitionLevels.size() < parent->definitionLevels.size()) { + auto currentIdx = state.definitionLevels.size(); + if (parent->definitionLevels[currentIdx] != ParquetConstants::PARQUET_DEFINE_VALID) { + state.definitionLevels.push_back(parent->definitionLevels[currentIdx]); + } else if (!vector->isNull(getVectorPos(vector, vectorIdx))) { + state.definitionLevels.push_back(defineValue); + } else { + if (!canHaveNulls) { + throw RuntimeException( + "Parquet writer: map key column is not allowed to contain NULL values"); + } + nullCount++; + state.definitionLevels.push_back(nullValue); + } + if (parent->isEmpty.empty() || !parent->isEmpty[currentIdx]) { + vectorIdx++; + } + } + } else { + // no parent: set definition levels only from this validity mask + for (auto i = 0u; i < count; i++) { + if (!vector->isNull(getVectorPos(vector, i))) { + state.definitionLevels.push_back(defineValue); + } else { + if (!canHaveNulls) { + throw RuntimeException( + "Parquet writer: map key column is not allowed to contain NULL values"); + } + nullCount++; + state.definitionLevels.push_back(nullValue); + } + } + } +} + +void ColumnWriter::compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, + uint8_t*& compressedData, std::unique_ptr& compressedBuf) { + switch (writer.getCodec()) { + case CompressionCodec::UNCOMPRESSED: { + compressedSize = bufferedSerializer.getSize(); + compressedData = bufferedSerializer.getBlobData(); + } break; + case CompressionCodec::SNAPPY: { + compressedSize = kuzu_snappy::MaxCompressedLength(bufferedSerializer.getSize()); + compressedBuf = std::unique_ptr(new uint8_t[compressedSize]); + kuzu_snappy::RawCompress(reinterpret_cast(bufferedSerializer.getBlobData()), + bufferedSerializer.getSize(), reinterpret_cast(compressedBuf.get()), + &compressedSize); + compressedData = 
compressedBuf.get(); + assert(compressedSize <= kuzu_snappy::MaxCompressedLength(bufferedSerializer.getSize())); + } break; + default: + throw NotImplementedException{"ColumnWriter::compressPage"}; + } + + if (compressedSize > uint64_t(function::NumericLimits::maximum())) { + throw RuntimeException(StringUtils::string_format( + "Parquet writer: {} compressed page size out of range for type integer", + bufferedSerializer.getSize())); + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp new file mode 100644 index 0000000000..d99afce209 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp @@ -0,0 +1,113 @@ +#include "processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h" + +#include + +#include "common/exception/not_implemented.h" + +namespace kuzu { +namespace processor { + +static void varintEncode(uint32_t val, BufferedSerializer& ser) { + do { + uint8_t byte = val & 127; + val >>= 7; + if (val != 0) { + byte |= 128; + } + ser.write(byte); + } while (val != 0); +} + +uint8_t RleBpEncoder::getVarintSize(uint32_t val) { + uint8_t res = 0; + do { + val >>= 7; + res++; + } while (val != 0); + return res; +} + +RleBpEncoder::RleBpEncoder(uint32_t bit_width) + : byteWidth((bit_width + 7) / 8), byteCount(uint64_t(-1)), runCount(uint64_t(-1)) {} + +// we always RLE everything (for now) +void RleBpEncoder::beginPrepare(uint32_t first_value) { + byteCount = 0; + runCount = 1; + currentRunCount = 1; + lastValue = first_value; +} + +void RleBpEncoder::finishRun() { + // last value, or value has changed + // write out the current run + byteCount += getVarintSize(currentRunCount << 1) + byteWidth; + currentRunCount = 1; + runCount++; +} + +void RleBpEncoder::prepareValue(uint32_t value) { + if (value != lastValue) { + finishRun(); + lastValue = value; + } else { + currentRunCount++; + } +} + +void RleBpEncoder::finishPrepare() { + finishRun(); +} + +uint64_t RleBpEncoder::getByteCount() { + assert(byteCount != uint64_t(-1)); + return byteCount; +} + +void RleBpEncoder::beginWrite(BufferedSerializer& writer, uint32_t first_value) { + // start the RLE runs + lastValue = first_value; + currentRunCount = 1; +} + +void RleBpEncoder::writeRun(BufferedSerializer& writer) { + // write the header of the run + varintEncode(currentRunCount << 1, writer); + // now write the value + assert(lastValue >> (byteWidth * 8) == 0); + switch (byteWidth) { + case 1: + writer.write(lastValue); + break; + case 2: + writer.write(lastValue); + break; + case 3: + writer.write(lastValue & 0xFF); + writer.write((lastValue >> 8) & 0xFF); + writer.write((lastValue >> 16) & 0xFF); + break; + case 4: + writer.write(lastValue); + break; + default: + throw common::NotImplementedException("RleBpEncoder::WriteRun"); + } + currentRunCount = 1; +} + +void RleBpEncoder::writeValue(BufferedSerializer& writer, uint32_t value) { + if (value != lastValue) { + writeRun(writer); + lastValue = value; + } else { + currentRunCount++; + } +} + +void RleBpEncoder::finishWrite(BufferedSerializer& writer) { + writeRun(writer); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/parquet_writer.cpp b/src/processor/operator/persistent/writer/parquet/parquet_writer.cpp new file mode 100644 index 0000000000..a7fa85a661 --- /dev/null +++ 
b/src/processor/operator/persistent/writer/parquet/parquet_writer.cpp @@ -0,0 +1,232 @@ +#include "processor/operator/persistent/writer/parquet/parquet_writer.h" + +#include "common/data_chunk/data_chunk.h" +#include "common/exception/not_implemented.h" +#include "thrift/protocol/TCompactProtocol.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu_parquet::format; +using namespace kuzu::common; + +ParquetWriter::ParquetWriter(std::string fileName, + std::vector> types, std::vector columnNames, + kuzu_parquet::format::CompressionCodec::type codec, storage::MemoryManager* mm) + : fileName{std::move(fileName)}, types{std::move(types)}, + columnNames{std::move(columnNames)}, codec{codec}, fileOffset{0}, mm{mm} { + fileInfo = common::FileUtils::openFile(this->fileName, O_WRONLY | O_CREAT | O_TRUNC); + // Parquet files start with the string "PAR1". + common::FileUtils::writeToFile(fileInfo.get(), + reinterpret_cast(ParquetConstants::PARQUET_MAGIC_WORDS), + strlen(ParquetConstants::PARQUET_MAGIC_WORDS), fileOffset); + fileOffset += strlen(ParquetConstants::PARQUET_MAGIC_WORDS); + kuzu_apache::thrift::protocol::TCompactProtocolFactoryT tprotoFactory; + protocol = tprotoFactory.getProtocol( + std::make_shared(fileInfo.get(), fileOffset)); + + fileMetaData.num_rows = 0; + fileMetaData.version = 1; + + fileMetaData.__isset.created_by = true; + fileMetaData.created_by = "KUZU"; + + fileMetaData.schema.resize(1); + + // populate root schema object + fileMetaData.schema[0].name = "kuzu_schema"; + fileMetaData.schema[0].num_children = this->types.size(); + fileMetaData.schema[0].__isset.num_children = true; + fileMetaData.schema[0].repetition_type = kuzu_parquet::format::FieldRepetitionType::REQUIRED; + fileMetaData.schema[0].__isset.repetition_type = true; + + std::vector schemaPath; + for (auto i = 0u; i < this->types.size(); i++) { + columnWriters.push_back(ColumnWriter::createWriterRecursive(fileMetaData.schema, *this, + this->types[i].get(), this->columnNames[i], schemaPath, mm)); + } +} + +Type::type ParquetWriter::convertToParquetType(LogicalType* type) { + switch (type->getLogicalTypeID()) { + case LogicalTypeID::BOOL: + return Type::BOOLEAN; + case LogicalTypeID::INT8: + case LogicalTypeID::INT16: + case LogicalTypeID::INT32: + case LogicalTypeID::DATE: + return Type::INT32; + case LogicalTypeID::INT64: + return Type::INT64; + case LogicalTypeID::FLOAT: + return Type::FLOAT; + case LogicalTypeID::DOUBLE: + return Type::DOUBLE; + case LogicalTypeID::BLOB: + case LogicalTypeID::STRING: + return Type::BYTE_ARRAY; + default: + throw NotImplementedException("ParquetWriter::convertToParquetType"); + } +} + +void ParquetWriter::setSchemaProperties(LogicalType* type, SchemaElement& schemaElement) { + switch (type->getLogicalTypeID()) { + case LogicalTypeID::INT8: { + schemaElement.converted_type = ConvertedType::INT_8; + schemaElement.__isset.converted_type = true; + } break; + case LogicalTypeID::INT16: { + schemaElement.converted_type = ConvertedType::INT_16; + schemaElement.__isset.converted_type = true; + } break; + case LogicalTypeID::INT32: { + schemaElement.converted_type = ConvertedType::INT_32; + schemaElement.__isset.converted_type = true; + } break; + case LogicalTypeID::INT64: { + schemaElement.converted_type = ConvertedType::INT_64; + schemaElement.__isset.converted_type = true; + } break; + case LogicalTypeID::DATE: { + schemaElement.converted_type = ConvertedType::DATE; + schemaElement.__isset.converted_type = true; + } break; + case LogicalTypeID::STRING: { + 
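+        // Strings are stored as BYTE_ARRAY; the UTF8 converted type tells readers to interpret the bytes as text.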
schemaElement.converted_type = ConvertedType::UTF8;
+        schemaElement.__isset.converted_type = true;
+    } break;
+    default:
+        break;
+    }
+}
+
+void ParquetWriter::flush(FactorizedTable& ft) {
+    if (ft.getNumTuples() == 0) {
+        return;
+    }
+
+    PreparedRowGroup preparedRowGroup;
+    prepareRowGroup(ft, preparedRowGroup);
+    flushRowGroup(preparedRowGroup);
+    ft.clear();
+}
+
+void ParquetWriter::prepareRowGroup(FactorizedTable& ft, PreparedRowGroup& result) {
+    // Set up a new row group for this chunk collection.
+    auto& row_group = result.rowGroup;
+    row_group.num_rows = ft.getTotalNumFlatTuples();
+    row_group.total_byte_size = row_group.num_rows * ft.getTableSchema()->getNumBytesPerTuple();
+    row_group.__isset.file_offset = true;
+
+    auto& states = result.states;
+    // Iterate over each of the columns of the chunk collection and write them.
+    assert(ft.getTableSchema()->getNumColumns() == columnWriters.size());
+    std::vector<std::unique_ptr<ColumnWriterState>> writerStates;
+    std::unique_ptr<DataChunk> unflatDataChunkToRead =
+        std::make_unique<DataChunk>(ft.getTableSchema()->getNumUnflatColumns());
+    std::unique_ptr<DataChunk> flatDataChunkToRead = std::make_unique<DataChunk>(
+        ft.getTableSchema()->getNumFlatColumns(), DataChunkState::getSingleValueDataChunkState());
+    std::vector<ValueVector*> vectorsToRead;
+    vectorsToRead.reserve(columnWriters.size());
+    auto numFlatVectors = 0;
+    for (auto i = 0u; i < columnWriters.size(); i++) {
+        writerStates.emplace_back(columnWriters[i]->initializeWriteState(row_group));
+        auto vector = std::make_unique<ValueVector>(*types[i]->copy(), mm);
+        vectorsToRead.push_back(vector.get());
+        if (ft.getTableSchema()->getColumn(i)->isFlat()) {
+            flatDataChunkToRead->insert(numFlatVectors, std::move(vector));
+            numFlatVectors++;
+        } else {
+            unflatDataChunkToRead->insert(i - numFlatVectors, std::move(vector));
+        }
+    }
+    uint64_t numTuplesRead = 0u;
+    while (numTuplesRead < ft.getNumTuples()) {
+        readFromFT(ft, vectorsToRead, numTuplesRead);
+        for (auto i = 0u; i < columnWriters.size(); i++) {
+            if (columnWriters[i]->hasAnalyze()) {
+                columnWriters[i]->analyze(*writerStates[i], nullptr, vectorsToRead[i],
+                    getNumTuples(unflatDataChunkToRead.get()));
+            }
+        }
+    }
+
+    for (auto i = 0u; i < columnWriters.size(); i++) {
+        if (columnWriters[i]->hasAnalyze()) {
+            columnWriters[i]->finalizeAnalyze(*writerStates[i]);
+        }
+    }
+
+    numTuplesRead = 0u;
+    while (numTuplesRead < ft.getNumTuples()) {
+        readFromFT(ft, vectorsToRead, numTuplesRead);
+        for (auto i = 0u; i < columnWriters.size(); i++) {
+            columnWriters[i]->prepare(*writerStates[i], nullptr, vectorsToRead[i],
+                getNumTuples(unflatDataChunkToRead.get()));
+        }
+    }
+
+    for (auto i = 0u; i < columnWriters.size(); i++) {
+        columnWriters[i]->beginWrite(*writerStates[i]);
+    }
+
+    numTuplesRead = 0u;
+    while (numTuplesRead < ft.getNumTuples()) {
+        readFromFT(ft, vectorsToRead, numTuplesRead);
+        for (auto i = 0u; i < columnWriters.size(); i++) {
+            columnWriters[i]->write(
+                *writerStates[i], vectorsToRead[i], getNumTuples(unflatDataChunkToRead.get()));
+        }
+    }
+
+    for (auto& write_state : writerStates) {
+        states.push_back(std::move(write_state));
+    }
+}
+
+void ParquetWriter::flushRowGroup(PreparedRowGroup& rowGroup) {
+    std::lock_guard<std::mutex> glock(lock);
+    auto& parquetRowGroup = rowGroup.rowGroup;
+    auto& states = rowGroup.states;
+    if (states.empty()) {
+        throw RuntimeException("Attempting to flush a row group with no rows");
+    }
+    parquetRowGroup.file_offset = fileOffset;
+    for (auto i = 0u; i < states.size(); i++) {
+        auto write_state = std::move(states[i]);
+        columnWriters[i]->finalizeWrite(*write_state);
+    }
+
+    // Append the row group
to the file meta data. + fileMetaData.row_groups.push_back(parquetRowGroup); + fileMetaData.num_rows += parquetRowGroup.num_rows; +} + +void ParquetWriter::readFromFT( + FactorizedTable& ft, std::vector vectorsToRead, uint64_t& numTuplesRead) { + auto numTuplesToRead = + ft.getTableSchema()->getNumUnflatColumns() != 0 ? + 1 : + std::min(ft.getNumTuples() - numTuplesRead, DEFAULT_VECTOR_CAPACITY); + ft.scan(vectorsToRead, numTuplesRead, numTuplesToRead); + numTuplesRead += numTuplesToRead; +} + +void ParquetWriter::finalize() { + auto startOffset = fileOffset; + fileMetaData.write(protocol.get()); + uint32_t metadataSize = fileOffset - startOffset; + FileUtils::writeToFile(fileInfo.get(), reinterpret_cast(&metadataSize), + sizeof(metadataSize), fileOffset); + fileOffset += sizeof(uint32_t); + + // Parquet files also end with the string "PAR1". + FileUtils::writeToFile(fileInfo.get(), + reinterpret_cast(ParquetConstants::PARQUET_MAGIC_WORDS), + strlen(ParquetConstants::PARQUET_MAGIC_WORDS), fileOffset); + fileOffset += strlen(ParquetConstants::PARQUET_MAGIC_WORDS); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp new file mode 100644 index 0000000000..c099cb3bd7 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp @@ -0,0 +1,208 @@ +#include "processor/operator/persistent/writer/parquet/string_column_writer.h" + +#include "function/comparison/comparison_functions.h" +#include "function/hash/hash_functions.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu::common; +using namespace kuzu_parquet::format; + +std::size_t StringHash::operator()(const ku_string_t& k) const { + hash_t result; + function::Hash::operation(k, result); + return result; +} + +bool StringEquality::operator()(const ku_string_t& a, const ku_string_t& b) const { + uint8_t result; + function::Equals::operation(a, b, result, nullptr /* leftVector */, nullptr /* rightVector */); + return result; +} + +void StringStatisticsState::update(const ku_string_t& val) { + if (valuesTooBig) { + return; + } + if (val.len > ParquetConstants::MAX_STRING_STATISTICS_SIZE) { + // we avoid gathering stats when individual string values are too large + // this is because the statistics are copied into the Parquet file meta data in + // uncompressed format ideally we avoid placing several mega or giga-byte long strings + // there we put a threshold of 10KB, if we see strings that exceed this threshold we + // avoid gathering stats + valuesTooBig = true; + min = std::string(); + max = std::string(); + return; + } + if (!hasStats || val.getAsString() < min) { + min = val.getAsString(); + } + if (!hasStats || val.getAsString() > max) { + max = val.getAsString(); + } + hasStats = true; +} + +std::unique_ptr StringColumnWriter::initializeWriteState(RowGroup& rowGroup) { + auto result = std::make_unique(rowGroup, rowGroup.columns.size(), mm); + registerToRowGroup(rowGroup); + return std::move(result); +} + +void StringColumnWriter::analyze(ColumnWriterState& writerState, ColumnWriterState* parent, + ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(writerState); + uint64_t vcount = + parent ? 
parent->definitionLevels.size() - state.definitionLevels.size() : count; + uint64_t parentIdx = state.definitionLevels.size(); + uint64_t vectorIdx = 0; + uint32_t newValueIdx = state.dictionary.size(); + uint32_t lastValueIdx = -1; + uint64_t runLen = 0; + uint64_t runCount = 0; + for (auto i = 0u; i < vcount; i++) { + if (parent && !parent->isEmpty.empty() && parent->isEmpty[parentIdx + i]) { + continue; + } + auto pos = getVectorPos(vector, vectorIdx); + if (!vector->isNull(pos)) { + runLen++; + const auto& value = vector->getValue(pos); + // Try to insert into the dictionary. If it's already there, we get back the value + // index. + ku_string_t valueToInsert; + StringVector::copyToRowData(vector, pos, reinterpret_cast(&valueToInsert), + state.overflowBuffer.get()); + auto found = state.dictionary.insert( + string_map_t::value_type(valueToInsert, newValueIdx)); + state.estimatedPlainSize += value.len + ParquetConstants::STRING_LENGTH_SIZE; + if (found.second) { + // String didn't exist yet in the dictionary. + newValueIdx++; + state.estimatedDictPageSize += + value.len + ParquetConstants::MAX_DICTIONARY_KEY_SIZE; + } + // If the value changed, we will encode it in the page. + if (lastValueIdx != found.first->second) { + // we will add the value index size later, when we know the total number of keys + state.estimatedRlePagesSize += RleBpEncoder::getVarintSize(runLen); + runLen = 0; + runCount++; + lastValueIdx = found.first->second; + } + } + vectorIdx++; + } + // Add the costs of keys sizes. We don't know yet how many bytes the keys need as we haven't + // seen all the values. therefore we use an over-estimation of + state.estimatedRlePagesSize += ParquetConstants::MAX_DICTIONARY_KEY_SIZE * runCount; +} + +void StringColumnWriter::finalizeAnalyze(ColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + + // Check if a dictionary will require more space than a plain write, or if the dictionary + // page is going to be too large. + if (state.estimatedDictPageSize > ParquetConstants::MAX_UNCOMPRESSED_DICT_PAGE_SIZE || + state.estimatedRlePagesSize + state.estimatedDictPageSize > state.estimatedPlainSize) { + // Clearing the dictionary signals a plain write. + state.dictionary.clear(); + state.keyBitWidth = 0; + } else { + state.keyBitWidth = RleBpDecoder::ComputeBitWidth(state.dictionary.size()); + } +} + +void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, + ColumnWriterStatistics* statsToWrite, ColumnWriterPageState* writerPageState, + ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { + auto pageState = reinterpret_cast(writerPageState); + auto stats = reinterpret_cast(statsToWrite); + + if (pageState->isDictionaryEncoded()) { + // Dictionary based page. + for (auto r = chunkStart; r < chunkEnd; r++) { + auto pos = getVectorPos(vector, r); + if (vector->isNull(pos)) { + continue; + } + auto value_index = pageState->dictionary.at(vector->getValue(pos)); + if (!pageState->writtenValue) { + // Write the bit-width as a one-byte entry. + bufferedSerializer.write(pageState->bitWidth); + // Now begin writing the actual value. 
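+                    // Dictionary-encoded page: values are emitted as RLE-encoded dictionary indices, starting a new run here.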
+ pageState->encoder.beginWrite(bufferedSerializer, value_index); + pageState->writtenValue = true; + } else { + pageState->encoder.writeValue(bufferedSerializer, value_index); + } + } + } else { + for (auto r = chunkStart; r < chunkEnd; r++) { + auto pos = getVectorPos(vector, r); + if (vector->isNull(pos)) { + continue; + } + auto& str = vector->getValue(pos); + stats->update(str); + bufferedSerializer.write(str.len); + bufferedSerializer.writeData(str.getData(), str.len); + } + } +} + +void StringColumnWriter::flushPageState( + BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) { + auto pageState = reinterpret_cast(writerPageState); + if (pageState->bitWidth != 0) { + if (!pageState->writtenValue) { + // all values are null + // just write the bit width + bufferedSerializer.write(pageState->bitWidth); + return; + } + pageState->encoder.finishWrite(bufferedSerializer); + } +} + +void StringColumnWriter::flushDictionary( + BasicColumnWriterState& writerState, ColumnWriterStatistics* writerStats) { + auto stats = reinterpret_cast(writerStats); + auto& state = reinterpret_cast(writerState); + if (!state.isDictionaryEncoded()) { + return; + } + // First we need to sort the values in index order. + auto values = std::vector(state.dictionary.size()); + for (const auto& entry : state.dictionary) { + assert(values[entry.second].len == 0); + values[entry.second] = entry.first; + } + // First write the contents of the dictionary page to a temporary buffer. + auto bufferedSerializer = std::make_unique(); + for (auto r = 0u; r < values.size(); r++) { + auto& value = values[r]; + // Update the statistics. + stats->update(value); + // Write this string value to the dictionary. + bufferedSerializer->write(value.len); + bufferedSerializer->writeData(value.getData(), value.len); + } + // Flush the dictionary page and add it to the to-be-written pages. 
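+    // writeDictionary prepends the dictionary page so it is emitted before any data pages of this column.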
+ writeDictionary(state, std::move(bufferedSerializer), values.size()); +} + +uint64_t StringColumnWriter::getRowSize( + ValueVector* vector, uint64_t index, BasicColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + if (state.isDictionaryEncoded()) { + return (state.keyBitWidth + 7) / 8; + } else { + return vector->getValue(getVectorPos(vector, index)).len; + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/struct_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/struct_column_writer.cpp new file mode 100644 index 0000000000..569b642d8c --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/struct_column_writer.cpp @@ -0,0 +1,99 @@ +#include "processor/operator/persistent/writer/parquet/struct_column_writer.h" + +#include "common/vector/value_vector.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu_parquet::format; +using namespace kuzu::common; + +std::unique_ptr StructColumnWriter::initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) { + auto result = std::make_unique(rowGroup, rowGroup.columns.size()); + + result->childStates.reserve(childWriters.size()); + for (auto& child_writer : childWriters) { + result->childStates.push_back(child_writer->initializeWriteState(rowGroup)); + } + return std::move(result); +} + +bool StructColumnWriter::hasAnalyze() { + for (auto& child_writer : childWriters) { + if (child_writer->hasAnalyze()) { + return true; + } + } + return false; +} + +void StructColumnWriter::analyze( + ColumnWriterState& state_p, ColumnWriterState* parent, ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(state_p); + auto& childVectors = StructVector::getFieldVectors(vector); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + // Need to check again. It might be that just one child needs it but the rest not + if (childWriters[child_idx]->hasAnalyze()) { + childWriters[child_idx]->analyze( + *state.childStates[child_idx], &state_p, childVectors[child_idx].get(), count); + } + } +} + +void StructColumnWriter::finalizeAnalyze(ColumnWriterState& state_p) { + auto& state = reinterpret_cast(state_p); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + // Need to check again. 
It might be that just one child needs it but the rest not + if (childWriters[child_idx]->hasAnalyze()) { + childWriters[child_idx]->finalizeAnalyze(*state.childStates[child_idx]); + } + } +} + +void StructColumnWriter::prepare( + ColumnWriterState& state_p, ColumnWriterState* parent, ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(state_p); + if (parent) { + // propagate empty entries from the parent + while (state.isEmpty.size() < parent->isEmpty.size()) { + state.isEmpty.push_back(parent->isEmpty[state.isEmpty.size()]); + } + } + handleRepeatLevels(state_p, parent); + handleDefineLevels( + state_p, parent, vector, count, ParquetConstants::PARQUET_DEFINE_VALID, maxDefine - 1); + auto& child_vectors = StructVector::getFieldVectors(vector); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + childWriters[child_idx]->prepare( + *state.childStates[child_idx], &state_p, child_vectors[child_idx].get(), count); + } +} + +void StructColumnWriter::beginWrite(ColumnWriterState& state_p) { + auto& state = reinterpret_cast(state_p); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + childWriters[child_idx]->beginWrite(*state.childStates[child_idx]); + } +} + +void StructColumnWriter::write(ColumnWriterState& state_p, ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(state_p); + auto& child_vectors = StructVector::getFieldVectors(vector); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + childWriters[child_idx]->write( + *state.childStates[child_idx], child_vectors[child_idx].get(), count); + } +} + +void StructColumnWriter::finalizeWrite(ColumnWriterState& state_p) { + auto& state = reinterpret_cast(state_p); + for (auto child_idx = 0u; child_idx < childWriters.size(); child_idx++) { + // we add the null count of the struct to the null count of the children + childWriters[child_idx]->nullCount += nullCount; + childWriters[child_idx]->finalizeWrite(*state.childStates[child_idx]); + } +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/var_list_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/var_list_column_writer.cpp new file mode 100644 index 0000000000..1440e7efc9 --- /dev/null +++ b/src/processor/operator/persistent/writer/parquet/var_list_column_writer.cpp @@ -0,0 +1,106 @@ +#include "processor/operator/persistent/writer/parquet/var_list_column_writer.h" + +namespace kuzu { +namespace processor { + +using namespace kuzu_parquet::format; + +std::unique_ptr VarListColumnWriter::initializeWriteState( + kuzu_parquet::format::RowGroup& rowGroup) { + auto result = std::make_unique(rowGroup, rowGroup.columns.size()); + result->childState = childWriter->initializeWriteState(rowGroup); + return std::move(result); +} + +bool VarListColumnWriter::hasAnalyze() { + return childWriter->hasAnalyze(); +} + +void VarListColumnWriter::analyze(ColumnWriterState& writerState, ColumnWriterState* parent, + common::ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(writerState); + childWriter->analyze(*state.childState, &writerState, common::ListVector::getDataVector(vector), + common::ListVector::getDataVectorSize(vector)); +} + +void VarListColumnWriter::finalizeAnalyze(ColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + childWriter->finalizeAnalyze(*state.childState); +} + +void VarListColumnWriter::prepare(ColumnWriterState& writerState, ColumnWriterState* 
parent, + common::ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(writerState); + + // Write definition levels and repeats. + uint64_t start = 0; + auto vcount = parent ? parent->definitionLevels.size() - state.parentIdx : count; + uint64_t vectorIdx = 0; + for (auto i = start; i < vcount; i++) { + auto parentIdx = state.parentIdx + i; + if (parent && !parent->isEmpty.empty() && parent->isEmpty[parentIdx]) { + state.definitionLevels.push_back(parent->definitionLevels[parentIdx]); + state.repetitionLevels.push_back(parent->repetitionLevels[parentIdx]); + state.isEmpty.push_back(true); + continue; + } + auto firstRepeatLevel = parent && !parent->repetitionLevels.empty() ? + parent->repetitionLevels[parentIdx] : + maxRepeat; + auto pos = getVectorPos(vector, vectorIdx); + if (parent && + parent->definitionLevels[parentIdx] != common::ParquetConstants::PARQUET_DEFINE_VALID) { + state.definitionLevels.push_back(parent->definitionLevels[parentIdx]); + state.repetitionLevels.push_back(firstRepeatLevel); + state.isEmpty.push_back(true); + } else if (!vector->isNull(pos)) { + auto listEntry = vector->getValue(pos); + // push the repetition levels + if (listEntry.size == 0) { + state.definitionLevels.push_back(maxDefine); + state.isEmpty.push_back(true); + } else { + state.definitionLevels.push_back(common::ParquetConstants::PARQUET_DEFINE_VALID); + state.isEmpty.push_back(false); + } + state.repetitionLevels.push_back(firstRepeatLevel); + for (auto k = 1; k < listEntry.size; k++) { + state.repetitionLevels.push_back(maxRepeat + 1); + state.definitionLevels.push_back(common::ParquetConstants::PARQUET_DEFINE_VALID); + state.isEmpty.push_back(false); + } + } else { + if (!canHaveNulls) { + throw common::RuntimeException( + "Parquet writer: map key column is not allowed to contain NULL values"); + } + state.definitionLevels.push_back(maxDefine - 1); + state.repetitionLevels.push_back(firstRepeatLevel); + state.isEmpty.push_back(true); + } + vectorIdx++; + } + state.parentIdx += vcount; + childWriter->prepare(*state.childState, &writerState, common::ListVector::getDataVector(vector), + common::ListVector::getDataVectorSize(vector)); +} + +void VarListColumnWriter::beginWrite(ColumnWriterState& state_p) { + auto& state = reinterpret_cast(state_p); + childWriter->beginWrite(*state.childState); +} + +void VarListColumnWriter::write( + ColumnWriterState& writerState, common::ValueVector* vector, uint64_t count) { + auto& state = reinterpret_cast(writerState); + childWriter->write(*state.childState, common::ListVector::getDataVector(vector), + common::ListVector::getDataVectorSize(vector)); +} + +void VarListColumnWriter::finalizeWrite(ColumnWriterState& writerState) { + auto& state = reinterpret_cast(writerState); + childWriter->finalizeWrite(*state.childState); +} + +} // namespace processor +} // namespace kuzu diff --git a/src/processor/result/factorized_table.cpp b/src/processor/result/factorized_table.cpp index 5f17924790..452dffb5e5 100644 --- a/src/processor/result/factorized_table.cpp +++ b/src/processor/result/factorized_table.cpp @@ -45,6 +45,26 @@ bool FactorizedTableSchema::operator==(const FactorizedTableSchema& other) const other.numBytesForNullMapPerTuple; } +uint64_t FactorizedTableSchema::getNumFlatColumns() const { + auto numFlatColumns = 0u; + for (auto& column : columns) { + if (column->isFlat()) { + numFlatColumns++; + } + } + return numFlatColumns; +} + +uint64_t FactorizedTableSchema::getNumUnflatColumns() const { + auto numUnflatColumns = 0u; + for (auto& 
column : columns) { + if (!column->isFlat()) { + numUnflatColumns++; + } + } + return numUnflatColumns; +} + void DataBlock::copyTuples(DataBlock* blockToCopyFrom, ft_tuple_idx_t tupleIdxToCopyFrom, DataBlock* blockToCopyInto, ft_tuple_idx_t tupleIdxToCopyTo, uint32_t numTuplesToCopy, uint32_t numBytesPerTuple) { diff --git a/test/test_files/copy/copy_to_big_results.test b/test/test_files/copy/copy_to_big_results.test index a1b6f15fb1..f62a278e6e 100644 --- a/test/test_files/copy/copy_to_big_results.test +++ b/test/test_files/copy/copy_to_big_results.test @@ -89,7 +89,6 @@ dateColumn DATE, timestampColumn TIMESTAMP, stringColumn STRING, listOfInt INT64 -INSERT_STATEMENT_BLOCK VALIDATE_RESULT --SKIP -LOG TestParquet -STATEMENT COPY (MATCH (r:tableOfTypes) RETURN r.id, r.doubleColumn, diff --git a/test/test_files/copy/copy_to_nested.test b/test/test_files/copy/copy_to_nested.test deleted file mode 100644 index eb22b83b28..0000000000 --- a/test/test_files/copy/copy_to_nested.test +++ /dev/null @@ -1,84 +0,0 @@ --GROUP CopyToNested --DATASET CSV tinysnb - --- - --CASE ListList - --STATEMENT CREATE NODE TABLE Nested(id int64, - list int64[], - doubleList double[][], - primary key(id)); ----- ok --STATEMENT COPY (RETURN 1,[1,2,3,4],[[1.0,2.0],[3.0,4.08],[8891.9999],[5.1,6.884,7.12,8.0]]) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.list, n.doubleList ----- 1 -[1,2,3,4]|[[1.000000,2.000000],[3.000000,4.080000],[8891.999900],[5.100000,6.884000,7.120000,8.000000]] - --CASE StructStructStructList - --STATEMENT CREATE NODE TABLE Nested(id int64, field struct (a struct(b struct(c boolean[])), d struct(e struct(f int64[]))), primary key(id)); ----- ok --STATEMENT COPY (RETURN 1, {a: {b: {c: [true,false,false]}} , d: {e: {f: [3,4]}}}) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.field ----- 1 -{a: {b: {c: [True,False,False]}}, d: {e: {f: [3,4]}}} - --CASE StructList --SKIP -# Copy-to-parquet writes a list of dates as list of strings. 
--STATEMENT CREATE NODE TABLE Nested(id int64, field struct(first date[], second string[]), primary key(id)); ----- ok --STATEMENT COPY (RETURN 1,{first:['1985-08-19','2023-05-01','2023-08-31','2087-09-04'], second:['list','of','strings']}) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.field ----- 1 -{first: [1985-08-19,2023-05-01,2023-08-31,2087-09-04], second: [list,of,strings]} - --CASE StructStruct - --STATEMENT CREATE NODE TABLE Nested(id int64, field struct( a struct( b int64, c int64 ), d struct(b int64, c int64) ), primary key(id)); ----- ok --STATEMENT COPY (RETURN 1,{a: {b: 1, c: 2}, d: {b: 3, c: 4}}) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.field ----- 1 -{a: {b: 1, c: 2}, d: {b: 3, c: 4}} - --CASE ListFiveListsList - --STATEMENT CREATE NODE TABLE Nested(id int64, field int64[][][][][][], field2 int64[][][], primary key(id)); ----- ok --STATEMENT COPY (RETURN 1,[[[[[[0,1]],[[2,3]]]]]],[[[1]],[[2],[6]],[[3,4,8],[9]],[[5]]]) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.field, n.field2 ----- 1 -[[[[[[0,1]],[[2,3]]]]]]|[[[1]],[[2],[6]],[[3,4,8],[9]],[[5]]] - -# Depending on PR #1955 --CASE ListStructList --SKIP --STATEMENT CREATE NODE TABLE Nested(id string, - field struct(a double[])[], - field2 struct(s string)[], - primary key(id)); ----- ok --STATEMENT COPY (RETURN 'anyString',[{a:[1.899941]}],[{s: 'is not second'}, {s: 'is not first'}]) TO "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT COPY Nested FROM "${DATABASE_PATH}/nested.parquet" ----- ok --STATEMENT MATCH (n:Nested) return n.id, n.field, n.field2 ----- 1 -anyString|[{a: [1.899941]}]|[{s: is not second},{s: is not first}] diff --git a/test/test_files/copy/copy_to_parquet.test b/test/test_files/copy/copy_to_parquet.test index 808c9a1264..e799a3942e 100644 --- a/test/test_files/copy/copy_to_parquet.test +++ b/test/test_files/copy/copy_to_parquet.test @@ -3,43 +3,54 @@ -- --CASE CommonDataTypesCopyToParquet +-CASE TinySnbCopyToParquet --STATEMENT COPY (MATCH (p:person) RETURN p.ID, id(p), p.fName, p.gender, p.isStudent, p.age, p.eyeSight, p.height) TO "${DATABASE_PATH}/common.parquet" +-LOG CopyPersonToParquet +-STATEMENT COPY (MATCH (p:person) RETURN p.ID, p.fName, p.gender, p.isStudent, p.age, p.eyeSight, p.height, p.birthdate, p.workedHours, p.usedNames, p.courseScoresPerTerm) TO "${DATABASE_PATH}/person.parquet" ---- ok --STATEMENT create node table personCopy (ID iNt64, internalID string, fName STRiNG, gender INT64, isStudent BoOLEAN, age INT64, eyeSight DOUBLE, height float, PRIMARY KEY (ID)) ----- ok --STATEMENT COPY personCopy FROM "${DATABASE_PATH}/common.parquet" (header= TRUE) ----- ok --STATEMENT MATCH (p:personCopy) RETURN p.ID, p.internalID, p.fName, p.gender, p.isStudent, p.age, p.eyeSight, p.height +-STATEMENT LOAD FROM "${DATABASE_PATH}/person.parquet" RETURN *; ---- 8 -0|0:0|Alice|1|True|35|5.000000|1.731000 -10|0:7|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|2|False|83|4.900000|1.323000 -2|0:1|Bob|2|True|30|5.100000|0.990000 -3|0:2|Carol|1|False|45|5.000000|1.000000 -5|0:3|Dan|2|False|20|4.800000|1.300000 -7|0:4|Elizabeth|1|False|20|4.700000|1.463000 -8|0:5|Farooq|2|True|25|4.500000|1.510000 -9|0:6|Greg|2|False|40|4.900000|1.600000 - - --CASE DatesCopyToParquet 
--SKIP -# Support TIMESTAMP datatype in parquet reader +0|Alice|1|True|35|5.000000|1.731000|1900-01-01|[10,5]|[Aida]|[[10,8],[6,7,8]] +2|Bob|2|True|30|5.100000|0.990000|1900-01-01|[12,8]|[Bobby]|[[8,9],[9,10]] +3|Carol|1|False|45|5.000000|1.000000|1940-06-22|[4,5]|[Carmen,Fred]|[[8,10]] +5|Dan|2|False|20|4.800000|1.300000|1950-07-23|[1,9]|[Wolfeschlegelstein,Daniel]|[[7,4],[8,8],[9]] +7|Elizabeth|1|False|20|4.700000|1.463000|1980-10-26|[2]|[Ein]|[[6],[7],[8]] +8|Farooq|2|True|25|4.500000|1.510000|1980-10-26|[3,4,5,6,7]|[Fesdwe]|[[8]] +9|Greg|2|False|40|4.900000|1.600000|1980-10-26|[1]|[Grad]|[[10]] +10|Hubert Blaine Wolfeschlegelsteinhausenbergerdorff|2|False|83|4.900000|1.323000|1990-11-27|[10,11,12,3,4,5,6,7]|[Ad,De,Hi,Kye,Orlan]|[[7],[10],[6,7]] --STATEMENT COPY (MATCH (p:person) RETURN p.ID, p.birthdate, p.registerTime) TO "${DATABASE_PATH}/dates.parquet"; +-LOG CopyOrganisationToParquet +-STATEMENT COPY (MATCH (o:organisation) RETURN o.ID, o.state) TO "${DATABASE_PATH}/organisation.parquet" ---- ok --STATEMENT CREATE NODE TABLE personCopy(ID INT64, birthdate DATE, registerTime TIMESTAMP, PRIMARY KEY(ID)) +-STATEMENT LOAD FROM "${DATABASE_PATH}/organisation.parquet" RETURN *; +---- 3 +1|{revenue: 138, location: ['toronto', 'montr,eal'], stock: {price: [96,56], volume: 1000}} +4|{revenue: 152, location: ["vanco,uver north area"], stock: {price: [15,78,671], volume: 432}} +6|{revenue: 558, location: ['very long city name', 'new york'], stock: {price: [22], volume: 99}} + +-LOG CopyEmptyListToParquet +-STATEMENT COPY (RETURN null,[], [1,null,3], [[],null,[3,4,5]]) TO "${DATABASE_PATH}/emptyList.parquet" ---- ok --STATEMENT COPY personCopy FROM "${DATABASE_PATH}/dates.parquet" +-STATEMENT LOAD FROM "${DATABASE_PATH}/emptyList.parquet" RETURN *; +---- 1 +|[]|[1,,3]|[[],,[3,4,5]] + +-LOG CopyToParquetFlatUnflat +-STATEMENT COPY (MATCH (p:person)-[:knows]->(p1:person) return p.ID, p1.ID) TO "${DATABASE_PATH}/flatUnflat.parquet" ---- ok --STATEMENT MATCH (p:personCopy) RETURN p.birthdate, p.registerTime ----- 8 -1900-01-01|2008-11-03 15:25:30.000526 -1900-01-01|2011-08-20 11:25:30 -1940-06-22|1911-08-20 02:32:21 -1950-07-23|2031-11-30 12:25:30 -1980-10-26|1972-07-31 13:22:30.678559 -1980-10-26|1976-12-23 04:41:42 -1980-10-26|1976-12-23 11:21:42 -1990-11-27|2023-02-21 13:25:30 +-STATEMENT LOAD FROM "${DATABASE_PATH}/flatUnflat.parquet" RETURN *; +---- 14 +0|2 +0|3 +0|5 +2|0 +2|3 +2|5 +3|0 +3|2 +3|5 +5|0 +5|2 +5|3 +7|8 +7|9