From bc3809dac0e5bd3af4036b140ab1129b8bd22fb9 Mon Sep 17 00:00:00 2001 From: rfdavid Date: Sun, 20 Aug 2023 13:06:38 -0400 Subject: [PATCH] VAR_LIST with INT64 type --- examples/cpp/main.cpp | 20 +- src/binder/bind/bind_copy.cpp | 6 +- src/common/types/types.cpp | 12 + src/include/common/types/types.h | 3 + src/include/common/vector/value_vector.h | 2 + .../operator/copy_to/parquet_writer.h | 64 +- src/processor/map/map_copy.cpp | 16 +- .../operator/copy_to/parquet_writer.cpp | 1155 ++++++++++++----- 8 files changed, 935 insertions(+), 343 deletions(-) diff --git a/examples/cpp/main.cpp b/examples/cpp/main.cpp index 1935d0eed23..427593b248b 100644 --- a/examples/cpp/main.cpp +++ b/examples/cpp/main.cpp @@ -26,7 +26,25 @@ int main() { connection->query("COPY marries FROM '/Users/rfdavid/Devel/waterloo/kuzu/dataset/tinysnb/eMarries.csv';"); - connection->query("COPY (MATCH (p:person) RETURN p.age) TO 'out.parquet';"); + connection->query("COPY (MATCH (p:person) RETURN p.courseScoresPerTerm) TO 'out.parquet';"); + + // connection->query("COPY (RETURN {first: {f: 44}, second: {s: 38}}) TO 'out.parquet';"); +// connection->query("COPY (RETURN {first:[[7],[5]], second:[[4],[3],[2]]}) TO 'out.parquet';"); + +// connection->query("COPY (RETURN [[[1,2],[3,4]]]) TO 'out.parquet';"); +// connection->query("COPY (RETURN [[1,2],[3,4],[5,6,7,8]]) TO 'out.parquet';"); +// connection->query("COPY (RETURN [1,2,3,4]) TO 'out.parquet';"); + + // connection->query("COPY (RETURN [ 1,2,3,4]) TO 'out.parquet';"); + +// connection->query("COPY (RETURN {first:[1,2,3], second:[4,5,6,7]}) TO 'out.parquet';"); + + +// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';"); + +// connection->query("COPY (MATCH (p:person) RETURN p.fName, p.gender, p.isStudent, p.age, p.birthdate, p.registerTime) TO 'out.parquet';"); +// connection->query("COPY (RETURN {first: {f: 'please'}, second: {s: 'work'}}) TO 'out.parquet';"); +// connection->query("COPY (RETURN {first: [[44]], second: [[12]]}) TO 'out.parquet';"); // connection->query("COPY (RETURN {first:[['a'],['b']], second:[['c'],['d'],['e']]}) TO 'out.parquet';"); // connection->query("COPY (MATCH (p:person) RETURN p.workedHours) TO 'dates.parquet';"); diff --git a/src/binder/bind/bind_copy.cpp b/src/binder/bind/bind_copy.cpp index 20440da7661..6bb8bd0581b 100644 --- a/src/binder/bind/bind_copy.cpp +++ b/src/binder/bind/bind_copy.cpp @@ -23,9 +23,9 @@ std::unique_ptr Binder::bindCopyToClause(const Statement& statem auto columnName = column->hasAlias() ? 
column->getAlias() : column->toString(); columnNames.push_back(columnName); columnTypes.push_back(column->getDataType()); -// auto d = column->getDataType(); -// auto e = VarListType::getChildType(&d); -// auto f = "ok"; + // auto d = column->getDataType(); + // auto e = VarListType::getChildType(&d); + // auto f = "ok"; } if (fileType != CopyDescription::FileType::CSV && fileType != CopyDescription::FileType::PARQUET) { diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index 22ed87eb82e..575bb3cd11c 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -605,6 +605,18 @@ bool LogicalTypeUtils::isNumerical(const LogicalType& dataType) { } } +// TODO (Rui): review the datatypes +bool LogicalTypeUtils::isPrimitive(const LogicalType& dataType) { + switch (dataType.typeID) { + case LogicalTypeID::STRUCT: + case LogicalTypeID::VAR_LIST: + case LogicalTypeID::FIXED_LIST: + return false; + default: + return true; + } +} + std::vector LogicalTypeUtils::getAllValidComparableLogicalTypes() { return std::vector{LogicalType{LogicalTypeID::BOOL}, LogicalType{LogicalTypeID::INT64}, LogicalType{LogicalTypeID::INT32}, diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 8f1bfb00b1a..11138fa8332 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -274,6 +274,8 @@ class LogicalType { static std::vector> copy( const std::vector>& types); + static const bool isPrimitiveDataType(); + private: void setPhysicalType(); @@ -421,6 +423,7 @@ class LogicalTypeUtils { KUZU_API static LogicalType dataTypeFromString(const std::string& dataTypeString); static uint32_t getRowLayoutSize(const LogicalType& logicalType); static bool isNumerical(const LogicalType& dataType); + static bool isPrimitive(const LogicalType& dataType); static std::vector getAllValidComparableLogicalTypes(); static std::vector getNumericalLogicalTypeIDs(); static std::vector getAllValidLogicTypes(); diff --git a/src/include/common/vector/value_vector.h b/src/include/common/vector/value_vector.h index 45cb8f4b4ba..7a4571efcdd 100644 --- a/src/include/common/vector/value_vector.h +++ b/src/include/common/vector/value_vector.h @@ -81,6 +81,8 @@ class ValueVector { void resetAuxiliaryBuffer(); + inline bool isPrimitiveDataType() { return LogicalTypeUtils::isPrimitive(dataType); } + // If there is still non-null values after discarding, return true. Otherwise, return false. // For an unflat vector, its selection vector is also updated to the resultSelVector. 
static bool discardNull(ValueVector& vector); diff --git a/src/include/processor/operator/copy_to/parquet_writer.h b/src/include/processor/operator/copy_to/parquet_writer.h index 1c2dd03d452..1e588deb890 100644 --- a/src/include/processor/operator/copy_to/parquet_writer.h +++ b/src/include/processor/operator/copy_to/parquet_writer.h @@ -6,7 +6,6 @@ #include #include - namespace kuzu { namespace processor { @@ -20,37 +19,84 @@ class ParquetWriter : public CSVParquetWriter { void writeValues(std::vector& outputVectors) override; private: - static std::shared_ptr kuzuTypeToParquetType( - std::string& columnName, const common::LogicalType& logicalType, int length = -1); +// struct ParquetValue { +// common::LogicalTypeID logicalTypeID; +// int16_t definitionLevel; +// int16_t repetitionLevel; +// uint8_t* value; +// }; +// + static std::shared_ptr kuzuTypeToParquetType(std::string& columnName, + const common::LogicalType& logicalType, + parquet::Repetition::type repetition = parquet::Repetition::REQUIRED, int length = -1); void* getValueToWrite(common::ValueVector* valueVector, uint32_t selPos); void writeValue(common::LogicalTypeID type, void* value); void flush(); + void generateSchema(); - std::shared_ptr generateTable(); void writeParquetFile(const arrow::Table& table); std::shared_ptr schema; std::shared_ptr getParquetType(const common::LogicalType& logicalType); - std::shared_ptr buildParquetArray(common::ValueVector* vector); std::vector> data; std::vector> builders; - std::unique_ptr fileWriter; + std::shared_ptr fileWriter; + parquet::RowGroupWriter* rowWriter; + std::shared_ptr outFile; std::shared_ptr getBuilder(const common::LogicalType& logicalType); - void pushToBuilder(std::shared_ptr& builder, common::ValueVector* vector); +// void writeBatch(ParquetValue& parquetValue); + void writeParquetBatch(common::ValueVector* vector); +// void createBatch(std::vector& parquetValues, uint16_t& outSize, +// std::vector& outDefinitionLevels, std::vector& outRepetitionLevels, +// std::vector& outValues); +// + // void pushToBuilder(std::shared_ptr& builder, common::ValueVector* + // vector); - // TODO (Rui) : check what is necessary, lots of unused variables +// ParquetValue getParquetValue( +// common::ValueVector* vector, int16_t repetitionLevel = 0, int16_t definitionLevel = 0); +// std::vector getParquetValues( +// common::ValueVector* vector, int16_t repetitionLevel = 0, int16_t definitionLevel = 0); +// std::shared_ptr outputStream; - parquet::RowGroupWriter* rowWriter; + +// template +// void extractValues(const common::ValueVector* vector, common::list_entry_t list, std::vector& result); + + + template + struct ParquetBatch { + std::vector values; + std::vector repetitionLevels; + std::vector definitionLevels; + }; + + template + void castValueToVector(const common::LogicalType& dataType, uint8_t* value, common::ValueVector* vector, ParquetBatch& parquetBatch, int currentElementIndex=0, int parentElementIndex=0, int depth=0); + + template + void extractList(const common::list_entry_t list, const common::ValueVector* vector, ParquetBatch& parquetBatch, int currentElementIndex=0, int parentElementIndex=0, int depth=0); + + template + void extractStruct(const common::ValueVector* vector, ParquetBatch& parquetBatch); + + + int getRepetitionLevel(int currentElementIndex, int parentElementIndex, int depth); + + // std::vector getValues(common::ValueVector* vector, common::list_entry_t list); + + // std::shared_ptr buildParquetArray(common::ValueVector* vector); + // std::shared_ptr 
generateTable(); }; } // namespace processor diff --git a/src/processor/map/map_copy.cpp b/src/processor/map/map_copy.cpp index 131dc04aacb..17c3b7ef315 100644 --- a/src/processor/map/map_copy.cpp +++ b/src/processor/map/map_copy.cpp @@ -38,26 +38,14 @@ std::unique_ptr PlanMapper::mapCopyTo(LogicalOperator* logical } auto sharedState = std::make_shared(copy->getCopyDescription().fileType); - // return std::make_unique(std::make_unique(childSchema), - // sharedState, - // std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(), - // copy->getExpressionsForPrinting(), std::move(prevOperator)); - auto copyTo = std::make_unique(std::make_unique(childSchema), sharedState, std::move(vectorsToCopyPos), copy->getCopyDescription(), getOperatorID(), copy->getExpressionsForPrinting(), std::move(prevOperator)); - std::shared_ptr fTable; - auto ftTableSchema = std::make_unique(); - // ftTableSchema->appendColumn( - // std::make_unique(false /* flat */, 0 /* dataChunkPos */, - // common::LogicalTypeUtils::getRowLayoutSize(common::LogicalType{common::LogicalTypeID::STRING}))); - fTable = std::make_shared(memoryManager, std::move(ftTableSchema)); - - return createFactorizedTableScan( - binder::expression_vector{}, childSchema, fTable, std::move(copyTo)); + return createFactorizedTableScan(binder::expression_vector{}, std::vector{}, + childSchema, fTable, 0, std::move(copyTo)); } std::unique_ptr PlanMapper::mapCopyNode( diff --git a/src/processor/operator/copy_to/parquet_writer.cpp b/src/processor/operator/copy_to/parquet_writer.cpp index 7c8b3b0d69a..e8689ee7cf9 100644 --- a/src/processor/operator/copy_to/parquet_writer.cpp +++ b/src/processor/operator/copy_to/parquet_writer.cpp @@ -4,15 +4,12 @@ #include // TODO (Rui): remove this #include -#include "arrow/array/builder_primitive.h" #include "arrow/array/builder_binary.h" -#include "arrow/table.h" - +#include "arrow/array/builder_primitive.h" #include "arrow/io/api.h" +#include "arrow/table.h" #include "parquet/arrow/writer.h" - - using namespace kuzu::common; namespace kuzu { @@ -26,242 +23,620 @@ void ParquetWriter::open(const std::string& filePath) { outFile = *result; } -std::shared_ptr ParquetWriter::getBuilder(const LogicalType& logicalType) { - arrow::MemoryPool* pool = arrow::default_memory_pool(); - switch(logicalType.getLogicalTypeID()) { - case LogicalTypeID::STRUCT: { - } break; - - case LogicalTypeID::VAR_LIST: { - auto childType = *VarListType::getChildType(&logicalType); - // return std::make_shared(pool, getBuilder(childType)); - //return std::make_shared(pool, std::make_shared(pool)); - - return std::static_pointer_cast( - std::make_shared(pool, std::make_shared(pool)) - ); - - } break; - - case LogicalTypeID::INT64: { - return std::make_shared(pool); - } - } -} - -// For schema creation -std::shared_ptr ParquetWriter::getParquetType(const LogicalType& logicalType) { - switch(logicalType.getLogicalTypeID()) { +std::shared_ptr ParquetWriter::kuzuTypeToParquetType(std::string& columnName, + const LogicalType& logicalType, parquet::Repetition::type repetition, int length) { + parquet::Type::type parquetType; + parquet::ConvertedType::type convertedType; + switch (logicalType.getLogicalTypeID()) { case LogicalTypeID::BOOL: - return arrow::boolean(); - case LogicalTypeID::INT64: - return arrow::int64(); + parquetType = parquet::Type::BOOLEAN; + convertedType = parquet::ConvertedType::NONE; + break; case LogicalTypeID::STRING: - return arrow::utf8(); - case LogicalTypeID::VAR_LIST: - return 
arrow::list(getParquetType(*VarListType::getChildType(&logicalType))); + parquetType = parquet::Type::BYTE_ARRAY; + convertedType = parquet::ConvertedType::UTF8; + break; + case LogicalTypeID::INT64: + parquetType = parquet::Type::INT64; + convertedType = parquet::ConvertedType::INT_64; + break; + case LogicalTypeID::INT16: + parquetType = parquet::Type::INT32; + convertedType = parquet::ConvertedType::INT_16; + break; + case LogicalTypeID::INT32: + parquetType = parquet::Type::INT32; + convertedType = parquet::ConvertedType::INT_32; + break; + case LogicalTypeID::FLOAT: + parquetType = parquet::Type::FLOAT; + convertedType = parquet::ConvertedType::NONE; + break; + case LogicalTypeID::DOUBLE: + parquetType = parquet::Type::DOUBLE; + convertedType = parquet::ConvertedType::NONE; + break; + case LogicalTypeID::DATE: + parquetType = parquet::Type::INT32; + convertedType = parquet::ConvertedType::DATE; + break; + case LogicalTypeID::TIMESTAMP: + parquetType = parquet::Type::INT64; + convertedType = parquet::ConvertedType::TIMESTAMP_MICROS; + break; + case LogicalTypeID::INTERVAL: + parquetType = parquet::Type::FIXED_LEN_BYTE_ARRAY; + convertedType = parquet::ConvertedType::INTERVAL; + length = 12; + break; + case LogicalTypeID::INTERNAL_ID: { + // ... + } break; case LogicalTypeID::STRUCT: { auto structType = StructType::getFieldTypes(&logicalType); auto structNames = StructType::getFieldNames(&logicalType); - std::vector> fields; + std::vector> nodes; for (auto i = 0u; i < structType.size(); ++i) { - fields.push_back(arrow::field(structNames[i], getParquetType(*structType[i]))); + nodes.push_back(kuzuTypeToParquetType(structNames[i], *structType[i])); } - return arrow::struct_(fields); - } - } -} + auto groupNode = std::static_pointer_cast( + parquet::schema::GroupNode::Make(columnName, parquet::Repetition::REQUIRED, nodes)); + return groupNode; + } break; + case LogicalTypeID::FIXED_LIST: + case LogicalTypeID::VAR_LIST: { -void ParquetWriter::generateSchema() { - std::vector> fields; - for (auto i = 0u; i < getColumnNames().size(); ++i) { - fields.push_back(arrow::field(getColumnNames()[i], getParquetType(getColumnTypes()[i]))); - // TODO (Rui): move to a specific function - builders.push_back(getBuilder(getColumnTypes()[i])); - } - schema = arrow::schema(fields); -} + auto childLogicalType = VarListType::getChildType(&logicalType); -void ParquetWriter::pushToBuilder(std::shared_ptr& builder, common::ValueVector* vector) { - auto selPos = vector->state->selVector->selectedPositions[0]; + // Recursion - create node for the child first. + std::string childName = "child"; + auto childNode = kuzuTypeToParquetType(childName, *childLogicalType, parquet::Repetition::OPTIONAL, length); - switch (vector->dataType.getLogicalTypeID()) { - case LogicalTypeID::STRUCT: { - } break; + auto repeated_group = parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, {childNode}); + auto optional = parquet::schema::GroupNode::Make("list", parquet::Repetition::OPTIONAL, {repeated_group}, parquet::LogicalType::List()); + return optional; - case LogicalTypeID::VAR_LIST: - break; + if (LogicalTypeUtils::isPrimitive(*childLogicalType)) { + // If child is primitive, return the childNode directly without wrapping it. + return childNode; + } else { + // Otherwise, wrap the childNode inside a group node. 
+ auto repeated_group = parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, {childNode}); + auto optional = parquet::schema::GroupNode::Make("list", parquet::Repetition::OPTIONAL, {repeated_group}, parquet::LogicalType::List()); + return optional; + } - case LogicalTypeID::INT64: { - auto& int64Builder = static_cast(*builder); - auto value = vector->getValue(selPos); - PARQUET_THROW_NOT_OK(int64Builder.Append(value)); - } - } -} -void ParquetWriter::writeValues(std::vector& outputVectors) { - if (outputVectors.size() == 0) { - return; - } - for (auto i = 0u; i < outputVectors.size(); i++) { - assert(outputVectors[i]->state->isFlat()); - pushToBuilder(builders[i], outputVectors[i]); +// // [[1,2], [3,4]] +// auto childLogicalType = VarListType::getChildType(&logicalType); +// auto node = kuzuTypeToParquetType( +// columnName, *childLogicalType, parquet::Repetition::REPEATED, length); +// +// if (LogicalTypeUtils::isPrimitive(*childLogicalType)) { +// std::cout << "- Primitive" << std::endl; +// return node; +// } else { +// std::cout << "- Group" << std::endl; +// auto groupNode = std::static_pointer_cast( +// parquet::schema::GroupNode::Make( +// columnName, parquet::Repetition::REPEATED, {node})); +// return groupNode; +// } +// + } break; + default: + throw RuntimeException("Unsupported type for Parquet datatype"); } - flush(); + return parquet::schema::PrimitiveNode::Make( + columnName, repetition, parquetType, convertedType, length); } -void ParquetWriter::flush() { - for (auto builder : builders) { - std::shared_ptr array; - PARQUET_THROW_NOT_OK(builder->Finish(&array)); - data.push_back(array); - } - auto table = arrow::Table::Make(schema, data); - writeParquetFile(*table); - data.clear(); -} -void ParquetWriter::closeFile() { - fileWriter->Close(); - outFile->Close(); -} +// Parquet Physical Types: +// BOOLEAN: 1 bit boolean +// INT32: 32 bit signed ints +// INT64: 64 bit signed ints +// INT96: 96 bit signed ints +// FLOAT: IEEE 32-bit floating point values +// DOUBLE: IEEE 64-bit floating point values +// BYTE_ARRAY: arbitrarily long byte arrays. 
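// Illustrative sketch (not part of this patch): what the VAR_LIST branch of
// kuzuTypeToParquetType() above produces for a single-level INT64[] column, written
// directly against the parquet-cpp schema API. The node names "element" and "col" are
// placeholders; the optional-(LIST) group wrapping a repeated group wrapping the leaf
// follows the three-level list encoding from the parquet LogicalTypes spec.
// parquet::schema::PrintSchema is the same dump helper init() below calls.
#include <iostream>
#include <memory>

#include <parquet/printer.h>
#include <parquet/schema.h>
#include <parquet/types.h>

int main() {
    using parquet::schema::GroupNode;
    using parquet::schema::PrimitiveNode;
    auto leaf = PrimitiveNode::Make("element", parquet::Repetition::OPTIONAL,
        parquet::Type::INT64, parquet::ConvertedType::INT_64);
    auto repeated = GroupNode::Make("list", parquet::Repetition::REPEATED, {leaf});
    auto column = GroupNode::Make(
        "col", parquet::Repetition::OPTIONAL, {repeated}, parquet::LogicalType::List());
    auto schema = std::static_pointer_cast<GroupNode>(
        GroupNode::Make("schema", parquet::Repetition::REQUIRED, {column}));
    // Expected layout, in the same notation as the schema dumps in the comments below:
    // required group schema {
    //   optional group col (List) {
    //     repeated group list {
    //       optional int64 element (Int(bitWidth=64, isSigned=true));
    //     }
    //   }
    // }
    parquet::schema::PrintSchema(schema.get(), std::cout);
    return 0;
}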
+// https://github.com/apache/parquet-cpp/blob/master/src/parquet/column_writer.h +void ParquetWriter::init() { -void ParquetWriter::writeParquetFile(const arrow::Table& table) { - // TODO (Rui) : check if you're gonna use PARQUET_THROW_NOT_OK and also the - // row number - PARQUET_THROW_NOT_OK(fileWriter->WriteTable(table, 3)); -} + parquet::schema::NodeVector fields; + for (auto i = 0u; i < getColumnNames().size(); ++i) { + fields.push_back(kuzuTypeToParquetType(getColumnNames()[i], getColumnTypes()[i])); + } + auto schema = std::static_pointer_cast( + parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); -void ParquetWriter::init() { - generateSchema(); - // Choose compression - std::shared_ptr props = - parquet::WriterProperties::Builder().compression(arrow::Compression::SNAPPY)->build(); - // Opt to store Arrow schema for easier reads back into Arrow - std::shared_ptr arrowProps = - parquet::ArrowWriterProperties::Builder().store_schema()->build(); - auto result = parquet::arrow::FileWriter::Open(*schema, - arrow::default_memory_pool(), outFile, - props, arrowProps); +// auto inner_field = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::REPEATED, parquet::Type::INT64); +// auto list_group = parquet::schema::GroupNode::Make("values", parquet::Repetition::REQUIRED, {inner_field}); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {list_group})); +// +// auto value_field = parquet::schema::PrimitiveNode::Make("value", parquet::Repetition::REPEATED, parquet::Type::INT64); +// auto inner_list = parquet::schema::GroupNode::Make("inner_list", parquet::Repetition::REPEATED, {value_field}); +// auto outer_list = parquet::schema::GroupNode::Make("outer_list", parquet::Repetition::REQUIRED, {inner_list}); +// auto schema = std::static_pointer_cast(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {outer_list})); + + +// auto value_field = parquet::schema::PrimitiveNode::Make("value", parquet::Repetition::REPEATED, parquet::Type::INT64); +// auto list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {value_field}); +// auto schema = std::static_pointer_cast(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {list_group})); + + // kind of work +// auto int_node = parquet::schema::PrimitiveNode::Make("", parquet::Repetition::REPEATED, parquet::Type::INT64); +// auto inner_list = parquet::schema::GroupNode::Make("inner_list", parquet::Repetition::REPEATED, {int_node}); +// auto schema = std::static_pointer_cast(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {inner_list})); + + +// auto inner_list = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, +// {parquet::schema::PrimitiveNode::Make("element", parquet::Repetition::REQUIRED, parquet::Type::INT64)}); +// auto schema = std::static_pointer_cast(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {inner_list})); + +// auto inner_element = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::REQUIRED, parquet::Type::INT64); +// auto inner_list = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {inner_element}); +// auto element = parquet::schema::GroupNode::Make("element", parquet::Repetition::REQUIRED, {inner_list}, parquet::ConvertedType::LIST); +// auto list = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {element}); +// 
auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {list})); + +// auto int64_node = parquet::schema::PrimitiveNode::Make("element", parquet::Repetition::OPTIONAL, parquet::Type::INT64, parquet::ConvertedType::NONE); +// auto inner_list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {int64_node}); +// auto inner_element_group = parquet::schema::GroupNode::Make("element", parquet::Repetition::OPTIONAL, {inner_list_group}); +// auto outer_list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {inner_element_group}); +// auto root_node = parquet::schema::GroupNode::Make("my_list_of_lists", parquet::Repetition::OPTIONAL, {outer_list_group}, parquet::ConvertedType::LIST); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {root_node})); + +// auto inner_item = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::OPTIONAL, parquet::Type::INT64); +// auto outer_list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {inner_item}); +// auto root = parquet::schema::GroupNode::Make("column9", parquet::Repetition::OPTIONAL, {outer_list_group}, parquet::ConvertedType::LIST); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {root})); + +// Working as expected: +// auto inner_item = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::OPTIONAL, parquet::Type::INT64); +// auto inner_list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {inner_item}); +// auto outer_item_group = parquet::schema::GroupNode::Make("item", parquet::Repetition::OPTIONAL, {inner_list_group}, parquet::ConvertedType::LIST); +// auto outer_list_group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {outer_item_group}); +// auto root = parquet::schema::GroupNode::Make("column9", parquet::Repetition::OPTIONAL, {outer_list_group}, parquet::ConvertedType::LIST); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {root})); + + +// trying +// auto inner_field = parquet::schema::PrimitiveNode::Make("value", parquet::Repetition::REPEATED, parquet::Type::INT64); +// auto list_group = parquet::schema::GroupNode::Make("item", parquet::Repetition::OPTIONAL, {inner_field}, parquet::ConvertedType::LIST); +// auto group = parquet::schema::GroupNode::Make("list", parquet::Repetition::REPEATED, {list_group}); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {group})); + +// int64_t values[] = {1,2,3,4}; +// int16_t definition_levels[] = {5,5,5,5}; +// int16_t repetition_levels[] = {0,2,1,2}; +// - // TODO (Rui) : check value or die - fileWriter = std::move(result.ValueOrDie()); +// required group field_id=-1 schema { 0 +// optional group field_id=-1 column9 (List) { 1 +// +// repeated group field_id=-1 list { 2 +// +// optional group field_id=-1 item (List) { 3 +// repeated group field_id=-1 list { 4 +// optional int64 field_id=-1 item; 5 +// } +// } +// +// } +// +// } +// } + + +// A list is represented by: +// required group field_id=-1 schema { +// optional group field_id=-1 list (List) { +// repeated group field_id=-1 { +// optional int64 field_id=-1 item; +// } +// } +// } +// +// a double list is: +// +// required group field_id=-1 schema { 
+// optional group field_id=-1 list (List) { +// repeated group field_id=-1 { +// optional group field_id=-1 (List) { +// repeated group field_id=-1 { +// optional int64 field_id=-1 item; +// } +// } +// } +// } +// } + + + // single list +// auto item = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::OPTIONAL, parquet::Type::INT64); +// auto repeated_group = parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, {item}); +// auto list = parquet::schema::GroupNode::Make("list", parquet::Repetition::OPTIONAL, {repeated_group}, parquet::LogicalType::List()); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {list})); + + // 2-level list +// auto item = parquet::schema::PrimitiveNode::Make("item", parquet::Repetition::OPTIONAL, parquet::Type::INT64); // common +// // encapsulate optional + repeaated +// auto repeated = parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, {item}); +// auto optional = parquet::schema::GroupNode::Make("", parquet::Repetition::OPTIONAL, {repeated}, parquet::LogicalType::List()); +// +// auto repeated_group = parquet::schema::GroupNode::Make("", parquet::Repetition::REPEATED, {optional}); +// auto list = parquet::schema::GroupNode::Make("list", parquet::Repetition::OPTIONAL, {repeated_group}, parquet::LogicalType::List()); +// auto schema = std::static_pointer_cast( parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, {list})); + parquet::schema::PrintSchema(schema.get(),std::cout); - // field can be either a Node or a GroupNode - // https://github.com/apache/parquet-cpp/blob/master/src/parquet/schema.h -// std::vector> fields; -// for (auto i = 0u; i < getColumnNames().size(); ++i) { -// fields.push_back(kuzuTypeToParquetType(getColumnNames()[i], getColumnTypes()[i])); -// } -// auto schema = std::static_pointer_cast( -// parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields)); -// -// parquet::WriterProperties::Builder builder; -// builder.compression(parquet::Compression::SNAPPY); -// std::shared_ptr props = builder.build(); -// fileWriter = parquet::ParquetFileWriter::Open(outFile, schema, props); + parquet::WriterProperties::Builder builder; + builder.compression(parquet::Compression::SNAPPY); + std::shared_ptr props = builder.build(); + fileWriter = parquet::ParquetFileWriter::Open(outFile, schema, props); } +void ParquetWriter::writeValues(std::vector& outputVectors) { + if (outputVectors.size() == 0) { + return; + } + rowWriter = fileWriter->AppendRowGroup(); + for (auto i = 0u; i < outputVectors.size(); i++) { + assert(outputVectors[i]->state->isFlat()); + writeParquetBatch(outputVectors[i]); + } +} +void ParquetWriter::writeParquetBatch(ValueVector* vector) { + auto selPos = vector->state->selVector->selectedPositions[0]; + switch (vector->dataType.getLogicalTypeID()) { + case LogicalTypeID::BOOL: { + parquet::BoolWriter* boolWriter = + static_cast(rowWriter->NextColumn()); + bool value = vector->getValue(selPos); + std::cout << "Write bool: " << value << std::endl; + boolWriter->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::INT16: { + std::cout << "Write int16" << std::endl; + parquet::Int32Writer* int32Writer = + static_cast(rowWriter->NextColumn()); + auto value = static_cast(vector->getValue(selPos)); + int32Writer->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::INT32: { + std::cout << "Write int32" << std::endl; + 
parquet::Int32Writer* int32Writer = + static_cast(rowWriter->NextColumn()); + auto value = vector->getValue(selPos); + int32Writer->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::INT64: { + std::cout << "Write int64" << std::endl; + parquet::Int64Writer* int64Writer = + static_cast(rowWriter->NextColumn()); + auto value = vector->getValue(selPos); + int64Writer->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::DOUBLE: { + std::cout << "Write double" << std::endl; + parquet::DoubleWriter* doubleWriter = + static_cast(rowWriter->NextColumn()); + auto value = vector->getValue(selPos); + doubleWriter->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::FLOAT: { + std::cout << "Write float" << std::endl; + parquet::FloatWriter* floatWriter = + static_cast(rowWriter->NextColumn()); + auto value = vector->getValue(selPos); + floatWriter->WriteBatch(1, nullptr, nullptr, &value); + } break; + case LogicalTypeID::DATE: { + std::cout << "Write date" << std::endl; + parquet::Int32Writer* int32Writer = + static_cast(rowWriter->NextColumn()); + auto date = vector->getValue(selPos); + int32Writer->WriteBatch(1, nullptr, nullptr, &date.days); + } break; + case LogicalTypeID::TIMESTAMP: { + std::cout << "Write timestamp" << std::endl; + parquet::Int64Writer* int64Writer = + static_cast(rowWriter->NextColumn()); + auto timestamp = vector->getValue(selPos); + std::cout << "Value: " << timestamp.value << std::endl; + int64Writer->WriteBatch(1, nullptr, nullptr, ×tamp.value); + } break; + case LogicalTypeID::INTERVAL: { + // ... + } break; + case LogicalTypeID::STRING: { + auto valueRead = vector->getValue(selPos); + parquet::ByteArrayWriter* baWriter = + static_cast(rowWriter->NextColumn()); + parquet::ByteArray valueToWrite; + valueToWrite.ptr = &valueRead.getData()[0]; + valueToWrite.len = valueRead.len; + baWriter->WriteBatch(1, nullptr, nullptr, &valueToWrite); + } break; + case LogicalTypeID::INTERNAL_ID: { + // ... + } break; + case LogicalTypeID::FIXED_LIST: { + // ... + } break; + case LogicalTypeID::STRUCT: { + // std::vector values; +// extractStruct(vector, values); + // auto fields = StructType::getFields(&vector->dataType); -// Parquet Physical Types: -// BOOLEAN: 1 bit boolean -// INT32: 32 bit signed ints -// INT64: 64 bit signed ints -// INT96: 96 bit signed ints -// FLOAT: IEEE 32-bit floating point values -// DOUBLE: IEEE 64-bit floating point values -// BYTE_ARRAY: arbitrarily long byte arrays. 
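// Illustrative sketch (not part of this patch): the flat-column path used by
// writeParquetBatch() above, reduced to a standalone program. Two REQUIRED leaves
// (an INT64 and a UTF8 BYTE_ARRAY) are written one row group per row, with def/rep
// levels passed as nullptr -- that is only valid because REQUIRED, non-nested columns
// have max definition and repetition level 0. The file name and column names are
// placeholders.
#include <cstdint>
#include <memory>
#include <string>

#include <arrow/io/file.h>
#include <parquet/column_writer.h>
#include <parquet/exception.h>
#include <parquet/file_writer.h>
#include <parquet/schema.h>
#include <parquet/types.h>

int main() {
    using parquet::schema::GroupNode;
    using parquet::schema::PrimitiveNode;
    parquet::schema::NodeVector fields;
    fields.push_back(PrimitiveNode::Make("age", parquet::Repetition::REQUIRED,
        parquet::Type::INT64, parquet::ConvertedType::INT_64));
    fields.push_back(PrimitiveNode::Make("name", parquet::Repetition::REQUIRED,
        parquet::Type::BYTE_ARRAY, parquet::ConvertedType::UTF8));
    auto schema = std::static_pointer_cast<GroupNode>(
        GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

    std::shared_ptr<arrow::io::FileOutputStream> outFile;
    PARQUET_ASSIGN_OR_THROW(outFile, arrow::io::FileOutputStream::Open("out_flat.parquet"));
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);
    auto fileWriter = parquet::ParquetFileWriter::Open(outFile, schema, builder.build());

    // One row group per row, mirroring writeValues(); columns must be visited in
    // schema order via NextColumn().
    const int64_t ages[] = {35, 30};
    const std::string names[] = {"Alice", "Bob"};
    for (int row = 0; row < 2; ++row) {
        auto* rowGroup = fileWriter->AppendRowGroup();
        auto* int64Writer = static_cast<parquet::Int64Writer*>(rowGroup->NextColumn());
        int64Writer->WriteBatch(1, nullptr, nullptr, &ages[row]);
        auto* byteArrayWriter = static_cast<parquet::ByteArrayWriter*>(rowGroup->NextColumn());
        parquet::ByteArray value(static_cast<uint32_t>(names[row].size()),
            reinterpret_cast<const uint8_t*>(names[row].data()));
        byteArrayWriter->WriteBatch(1, nullptr, nullptr, &value);
    }
    fileWriter->Close();
    PARQUET_THROW_NOT_OK(outFile->Close());
    return 0;
}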
-// https://github.com/apache/parquet-cpp/blob/master/src/parquet/column_writer.h -/* - -std::shared_ptr ParquetWriter::generateTable() { - arrow::Int64Builder i64builder; - PARQUET_THROW_NOT_OK(i64builder.AppendValues({1, 2, 3, 4, 5})); - std::shared_ptr i64array; - PARQUET_THROW_NOT_OK(i64builder.Finish(&i64array)); +// for (auto i = 0u; i < fields.size(); ++i) { +// auto fieldVector = StructVector::getFieldVector(vector, i); +// writeParquetBatch(fieldVector.get()); +// } + } break; + case LogicalTypeID::VAR_LIST: { + std::cout << "Write list" << std::endl; - arrow::StringBuilder strbuilder; - PARQUET_THROW_NOT_OK(strbuilder.Append("some")); - PARQUET_THROW_NOT_OK(strbuilder.Append("string")); - PARQUET_THROW_NOT_OK(strbuilder.Append("content")); - PARQUET_THROW_NOT_OK(strbuilder.Append("in")); - PARQUET_THROW_NOT_OK(strbuilder.Append("rows")); - std::shared_ptr strarray; - PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); + auto list = vector->getValue(selPos); + ParquetBatch batch; + extractList(list, vector, batch); - std::shared_ptr schema = arrow::schema( - {arrow::field("int", arrow::int64()), arrow::field("str", arrow::utf8())}); +// batch.values = {1,2,3,4}; +// - return arrow::Table::Make(schema, {i64array, strarray}); -} +// [1,2,3,4] +// batch.definitionLevels = {3,3,3,3}; +// batch.repetitionLevels = {0,1,1,1}; + +// [[1,2],[3,4]] +// batch.definitionLevels = {5,5,5,5}; +// batch.repetitionLevels = {0,2,1,2}; + +// [[1,2,3,4]] +// required group field_id=-1 schema { 0 +// optional group field_id=-1 list (List) { 1 +// repeated group field_id=-1 { 2 +// optional group field_id=-1 list (List) { 3 +// repeated group field_id=-1 { 4 +// optional int64 field_id=-1 child (Int(bitWidth=64, isSigned=true)); +// } +// } +// } +// } +// } +// batch.definitionLevels = {5,5,5,5}; +// batch.repetitionLevels = {0,2,2,2}; + +// [[[1,2,3,4]]] +// required group field_id=-1 schema { +// optional group field_id=-1 list (List) { +// repeated group field_id=-1 { +// optional group field_id=-1 list (List) { +// repeated group field_id=-1 { +// optional group field_id=-1 list (List) { +// repeated group field_id=-1 { +// optional int64 field_id=-1 child (Int(bitWidth=64, isSigned=true)); +// } +// } +// } +// } +// } +// } +// } +// batch.definitionLevels = {7,7,7,7}; +// batch.repetitionLevels = {0,3,3,3}; + +// [[[1,2],[3,4]]] +// batch.definitionLevels = {7,7,7,7}; +// batch.repetitionLevels = {0,3,1,3}; + + parquet::Int64Writer* colWriterInt = + static_cast(rowWriter->NextColumn()); + + colWriterInt->WriteBatch( + batch.definitionLevels.size(), batch.definitionLevels.data(), batch.repetitionLevels.data(), batch.values.data()); + + +// working with that nested list +// int64_t values[] = {1,2,3,4}; +// int16_t definition_levels[] = {5,5,5,5}; +// int16_t repetition_levels[] = {0,2,1,2}; + +// int64_t values[] = {1,2,3,4}; + + // single list +// int16_t definition_levels[] = {3,3,3,3}; +// int16_t repetition_levels[] = {0,3,3,3}; + + // int16_t definition_levels[] = {5,5,5,5}; + // int16_t repetition_levels[] = {0,2,2,2}; + + // parquet::Int64Writer* col_writer = static_cast(rowWriter->NextColumn()); + // col_writer->WriteBatch(4, definition_levels, repetition_levels, values); + + + +// auto values = ListVector::getListValues(vector, list); +// auto childType = VarListType::getChildType(&vector->dataType); +// if (LogicalTypeUtils::isPrimitive(*childType)) { +// std::vector intValues(list.size); +// std::vector repetitionLevels(list.size, 1); +// std::vector definitionLevels(list.size, 
1); +// repetitionLevels[0] = 0; +// +// for (auto i = 0u; i < list.size; ++i) { +// intValues[i] = *reinterpret_cast(values); +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); +// } +// parquet::Int64Writer* colWriterInt = +// static_cast(rowWriter->NextColumn()); +// colWriterInt->WriteBatch( +// list.size, definitionLevels.data(), repetitionLevels.data(), intValues.data()); +// } -void* ParquetWriter::getValueToWrite(common::ValueVector* valueVector, uint32_t selPos) { - switch (valueVector->dataType.getLogicalTypeID()) { - case LogicalTypeID::BOOL: - return &valueVector->getValue(selPos); - case LogicalTypeID::INT16: - return &valueVector->getValue(selPos); - case LogicalTypeID::INT32: - return &valueVector->getValue(selPos); - case LogicalTypeID::INT64: - return &valueVector->getValue(selPos); - case LogicalTypeID::FLOAT: - return &valueVector->getValue(selPos); - case LogicalTypeID::DOUBLE: - return &valueVector->getValue(selPos); - case LogicalTypeID::DATE: - return &valueVector->getValue(selPos); - case LogicalTypeID::TIMESTAMP: - return &valueVector->getValue(selPos); - case LogicalTypeID::INTERVAL: - return &valueVector->getValue(selPos); - case LogicalTypeID::STRING: - return &valueVector->getValue(selPos); + } break; default: - throw RuntimeException("Unsupported type for Parquet datatype"); + NotImplementedException("ParquetWriter::writeValue"); } } -void ParquetWriter::writeList(LogicalTypeID type, void* value) { +// List example: +// +// [[[1,2,3],[4,5,6]],[[7],[8,9],[10,11]]] (values) +// 0,2,2 1,2,2 0 1,2 1,2 (repetition levels) +// +// currentElementIndex +// 0,1,2, 0,1,2 0 0,1 0,1 +// parentElementIndex +// 0,0,0 1,1,1 0 1,1 2,2 +// depth +// 2,2,2 2,2,2 2 2,2 2,2 +// int ParquetWriter::getRepetitionLevel(int currentElementIndex, int parentElementIndex, int depth) { +// if (currentElementIndex == 0) { +// return (parentElementIndex == 0) ? 0 : depth - 1; +// } +// if (depth == 0) { +// return 1; +// } +// return depth; +// } + + +// [1,2,3,4] +// 0,1,1,1 +// +// [[1,2],[3,4]] +// 0,2, 1,2 +// +// [[[1,2],[3,4]]] +// 0,3 1,3 +int ParquetWriter::getRepetitionLevel(int currentElementIndex, int parentElementIndex, int depth) { + if (currentElementIndex == 0) { + return (parentElementIndex == 0) ? 
0 : 1;
+    }
+    if (depth == 0) {
+        return 1;
+    }
+    return depth + 1;
+}
+
+template<typename T>
+void ParquetWriter::castValueToVector(const LogicalType& dataType, uint8_t* value,
+    ValueVector* vector, ParquetBatch<T>& parquetBatch, int currentElementIndex,
+    int parentElementIndex, int depth) {
+    auto valueVector = reinterpret_cast<ValueVector*>(vector);
+    switch (dataType.getLogicalTypeID()) {
+    case LogicalTypeID::INT64: {
+        int repetitionLevel = getRepetitionLevel(currentElementIndex, parentElementIndex, depth);
+        int definitionLevel = 3 + depth * 2;
+
+        std::cout << "Definition level: " << definitionLevel;
+        std::cout << " | Repetition level: " << repetitionLevel << " | "
+                  << "Value: " << *reinterpret_cast<int64_t*>(value) << std::endl;
+        parquetBatch.repetitionLevels.push_back(repetitionLevel);
+        parquetBatch.definitionLevels.push_back(definitionLevel);
+        parquetBatch.values.push_back(*reinterpret_cast<int64_t*>(value));
+    } break;
+    case LogicalTypeID::VAR_LIST:
+        extractList(*reinterpret_cast<list_entry_t*>(value), vector, parquetBatch,
+            currentElementIndex, parentElementIndex, depth + 1);
+        break;
+    case LogicalTypeID::STRUCT:
+        extractStruct(vector, parquetBatch);
+        break;
+    }
+}
+
+// std::vector<int16_t>& definitionLevels, std::vector<int16_t>& repetitionLevels
+template<typename T>
+void ParquetWriter::extractList(const list_entry_t list, const ValueVector* vector,
+    ParquetBatch<T>& parquetBatch, int currentElementIndex, int parentElementIndex, int depth) {
+    auto values = ListVector::getListValues(vector, list);
+    auto childType = VarListType::getChildType(&vector->dataType);
+    auto dataVector = ListVector::getDataVector(vector);
+    for (auto i = 0u; i < list.size; ++i) {
+        castValueToVector(*childType, values, dataVector, parquetBatch, i, currentElementIndex, depth);
+        values += ListVector::getDataVector(vector)->getNumBytesPerValue();
+    }
+}
+
+template<typename T>
+void ParquetWriter::extractStruct(const ValueVector* vector, ParquetBatch<T>& parquetBatch) {
+    auto fields = StructType::getFields(&vector->dataType);
+    for (auto i = 0u; i < fields.size(); ++i) {
+        auto fieldVector = StructVector::getFieldVector(vector, i);
+        castValueToVector(fieldVector->dataType, fieldVector->getData(), fieldVector.get(), parquetBatch);
+    }
+}
+
+// input ...
+// output:
+// - repetitionLevels array
+// - values: array of int64_t, ku_string_t etc...
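// Illustrative sketch (not part of this patch): the level bookkeeping done by
// getRepetitionLevel()/castValueToVector()/extractList() above, replayed on a plain
// std::vector<std::vector<int64_t>> so it can be checked in isolation. The
// definition level formula 3 + depth * 2 and the repetition rules match the code
// above; the container type is only a stand-in for kuzu's list_entry_t/ValueVector.
#include <cassert>
#include <cstdint>
#include <vector>

struct LevelBatch {
    std::vector<int64_t> values;
    std::vector<int16_t> repetitionLevels;
    std::vector<int16_t> definitionLevels;
};

static int16_t repetitionLevel(int currentElementIndex, int parentElementIndex, int depth) {
    if (currentElementIndex == 0) {
        return (parentElementIndex == 0) ? 0 : 1;
    }
    return depth == 0 ? 1 : static_cast<int16_t>(depth + 1);
}

// One nesting level: the outer loop plays the role of extractList at depth 0,
// the inner loop the recursive call at depth 1 where the INT64 leaves are emitted.
static LevelBatch buildBatch(const std::vector<std::vector<int64_t>>& cell) {
    LevelBatch batch;
    for (int outer = 0; outer < static_cast<int>(cell.size()); ++outer) {
        for (int inner = 0; inner < static_cast<int>(cell[outer].size()); ++inner) {
            const int depth = 1; // leaf sits one list below the column's own list
            batch.values.push_back(cell[outer][inner]);
            batch.repetitionLevels.push_back(repetitionLevel(inner, outer, depth));
            batch.definitionLevels.push_back(static_cast<int16_t>(3 + depth * 2));
        }
    }
    return batch;
}

int main() {
    auto batch = buildBatch({{1, 2}, {3, 4}});
    // Matches the hand-worked levels in the comments above: the INT64[][] cell
    // [[1,2],[3,4]] encodes as rep {0,2,1,2}, def {5,5,5,5}.
    assert((batch.repetitionLevels == std::vector<int16_t>{0, 2, 1, 2}));
    assert((batch.definitionLevels == std::vector<int16_t>{5, 5, 5, 5}));
    return 0;
}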
+ +// template +// void ParquetWriter::extractValues(const ValueVector* vector, list_entry_t list, std::vector& result) { +// auto values = ListVector::getListValues(vector, list); +// auto childType = VarListType::getChildType(&vector->dataType); +// for (auto i = 0u; i < list.size; ++i) { +// if (childType->getLogicalTypeID() == LogicalTypeID::VAR_LIST) { +// auto value = *reinterpret_cast(values); +// auto dataVector = ListVector::getDataVector(vector); +// extractValues(dataVector, value, result); +// } else { +// auto value = *reinterpret_cast(values); +// result.push_back(value); +// } +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); +// } +// } +// + +// std::vector ParquetWriter::getValues(ValueVector* vector, list_entry_t list) { +// auto values = ListVector::getListValues(vector, list); +// auto childType = VarListType::getChildType(&vector->dataType); +// std::vector kValues; +// +// for (auto i = 0u; i < list.size; ++i) { +// kValues.push_back(*reinterpret_cast(values)); +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); +// // if primitive +// // push value to result +// // else +// // for loop +// // +// } +// +// +// +// if (LogicalTypeUtils::isPrimitive(*childType)) { +// for (auto i = 0u; i < list.size; ++i) { +// kValues.push_back(*reinterpret_cast(values)); +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); +// } +// } else { +// auto dataVector = ListVector::getDataVector(vector); +// // list type +// for (auto i = 0u; i < list.size; ++i) { +// auto nestedList = *reinterpret_cast(values); +// auto nestedValues = getValues(dataVector, nestedList); +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); +// kValues.insert(kValues.end(), nestedValues.begin(), nestedValues.end()); +// } +// } +// return kValues; +// } + + +/* +void ParquetWriter::writeValues(std::vector& outputVectors) { if (outputVectors.size() == 0) { return; } @@ -344,6 +719,30 @@ void ParquetWriter::writeValues(std::vector& outputVectors // ... } break; case LogicalTypeID::FIXED_LIST: { + // ... + } break; + case LogicalTypeID::STRUCT: { + int64_t values[1]; + int16_t definition_level = 1; + int16_t repetition_level = 0; + parquet::ColumnWriter* colWriter = rowWriter->NextColumn(); + + auto typed_writer = static_cast(colWriter); + values[0] = 44; // the value for 'first' + // typed_writer->WriteBatch(1, &definition_level, &repetition_level,values); + typed_writer->WriteBatch(1, nullptr, nullptr ,values); + typed_writer->Close(); + + // Write to the "second" column + colWriter = rowWriter->NextColumn(); + typed_writer = static_cast(colWriter); + values[0] = 12; // the value for 'second' + // typed_writer->WriteBatch(1, &definition_level, &repetition_level,values); + typed_writer->WriteBatch(1, nullptr, nullptr, values); + typed_writer->Close(); + + + // ... } break; case LogicalTypeID::VAR_LIST: { @@ -361,7 +760,7 @@ void ParquetWriter::writeValues(std::vector& outputVectors // [[1,2],[3,4,5],[6,7]] // 0,1, 0,1,1, 0,1 (repetition level) // 1,1 1,1,1 1,1 (definition level, all values present at level 1) - // + // // Repetition level explanation: // 0 : create a new level 1 list and mark the element at level 1 // 1 : mark element at level 1 @@ -389,16 +788,22 @@ void ParquetWriter::writeValues(std::vector& outputVectors // 0: [[[1,2,3],[4,5,6],[7]]] (add item at level 2) // 1: [[[1,2,3],[4,5,6],[7],[8]] (new level 1 record) // 2: [[[1,2,3],[4,5,6],[7],[8,9]]] (add item at level 2) - + // child is a VAR_LIST ... 
// call the same function but passing a different repetition and - - // list_struct_t [[[1,2,3],[4,5,6]],[[7],[8,9]]] 1 (level = 0, element_index = 0, i = 0, size = 1, level = 0) - // list_struct_t [[1,2,3],[4,5,6]] (item 0) 2 (level = 1, element_index = 0, i = 0, size = 2) - // list_struct_t [[7],[8,9]] (item 1) 5 (level = 1, element_index = 1) - // list_struct_t [1,2,3] (item 0, item 0) 3 (level = 2, element_index = 0) - // list_struct_t [4,5,6] (item 0, item 1) 4 (level = 2, elemenent_index = 1) - // list_struct_t [7] (item 1, item 0) 6 (level = 2, element_index = 1) + + // list_struct_t [[[1,2,3],[4,5,6]],[[7],[8,9]]] 1 (level = 0, +element_index = 0, i = 0, size = 1, level = 0) + // list_struct_t [[1,2,3],[4,5,6]] (item 0) 2 (level = 1, +element_index = 0, i = 0, size = 2) + // list_struct_t [[7],[8,9]] (item 1) 5 (level = 1, +element_index = 1) + // list_struct_t [1,2,3] (item 0, item 0) 3 (level = 2, +element_index = 0) + // list_struct_t [4,5,6] (item 0, item 1) 4 (level = 2, +elemenent_index = 1) + // list_struct_t [7] (item 1, item 0) 6 (level = 2, +element_index = 1) // list_struct_t [8,9] (item 1, item 1) // // func (level, element_index) { @@ -417,31 +822,33 @@ void ParquetWriter::writeValues(std::vector& outputVectors // read all values from a var list // return values, def level and rep level - struct parquetNested { - std::vector definitionLevels; - std::vector repetitionLevels; - std::vector values; - } - - ParquetNested parquetNested = getParquetNested(outputVectors[i], 0, 0); - - auto colWriterInt = static_cast*>( - rowWriter->NextColumn()); - colWriterInt->WriteBatch(list.size, defLevels.data(), repLevels.data(), static_cast(intValues.data())); - - ParquetNested getParquetNested(common::ValueVector* valueVector, int level, int elementIndex) { - auto childType = VarListType::getChildType(&outputVectors[i]->dataType); - } +// struct parquetNested { +// int64_t size; +// std::vector definitionLevels; +// std::vector repetitionLevels; +// std::vector values; +// } +// +// ParquetNested parquetNested = getParquetNested(outputVectors[i], 0, 0); +// +// auto colWriterInt = static_cast*>( +// rowWriter->NextColumn()); +// colWriterInt->WriteBatch(list.size, defLevels.data(), repLevels.data(), +static_cast(intValues.data())); +// +// ParquetNested getParquetNested(ValueVector* valueVector, int level, int +elementIndex) { +// auto childType = VarListType::getChildType(&outputVectors[i]->dataType); +// } +// auto list = outputVectors[i]->getValue(selPos); auto values = ListVector::getListValues(outputVectors[i], list); auto childType = VarListType::getChildType(&outputVectors[i]->dataType); auto dataVector = ListVector::getDataVector(outputVectors[i]); - - // child is a primitive type, then this should be enough: - + std::vector repLevels(list.size, 1); std::vector defLevels(list.size, 1); @@ -458,16 +865,12 @@ void ParquetWriter::writeValues(std::vector& outputVectors values += ListVector::getDataVector(outputVectors[i])->getNumBytesPerValue(); } - // int64_t intValues2[] = {1, 2, 3}; // int16_t defLevels2[] = {1, 1, 1}; // Definition levels // int16_t repLevels2[] = {0, 1, 1}; // Repetition levels // colWriterInt->WriteBatch(3, defLevels2, repLevels2, intValues2); } break; - case LogicalTypeID::STRUCT: { - // ... 
- } break; default: NotImplementedException("ParquetWriter::writeValue"); } @@ -475,101 +878,16 @@ void ParquetWriter::writeValues(std::vector& outputVectors } */ +void ParquetWriter::closeFile() { + rowWriter->Close(); + fileWriter->Close(); + outFile->Close(); +} + } // namespace processor } // namespace kuzu /* -// To Cover: -// field STRING[], field INT64[], field INT32[], field BOOLEAN[], field FLOAT[], field DOUBLE[] ... -// field STRUCT(field1: INT64, field2: STRING, field) ... -// field INT64[][] ... -// -// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md -// https://github.com/apache/arrow/blob/main/cpp/src/parquet/stream_writer.cc -std::shared_ptr ParquetWriter::kuzuTypeToParquetType( - std::string& columnName, const LogicalType& logicalType, int length) { - parquet::Type::type parquetType; - parquet::ConvertedType::type convertedType; - auto repetition = parquet::Repetition::REQUIRED; - - switch (logicalType.getLogicalTypeID()) { - case common::LogicalTypeID::BOOL: - parquetType = parquet::Type::BOOLEAN; - convertedType = parquet::ConvertedType::NONE; - break; - case common::LogicalTypeID::STRING: - parquetType = parquet::Type::BYTE_ARRAY; - convertedType = parquet::ConvertedType::UTF8; - break; - case common::LogicalTypeID::INT64: - parquetType = parquet::Type::INT64; - convertedType = parquet::ConvertedType::INT_64; - break; - case common::LogicalTypeID::INT16: - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::INT_16; - break; - case common::LogicalTypeID::INT32: - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::INT_32; - break; - case common::LogicalTypeID::FLOAT: - parquetType = parquet::Type::FLOAT; - convertedType = parquet::ConvertedType::NONE; - break; - case common::LogicalTypeID::DOUBLE: - parquetType = parquet::Type::DOUBLE; - convertedType = parquet::ConvertedType::NONE; - break; - case common::LogicalTypeID::DATE: - parquetType = parquet::Type::INT32; - convertedType = parquet::ConvertedType::DATE; - break; - case common::LogicalTypeID::TIMESTAMP: - parquetType = parquet::Type::INT64; - convertedType = parquet::ConvertedType::TIMESTAMP_MICROS; - break; - case common::LogicalTypeID::INTERVAL: - parquetType = parquet::Type::FIXED_LEN_BYTE_ARRAY; - convertedType = parquet::ConvertedType::INTERVAL; - length = 12; - break; - case LogicalTypeID::INTERNAL_ID: { - // ... - } break; - case LogicalTypeID::FIXED_LIST: { - // ... - } break; - case LogicalTypeID::STRUCT: { - // ... - } break; - case LogicalTypeID::VAR_LIST: { - auto childLogicalType = VarListType::getChildType(&logicalType); - // "sink" node - // if (isPrimitive(childLogicalType)) { - if (childLogicalType->getLogicalTypeID() == LogicalTypeID::INT64) { - parquetType = parquet::Type::INT64; - convertedType = parquet::ConvertedType::INT_64; - repetition = parquet::Repetition::REPEATED; - } else { - // child node = VAR_LIST or FIXED_LIST - // only one node inside a group node - auto node = kuzuTypeToParquetType(columnName, *childLogicalType, length); - auto groupNode = std::static_pointer_cast( - parquet::schema::GroupNode::Make(columnName, parquet::Repetition::REQUIRED, {node})); - return groupNode; - - // node = STRUCT (multiple nodes inside a group node) - // ... 
- } - } break; - default: - throw RuntimeException("Unsupported type for Parquet datatype"); - } - return parquet::schema::PrimitiveNode::Make( - columnName, repetition, parquetType, convertedType, length); -} - std::string TypeUtils::castValueToArrowArray( const LogicalType& dataType, uint8_t* value, void* vector) { auto valueVector = reinterpret_cast(vector); @@ -629,7 +947,29 @@ arrow::Array ParquetWriter::toArrow(const list_entry_t& val, void* valueVector) return result; } -std::shared_ptr ParquetWriter::buildParquetArray(common::ValueVector* vector) { +std::shared_ptr ParquetWriter::generateTable() { + arrow::Int64Builder i64builder; + PARQUET_THROW_NOT_OK(i64builder.AppendValues({1, 2, 3, 4, 5})); + std::shared_ptr i64array; + PARQUET_THROW_NOT_OK(i64builder.Finish(&i64array)); + + arrow::StringBuilder strbuilder; + PARQUET_THROW_NOT_OK(strbuilder.Append("some")); + PARQUET_THROW_NOT_OK(strbuilder.Append("string")); + PARQUET_THROW_NOT_OK(strbuilder.Append("content")); + PARQUET_THROW_NOT_OK(strbuilder.Append("in")); + PARQUET_THROW_NOT_OK(strbuilder.Append("rows")); + std::shared_ptr strarray; + PARQUET_THROW_NOT_OK(strbuilder.Finish(&strarray)); + + std::shared_ptr schema = arrow::schema( + {arrow::field("int", arrow::int64()), arrow::field("str", arrow::utf8())}); + + return arrow::Table::Make(schema, {i64array, strarray}); +} + + +std::shared_ptr ParquetWriter::buildParquetArray(ValueVector* vector) { std::shared_ptr field; auto selPos = vector->state->selVector->selectedPositions[0]; @@ -641,7 +981,7 @@ std::shared_ptr ParquetWriter::buildParquetArray(common::ValueVect break; //return std::make_shared(); - + case LogicalTypeID::INT64: { arrow::Int64Builder i64builder; auto value = vector->getValue(selPos); @@ -651,6 +991,189 @@ std::shared_ptr ParquetWriter::buildParquetArray(common::ValueVect } } } - + + + + + + + + + + + + + + + + + + + + + + + + +ParquetWriter::ParquetValue ParquetWriter::getParquetValue(common::ValueVector* vector, int16_t +repetitionLevel, int16_t definitionLevel) { ParquetValue +parquetValue{vector->dataType.getLogicalTypeID(), definitionLevel, repetitionLevel}; auto selPos = +vector->state->selVector->selectedPositions[0]; switch (vector->dataType.getLogicalTypeID()) { case +LogicalTypeID::BOOL: case LogicalTypeID::STRING: case LogicalTypeID::INT64: { parquetValue.value = +vector->getData() + vector->getNumBytesPerValue() * selPos; + } + } + return parquetValue; +} + +std::vector ParquetWriter::getParquetValues(common::ValueVector* +vector, int16_t repetitionLevel, int16_t definitionLevel) { ParquetWriter::ParquetValue +parquetValue; std::vector parquetValues; auto selPos = +vector->state->selVector->selectedPositions[0]; switch (vector->dataType.getLogicalTypeID()) { case +LogicalTypeID::VAR_LIST: { auto list = vector->getValue(selPos); auto values = +ListVector::getListValues(vector, list); auto childType = +VarListType::getChildType(&vector->dataType); auto dataVector = ListVector::getDataVector(vector); + // childType is primitive + // getParquetValue here directly... 
+ + if (LogicalTypeUtils::isPrimitive(*childType)) { + for (auto i = 0u; i < list.size; ++i) { + // getParquetValue(values); + values += ListVector::getDataVector(vector)->getNumBytesPerValue(); + int16_t repetitionLevel = 0; + if (i > 0) { + repetitionLevel = 1; + } + ParquetValue parquetValue{childType->getLogicalTypeID(), repetitionLevel, 1}; + parquetValue.value = values; + parquetValues.push_back(parquetValue); + } + } + + // not primitive: either var list or struct + // struct, cast thing + +// for (auto i = 0u; i < list.size; ++i) { +// getParquetValue(values); +// values += ListVector::getDataVector(vector)->getNumBytesPerValue(); + +// intValues[x] = *reinterpret_cast(values); +// std::cout << "Value: " << intValues[x] << std::endl; +// values += ListVector::getDataVector(outputVectors[i])->getNumBytesPerValue(); + +// if (dataVector->isPrimitiveDataType()) { +// parquetValues.push_back(getParquetValue(dataVector.get(), repetitionLevel, +definitionLevel)); +// } else { +// auto p = getParquetValues(dataVector.get(), repetitionLevel, definitionLevel); +// parquetValues.insert(parquetValues.end(), p.begin(), p.end()); +// } +// } + + + } break; + case LogicalTypeID::STRUCT: { + auto val = vector->getValue(selPos); + auto fields = StructType::getFields(&vector->dataType); + for (auto i = 0u; i < fields.size(); ++i) { + auto fieldVector = StructVector::getFieldVector(vector, i); + // is primitive? + if (fieldVector->isPrimitiveDataType()) { + parquetValues.push_back(getParquetValue(fieldVector.get(), repetitionLevel, +definitionLevel)); } else { + // not primitive + auto p = getParquetValues(fieldVector.get(), repetitionLevel, definitionLevel); + // merge into parqueValues + parquetValues.insert(parquetValues.end(), p.begin(), p.end()); + } + } + } break; + } + return parquetValues; +} + +void ParquetWriter::writeBatch(ParquetValue& parquetValue) { + switch (parquetValue.logicalTypeID) { + case LogicalTypeID::INT64: { + parquet::Int64Writer* int64Writer = + static_cast(rowWriter->NextColumn()); +// int64_t *valuePtr = reinterpret_cast(parquetValue.value); +// int64Writer->WriteBatch(1, &parquetValue.repetitionLevel, +&parquetValue.definitionLevel, parquetValue.value); } break; case LogicalTypeID::STRING: { + parquet::ByteArrayWriter* baWriter = + static_cast(rowWriter->NextColumn()); + parquet::ByteArray valueToWrite; + ku_string_t *valuePtr = reinterpret_cast(parquetValue.value); + valueToWrite.ptr = &valuePtr->getData()[0]; + valueToWrite.len = valuePtr->len; + baWriter->WriteBatch(1, nullptr, nullptr, &valueToWrite); + } break; + } +} + + +void ParquetWriter::createBatch(std::vector& parquetValues, + uint16_t& outSize, + std::vector& outDefinitionLevels, + std::vector& outRepetitionLevels, + std::vector& outValues) { + outSize = parquetValues.size(); + for (auto& parquetValue : parquetValues) { + switch (parquetValue.logicalTypeID) { + case LogicalTypeID::INT64: { + int64_t *valuePtr = reinterpret_cast(parquetValue.value); + outValues.push_back(*valuePtr); + } break; +// case LogicalTypeID::STRING: { +// parquet::ByteArrayWriter* baWriter = +// static_cast(rowWriter->NextColumn()); +// parquet::ByteArray valueToWrite; +// ku_string_t *valuePtr = reinterpret_cast(parquetValue.value); +// valueToWrite.ptr = &valuePtr->getData()[0]; +// valueToWrite.len = valuePtr->len; +// baWriter->WriteBatch(1, nullptr, nullptr, &valueToWrite); +// } break; + } + } +} + +// void ParquetWriter::writeValues(std::vector& outputVectors) { +// if (outputVectors.size() == 0) { +// return; +// } +// 
rowWriter = fileWriter->AppendRowGroup(); +// for (auto& outputVector : outputVectors) { +// assert(outputVector->state->isFlat()); +// +// // TODO (Rui): check if its necessary adding this method to valueVector +// if (outputVector->isPrimitiveDataType()) { +// auto parquetValue = getParquetValue(outputVector, 0, 0); +// writeBatch(parquetValue); +// } else { +// auto parquetValues = getParquetValues(outputVector, 0, 0); +// // list with uint64_t +// uint16_t size; +// std::vector definitionLevels; +// std::vector repetitionLevels; +// std::vector values; +// createBatch(parquetValues, size, definitionLevels, repetitionLevels, values); +// +// +// // struct +// // for (auto& parquetValue : getParquetValues(outputVector, 0, 0)) { +// // writeBatch(parquetValue); +// // } +// } +// } +// } + + +// To Cover: +// field STRING[], field INT64[], field INT32[], field BOOLEAN[], field FLOAT[], field DOUBLE[] ... +// field STRUCT(field1: INT64, field2: STRING, field) ... +// field INT64[][] ... +// +// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md +// https://github.com/apache/arrow/blob/main/cpp/src/parquet/stream_writer.cc */
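// Illustrative sketch (not part of this patch): an end-to-end, standalone version of
// the INT64[][] path the writer above is building towards -- the doubly nested
// three-level LIST schema plus a single WriteBatch call carrying the cell
// [[1,2],[3,4]] with the definition/repetition levels worked out in the comments
// above. The file name and node names are placeholders; error handling is reduced to
// PARQUET_THROW_NOT_OK/PARQUET_ASSIGN_OR_THROW.
#include <cstdint>
#include <memory>
#include <string>

#include <arrow/io/file.h>
#include <parquet/column_writer.h>
#include <parquet/exception.h>
#include <parquet/file_writer.h>
#include <parquet/schema.h>
#include <parquet/types.h>

using parquet::schema::GroupNode;
using parquet::schema::PrimitiveNode;

// Wrap `child` in the standard LIST encoding: optional group (LIST) -> repeated group -> child.
static parquet::schema::NodePtr asList(const std::string& name, parquet::schema::NodePtr child) {
    auto repeated = GroupNode::Make("list", parquet::Repetition::REPEATED, {std::move(child)});
    return GroupNode::Make(
        name, parquet::Repetition::OPTIONAL, {repeated}, parquet::LogicalType::List());
}

int main() {
    // required group schema { optional group col (LIST) { repeated group list {
    //   optional group element (LIST) { repeated group list { optional int64 element; } } } } }
    auto leaf = PrimitiveNode::Make("element", parquet::Repetition::OPTIONAL,
        parquet::Type::INT64, parquet::ConvertedType::INT_64);
    auto column = asList("col", asList("element", leaf));
    auto schema = std::static_pointer_cast<GroupNode>(
        GroupNode::Make("schema", parquet::Repetition::REQUIRED, {column}));

    std::shared_ptr<arrow::io::FileOutputStream> outFile;
    PARQUET_ASSIGN_OR_THROW(outFile, arrow::io::FileOutputStream::Open("out_nested.parquet"));
    parquet::WriterProperties::Builder builder;
    builder.compression(parquet::Compression::SNAPPY);
    auto fileWriter = parquet::ParquetFileWriter::Open(outFile, schema, builder.build());

    // One row containing the single cell [[1,2],[3,4]].
    int64_t values[] = {1, 2, 3, 4};
    int16_t definitionLevels[] = {5, 5, 5, 5}; // every leaf present at max definition level
    int16_t repetitionLevels[] = {0, 2, 1, 2}; // new row, same inner list, new inner list, same inner list
    auto* rowGroup = fileWriter->AppendRowGroup();
    auto* int64Writer = static_cast<parquet::Int64Writer*>(rowGroup->NextColumn());
    int64Writer->WriteBatch(4, definitionLevels, repetitionLevels, values);

    fileWriter->Close();
    PARQUET_THROW_NOT_OK(outFile->Close());
    return 0;
}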