From 32829cb0ec4082034a776980ec59b7106674dcc3 Mon Sep 17 00:00:00 2001 From: Benjamin Winger Date: Fri, 13 Oct 2023 15:52:53 -0400 Subject: [PATCH] Buffer SerDeser and split into separate classes --- src/catalog/catalog.cpp | 1 - src/catalog/catalog_content.cpp | 48 +++--- src/catalog/node_table_schema.cpp | 20 +-- src/catalog/property.cpp | 23 +-- src/catalog/rdf_graph_schema.cpp | 15 +- src/catalog/rel_table_group_schema.cpp | 12 +- src/catalog/rel_table_schema.cpp | 19 +-- src/catalog/table_schema.cpp | 41 ++--- src/common/CMakeLists.txt | 4 +- src/common/ser_deser.cpp | 27 ---- src/common/serializer/CMakeLists.txt | 10 ++ src/common/serializer/buffered_file.cpp | 21 +++ .../serializer}/buffered_serializer.cpp | 8 +- src/common/serializer/deserializer.cpp | 15 ++ src/common/serializer/file_writer.cpp | 21 +++ src/common/serializer/serializer.cpp | 14 ++ src/common/types/types.cpp | 64 ++++---- src/common/types/value/value.cpp | 79 +++++----- src/function/scalar_macro_function.cpp | 28 ++-- src/include/catalog/catalog_content.h | 8 +- src/include/catalog/node_table_schema.h | 5 +- src/include/catalog/property.h | 8 +- src/include/catalog/rdf_graph_schema.h | 5 +- src/include/catalog/rel_table_group_schema.h | 5 +- src/include/catalog/rel_table_schema.h | 5 +- src/include/catalog/table_schema.h | 10 +- src/include/common/ser_deser.h | 148 ------------------ src/include/common/serializer/buffered_file.h | 83 ++++++++++ .../serializer}/buffered_serializer.h | 12 +- src/include/common/serializer/deserializer.h | 86 ++++++++++ src/include/common/serializer/reader.h | 15 ++ src/include/common/serializer/serializer.h | 86 ++++++++++ src/include/common/serializer/writer.h | 15 ++ src/include/common/types/types.h | 33 ++-- src/include/common/types/value/value.h | 6 +- src/include/function/scalar_macro_function.h | 5 +- .../expression/parsed_case_expression.h | 10 +- .../parser/expression/parsed_expression.h | 11 +- .../expression/parsed_function_expression.h | 4 +- .../expression/parsed_literal_expression.h | 10 +- .../expression/parsed_parameter_expression.h | 4 +- .../expression/parsed_property_expression.h | 8 +- .../expression/parsed_subquery_expression.h | 4 +- .../expression/parsed_variable_expression.h | 9 +- .../writer/parquet/basic_column_writer.h | 8 +- .../writer/parquet/boolean_column_writer.h | 4 +- .../persistent/writer/parquet/column_writer.h | 7 +- .../writer/parquet/parquet_rle_bp_encoder.h | 10 +- .../writer/parquet/standard_column_writer.h | 5 +- .../writer/parquet/string_column_writer.h | 4 +- src/include/storage/stats/metadata_dah_info.h | 5 +- .../storage/stats/node_table_statistics.h | 8 +- .../storage/stats/property_statistics.h | 5 +- .../storage/stats/rel_table_statistics.h | 4 +- src/include/storage/stats/table_statistics.h | 11 +- .../expression/parsed_case_expression.cpp | 31 ++-- src/parser/expression/parsed_expression.cpp | 40 ++--- .../expression/parsed_function_expression.cpp | 15 +- .../expression/parsed_property_expression.cpp | 6 +- .../expression/parsed_variable_expression.cpp | 6 +- .../persistent/writer/parquet/CMakeLists.txt | 1 - .../writer/parquet/basic_column_writer.cpp | 30 ++-- .../writer/parquet/boolean_column_writer.cpp | 6 +- .../writer/parquet/column_writer.cpp | 4 +- .../writer/parquet/parquet_rle_bp_encoder.cpp | 10 +- .../writer/parquet/string_column_writer.cpp | 22 +-- src/storage/stats/metadata_dah_info.cpp | 20 +-- src/storage/stats/node_table_statistics.cpp | 16 +- src/storage/stats/property_statistics.cpp | 11 +- src/storage/stats/rel_table_statistics.cpp | 13 +- src/storage/stats/table_statistics.cpp | 31 ++-- .../stats/table_statistics_collection.cpp | 18 +-- 72 files changed, 816 insertions(+), 600 deletions(-) delete mode 100644 src/common/ser_deser.cpp create mode 100644 src/common/serializer/CMakeLists.txt create mode 100644 src/common/serializer/buffered_file.cpp rename src/{processor/operator/persistent/writer/parquet => common/serializer}/buffered_serializer.cpp (79%) create mode 100644 src/common/serializer/deserializer.cpp create mode 100644 src/common/serializer/file_writer.cpp create mode 100644 src/common/serializer/serializer.cpp delete mode 100644 src/include/common/ser_deser.h create mode 100644 src/include/common/serializer/buffered_file.h rename src/include/{processor/operator/persistent/writer/parquet => common/serializer}/buffered_serializer.h (82%) create mode 100644 src/include/common/serializer/deserializer.h create mode 100644 src/include/common/serializer/reader.h create mode 100644 src/include/common/serializer/serializer.h create mode 100644 src/include/common/serializer/writer.h diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index 562806c9b0..2b80211bc2 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -3,7 +3,6 @@ #include "catalog/node_table_schema.h" #include "catalog/rel_table_group_schema.h" #include "catalog/rel_table_schema.h" -#include "common/ser_deser.h" #include "storage/wal/wal.h" #include "transaction/transaction_action.h" diff --git a/src/catalog/catalog_content.cpp b/src/catalog/catalog_content.cpp index e1955b51b3..2e44ee4cd7 100644 --- a/src/catalog/catalog_content.cpp +++ b/src/catalog/catalog_content.cpp @@ -6,7 +6,9 @@ #include "catalog/rel_table_schema.h" #include "common/exception/catalog.h" #include "common/exception/runtime.h" -#include "common/ser_deser.h" +#include "common/serializer/buffered_file.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_format.h" #include "common/string_utils.h" #include "storage/storage_utils.h" @@ -154,39 +156,39 @@ void CatalogContent::renameTable(table_id_t tableID, const std::string& newName) void CatalogContent::saveToFile(const std::string& directory, DBFileType dbFileType) { auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType); - auto fileInfo = FileUtils::openFile(catalogPath, O_WRONLY | O_CREAT); - uint64_t offset = 0; - writeMagicBytes(fileInfo.get(), offset); - SerDeser::serializeValue(StorageVersionInfo::getStorageVersion(), fileInfo.get(), offset); - SerDeser::serializeValue(tableSchemas.size(), fileInfo.get(), offset); + Serializer serializer( + std::make_unique(FileUtils::openFile(catalogPath, O_WRONLY | O_CREAT))); + writeMagicBytes(serializer); + serializer.serializeValue(StorageVersionInfo::getStorageVersion()); + serializer.serializeValue(tableSchemas.size()); for (auto& [tableID, tableSchema] : tableSchemas) { - SerDeser::serializeValue(tableID, fileInfo.get(), offset); - tableSchema->serialize(fileInfo.get(), offset); + serializer.serializeValue(tableID); + tableSchema->serialize(serializer); } - SerDeser::serializeValue(nextTableID, fileInfo.get(), offset); - SerDeser::serializeUnorderedMap(macros, fileInfo.get(), offset); + serializer.serializeValue(nextTableID); + serializer.serializeUnorderedMap(macros); } void CatalogContent::readFromFile(const std::string& directory, DBFileType dbFileType) { auto catalogPath = StorageUtils::getCatalogFilePath(directory, dbFileType); - auto fileInfo = FileUtils::openFile(catalogPath, O_RDONLY); - uint64_t offset = 0; - validateMagicBytes(fileInfo.get(), offset); + Deserializer deserializer( + std::make_unique(FileUtils::openFile(catalogPath, O_RDONLY))); + validateMagicBytes(deserializer); storage_version_t savedStorageVersion; - SerDeser::deserializeValue(savedStorageVersion, fileInfo.get(), offset); + deserializer.deserializeValue(savedStorageVersion); validateStorageVersion(savedStorageVersion); uint64_t numTables; - SerDeser::deserializeValue(numTables, fileInfo.get(), offset); + deserializer.deserializeValue(numTables); table_id_t tableID; for (auto i = 0u; i < numTables; i++) { - SerDeser::deserializeValue(tableID, fileInfo.get(), offset); - tableSchemas[tableID] = TableSchema::deserialize(fileInfo.get(), offset); + deserializer.deserializeValue(tableID); + tableSchemas[tableID] = TableSchema::deserialize(deserializer); } for (auto& [tableID_, tableSchema] : tableSchemas) { tableNameToIDMap[tableSchema->tableName] = tableID_; } - SerDeser::deserializeValue(nextTableID, fileInfo.get(), offset); - SerDeser::deserializeUnorderedMap(macros, fileInfo.get(), offset); + deserializer.deserializeValue(nextTableID); + deserializer.deserializeUnorderedMap(macros); } ExpressionType CatalogContent::getFunctionType(const std::string& name) const { @@ -239,11 +241,11 @@ void CatalogContent::validateStorageVersion(storage_version_t savedStorageVersio } } -void CatalogContent::validateMagicBytes(FileInfo* fileInfo, offset_t& offset) { +void CatalogContent::validateMagicBytes(Deserializer& deserializer) { auto numMagicBytes = strlen(StorageVersionInfo::MAGIC_BYTES); uint8_t magicBytes[4]; for (auto i = 0u; i < numMagicBytes; i++) { - SerDeser::deserializeValue(magicBytes[i], fileInfo, offset); + deserializer.deserializeValue(magicBytes[i]); } if (memcmp(magicBytes, StorageVersionInfo::MAGIC_BYTES, numMagicBytes) != 0) { throw RuntimeException( @@ -251,10 +253,10 @@ void CatalogContent::validateMagicBytes(FileInfo* fileInfo, offset_t& offset) { } } -void CatalogContent::writeMagicBytes(FileInfo* fileInfo, offset_t& offset) { +void CatalogContent::writeMagicBytes(Serializer& serializer) { auto numMagicBytes = strlen(StorageVersionInfo::MAGIC_BYTES); for (auto i = 0u; i < numMagicBytes; i++) { - SerDeser::serializeValue(StorageVersionInfo::MAGIC_BYTES[i], fileInfo, offset); + serializer.serializeValue(StorageVersionInfo::MAGIC_BYTES[i]); } } diff --git a/src/catalog/node_table_schema.cpp b/src/catalog/node_table_schema.cpp index 4480767343..211e4b1fc9 100644 --- a/src/catalog/node_table_schema.cpp +++ b/src/catalog/node_table_schema.cpp @@ -1,26 +1,26 @@ #include "catalog/node_table_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace catalog { -void NodeTableSchema::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(primaryKeyPropertyID, fileInfo, offset); - SerDeser::serializeUnorderedSet(fwdRelTableIDSet, fileInfo, offset); - SerDeser::serializeUnorderedSet(bwdRelTableIDSet, fileInfo, offset); +void NodeTableSchema::serializeInternal(Serializer& serializer) { + serializer.serializeValue(primaryKeyPropertyID); + serializer.serializeUnorderedSet(fwdRelTableIDSet); + serializer.serializeUnorderedSet(bwdRelTableIDSet); } -std::unique_ptr NodeTableSchema::deserialize( - FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr NodeTableSchema::deserialize(Deserializer& deserializer) { property_id_t primaryKeyPropertyID; std::unordered_set fwdRelTableIDSet; std::unordered_set bwdRelTableIDSet; - SerDeser::deserializeValue(primaryKeyPropertyID, fileInfo, offset); - SerDeser::deserializeUnorderedSet(fwdRelTableIDSet, fileInfo, offset); - SerDeser::deserializeUnorderedSet(bwdRelTableIDSet, fileInfo, offset); + deserializer.deserializeValue(primaryKeyPropertyID); + deserializer.deserializeUnorderedSet(fwdRelTableIDSet); + deserializer.deserializeUnorderedSet(bwdRelTableIDSet); return std::make_unique( primaryKeyPropertyID, fwdRelTableIDSet, bwdRelTableIDSet); } diff --git a/src/catalog/property.cpp b/src/catalog/property.cpp index 55481e9b6d..59f4a6672b 100644 --- a/src/catalog/property.cpp +++ b/src/catalog/property.cpp @@ -1,27 +1,28 @@ #include "catalog/property.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace catalog { -void Property::serialize(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(name, fileInfo, offset); - dataType->serialize(fileInfo, offset); - SerDeser::serializeValue(propertyID, fileInfo, offset); - SerDeser::serializeValue(tableID, fileInfo, offset); +void Property::serialize(Serializer& serializer) const { + serializer.serializeValue(name); + dataType->serialize(serializer); + serializer.serializeValue(propertyID); + serializer.serializeValue(tableID); } -std::unique_ptr Property::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr Property::deserialize(Deserializer& deserializer) { std::string name; property_id_t propertyID; table_id_t tableID; - SerDeser::deserializeValue(name, fileInfo, offset); - auto dataType = LogicalType::deserialize(fileInfo, offset); - SerDeser::deserializeValue(propertyID, fileInfo, offset); - SerDeser::deserializeValue(tableID, fileInfo, offset); + deserializer.deserializeValue(name); + auto dataType = LogicalType::deserialize(deserializer); + deserializer.deserializeValue(propertyID); + deserializer.deserializeValue(tableID); return std::make_unique(name, std::move(dataType), propertyID, tableID); } diff --git a/src/catalog/rdf_graph_schema.cpp b/src/catalog/rdf_graph_schema.cpp index 5aa1e342f1..ad15562fd8 100644 --- a/src/catalog/rdf_graph_schema.cpp +++ b/src/catalog/rdf_graph_schema.cpp @@ -1,22 +1,23 @@ #include "catalog/rdf_graph_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace catalog { -void RdfGraphSchema::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(nodeTableID, fileInfo, offset); - SerDeser::serializeValue(relTableID, fileInfo, offset); +void RdfGraphSchema::serializeInternal(Serializer& serializer) { + serializer.serializeValue(nodeTableID); + serializer.serializeValue(relTableID); } -std::unique_ptr RdfGraphSchema::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr RdfGraphSchema::deserialize(Deserializer& deserializer) { table_id_t nodeTableID; table_id_t relTableID; - SerDeser::deserializeValue(nodeTableID, fileInfo, offset); - SerDeser::deserializeValue(relTableID, fileInfo, offset); + deserializer.deserializeValue(nodeTableID); + deserializer.deserializeValue(relTableID); return std::make_unique(nodeTableID, relTableID); } diff --git a/src/catalog/rel_table_group_schema.cpp b/src/catalog/rel_table_group_schema.cpp index f8cd604854..1f7a319f96 100644 --- a/src/catalog/rel_table_group_schema.cpp +++ b/src/catalog/rel_table_group_schema.cpp @@ -1,20 +1,20 @@ #include "catalog/rel_table_group_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace catalog { -void RelTableGroupSchema::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeVector(relTableIDs, fileInfo, offset); +void RelTableGroupSchema::serializeInternal(Serializer& serializer) { + serializer.serializeVector(relTableIDs); } -std::unique_ptr RelTableGroupSchema::deserialize( - FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr RelTableGroupSchema::deserialize(Deserializer& deserializer) { std::vector relTableIDs; - SerDeser::deserializeVector(relTableIDs, fileInfo, offset); + deserializer.deserializeVector(relTableIDs); return std::make_unique(std::move(relTableIDs)); } diff --git a/src/catalog/rel_table_schema.cpp b/src/catalog/rel_table_schema.cpp index fcce32d10b..3a876ada6a 100644 --- a/src/catalog/rel_table_schema.cpp +++ b/src/catalog/rel_table_schema.cpp @@ -1,7 +1,8 @@ #include "catalog/rel_table_schema.h" #include "common/exception/catalog.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; @@ -40,19 +41,19 @@ std::string getRelMultiplicityAsString(RelMultiplicity relMultiplicity) { } } -void RelTableSchema::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(relMultiplicity, fileInfo, offset); - SerDeser::serializeValue(srcTableID, fileInfo, offset); - SerDeser::serializeValue(dstTableID, fileInfo, offset); +void RelTableSchema::serializeInternal(Serializer& serializer) { + serializer.serializeValue(relMultiplicity); + serializer.serializeValue(srcTableID); + serializer.serializeValue(dstTableID); } -std::unique_ptr RelTableSchema::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr RelTableSchema::deserialize(Deserializer& deserializer) { RelMultiplicity relMultiplicity; table_id_t srcTableID; table_id_t dstTableID; - SerDeser::deserializeValue(relMultiplicity, fileInfo, offset); - SerDeser::deserializeValue(srcTableID, fileInfo, offset); - SerDeser::deserializeValue(dstTableID, fileInfo, offset); + deserializer.deserializeValue(relMultiplicity); + deserializer.deserializeValue(srcTableID); + deserializer.deserializeValue(dstTableID); return std::make_unique(relMultiplicity, srcTableID, dstTableID); } diff --git a/src/catalog/table_schema.cpp b/src/catalog/table_schema.cpp index ed49238347..dced516469 100644 --- a/src/catalog/table_schema.cpp +++ b/src/catalog/table_schema.cpp @@ -8,7 +8,8 @@ #include "common/exception/internal.h" #include "common/exception/not_implemented.h" #include "common/exception/runtime.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_format.h" #include "common/string_utils.h" @@ -76,42 +77,42 @@ void TableSchema::renameProperty(property_id_t propertyID, const std::string& ne // LCOV_EXCL_END } -void TableSchema::serialize(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(tableName, fileInfo, offset); - SerDeser::serializeValue(tableID, fileInfo, offset); - SerDeser::serializeValue(tableType, fileInfo, offset); - SerDeser::serializeVectorOfPtrs(properties, fileInfo, offset); - SerDeser::serializeValue(comment, fileInfo, offset); - SerDeser::serializeValue(nextPropertyID, fileInfo, offset); - serializeInternal(fileInfo, offset); +void TableSchema::serialize(Serializer& serializer) { + serializer.serializeValue(tableName); + serializer.serializeValue(tableID); + serializer.serializeValue(tableType); + serializer.serializeVectorOfPtrs(properties); + serializer.serializeValue(comment); + serializer.serializeValue(nextPropertyID); + serializeInternal(serializer); } -std::unique_ptr TableSchema::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr TableSchema::deserialize(Deserializer& deserializer) { std::string tableName; table_id_t tableID; TableType tableType; std::vector> properties; std::string comment; property_id_t nextPropertyID; - SerDeser::deserializeValue(tableName, fileInfo, offset); - SerDeser::deserializeValue(tableID, fileInfo, offset); - SerDeser::deserializeValue(tableType, fileInfo, offset); - SerDeser::deserializeVectorOfPtrs(properties, fileInfo, offset); - SerDeser::deserializeValue(comment, fileInfo, offset); - SerDeser::deserializeValue(nextPropertyID, fileInfo, offset); + deserializer.deserializeValue(tableName); + deserializer.deserializeValue(tableID); + deserializer.deserializeValue(tableType); + deserializer.deserializeVectorOfPtrs(properties); + deserializer.deserializeValue(comment); + deserializer.deserializeValue(nextPropertyID); std::unique_ptr result; switch (tableType) { case TableType::NODE: { - result = NodeTableSchema::deserialize(fileInfo, offset); + result = NodeTableSchema::deserialize(deserializer); } break; case TableType::REL: { - result = RelTableSchema::deserialize(fileInfo, offset); + result = RelTableSchema::deserialize(deserializer); } break; case TableType::REL_GROUP: { - result = RelTableGroupSchema::deserialize(fileInfo, offset); + result = RelTableGroupSchema::deserialize(deserializer); } break; case TableType::RDF: { - result = RdfGraphSchema::deserialize(fileInfo, offset); + result = RdfGraphSchema::deserialize(deserializer); } break; default: { // LCOV_EXCL_START diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 923437dfd2..d78bc7a00b 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(arrow) add_subdirectory(copier_config) add_subdirectory(data_chunk) add_subdirectory(exception) +add_subdirectory(serializer) add_subdirectory(task_system) add_subdirectory(types) add_subdirectory(vector) @@ -20,8 +21,7 @@ add_library(kuzu_common type_utils.cpp utils.cpp string_utils.cpp - table_type.cpp - ser_deser.cpp) + table_type.cpp) target_link_libraries(kuzu_common Glob) diff --git a/src/common/ser_deser.cpp b/src/common/ser_deser.cpp deleted file mode 100644 index ba55ea041c..0000000000 --- a/src/common/ser_deser.cpp +++ /dev/null @@ -1,27 +0,0 @@ -#include "common/ser_deser.h" - -namespace kuzu { -namespace common { - -template<> -void SerDeser::serializeValue( - const std::string& value, FileInfo* fileInfo, uint64_t& offset) { - uint64_t valueLength = value.length(); - FileUtils::writeToFile(fileInfo, (uint8_t*)&valueLength, sizeof(uint64_t), offset); - FileUtils::writeToFile( - fileInfo, (uint8_t*)value.data(), valueLength, offset + sizeof(uint64_t)); - offset = offset + sizeof(uint64_t) + valueLength; -} - -template<> -void SerDeser::deserializeValue( - std::string& value, FileInfo* fileInfo, uint64_t& offset) { - uint64_t valueLength = 0; - deserializeValue(valueLength, fileInfo, offset); - value.resize(valueLength); - FileUtils::readFromFile(fileInfo, (uint8_t*)value.data(), valueLength, offset); - offset += valueLength; -} - -} // namespace common -} // namespace kuzu diff --git a/src/common/serializer/CMakeLists.txt b/src/common/serializer/CMakeLists.txt new file mode 100644 index 0000000000..76774aa4b4 --- /dev/null +++ b/src/common/serializer/CMakeLists.txt @@ -0,0 +1,10 @@ +add_library(kuzu_common_serializer + OBJECT + serializer.cpp + deserializer.cpp + buffered_file.cpp + buffered_serializer.cpp) + +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/common/serializer/buffered_file.cpp b/src/common/serializer/buffered_file.cpp new file mode 100644 index 0000000000..eb1214cfc7 --- /dev/null +++ b/src/common/serializer/buffered_file.cpp @@ -0,0 +1,21 @@ +#include "common/serializer/buffered_file.h" + +#include "common/file_utils.h" + +namespace kuzu { +namespace common { +void BufferedFileWriter::flush() { + FileUtils::writeToFile(fileInfo.get(), buffer.get(), bufferOffset, fileOffset); + fileOffset += bufferOffset; + bufferOffset = 0; + memset(buffer.get(), 0, BUFFER_SIZE); +} + +void BufferedFileReader::readNextPage() { + FileUtils::readFromFile(fileInfo.get(), buffer.get(), BUFFER_SIZE, fileOffset); + fileOffset += BUFFER_SIZE; + bufferOffset = 0; +} + +} // namespace common +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp b/src/common/serializer/buffered_serializer.cpp similarity index 79% rename from src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp rename to src/common/serializer/buffered_serializer.cpp index 7bf87a79eb..0ea7ad548d 100644 --- a/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp +++ b/src/common/serializer/buffered_serializer.cpp @@ -1,9 +1,9 @@ -#include "processor/operator/persistent/writer/parquet/buffered_serializer.h" +#include "common/serializer/buffered_serializer.h" #include namespace kuzu { -namespace processor { +namespace common { BufferedSerializer::BufferedSerializer(uint64_t maximum_size) : BufferedSerializer(std::make_unique(maximum_size), maximum_size) {} @@ -14,7 +14,7 @@ BufferedSerializer::BufferedSerializer(std::unique_ptr data, uint64_t blob.data = std::move(data); } -void BufferedSerializer::writeData(const uint8_t* buffer, uint64_t len) { +void BufferedSerializer::write(const uint8_t* buffer, uint64_t len) { if (blob.size + len >= maximumSize) { do { maximumSize *= 2; @@ -29,5 +29,5 @@ void BufferedSerializer::writeData(const uint8_t* buffer, uint64_t len) { blob.size += len; } -} // namespace processor +} // namespace common } // namespace kuzu diff --git a/src/common/serializer/deserializer.cpp b/src/common/serializer/deserializer.cpp new file mode 100644 index 0000000000..d4d08231bc --- /dev/null +++ b/src/common/serializer/deserializer.cpp @@ -0,0 +1,15 @@ +#include "common/serializer/deserializer.h" + +namespace kuzu { +namespace common { + +template<> +void Deserializer::deserializeValue(std::string& value) { + uint64_t valueLength = 0; + deserializeValue(valueLength); + value.resize(valueLength); + reader->read((uint8_t*)value.data(), valueLength); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/serializer/file_writer.cpp b/src/common/serializer/file_writer.cpp new file mode 100644 index 0000000000..6c302ea2c0 --- /dev/null +++ b/src/common/serializer/file_writer.cpp @@ -0,0 +1,21 @@ +#include "common/file_utils.h" +#include "common/serializer/buffered_file.h" + +namespace kuzu { +namespace common { + +void BufferedFileWriter::flush() { + FileUtils::writeToFile(fileInfo.get(), buffer.get(), bufferOffset, fileOffset); + fileOffset += bufferOffset; + bufferOffset = 0; + memset(buffer.get(), 0, BUFFER_SIZE); +} + +void BufferedFileReader::readNextPage() { + FileUtils::readFromFile(fileInfo.get(), buffer.get(), BUFFER_SIZE, fileOffset); + fileOffset += BUFFER_SIZE; + bufferOffset = 0; +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/serializer/serializer.cpp b/src/common/serializer/serializer.cpp new file mode 100644 index 0000000000..e2e2d70824 --- /dev/null +++ b/src/common/serializer/serializer.cpp @@ -0,0 +1,14 @@ +#include "common/serializer/serializer.h" + +namespace kuzu { +namespace common { + +template<> +void Serializer::serializeValue(const std::string& value) { + uint64_t valueLength = value.length(); + writer->write((uint8_t*)&valueLength, sizeof(uint64_t)); + writer->write((uint8_t*)value.data(), valueLength); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index d69a947530..c02a7d65b9 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -6,7 +6,8 @@ #include "common/exception/binder.h" #include "common/exception/not_implemented.h" #include "common/null_buffer.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_utils.h" #include "common/types/interval_t.h" #include "common/types/ku_list.h" @@ -97,23 +98,22 @@ std::unique_ptr VarListTypeInfo::copy() const { return std::make_unique(childType->copy()); } -std::unique_ptr VarListTypeInfo::deserialize(FileInfo* fileInfo, uint64_t& offset) { - return std::make_unique(LogicalType::deserialize(fileInfo, offset)); +std::unique_ptr VarListTypeInfo::deserialize(Deserializer& deserializer) { + return std::make_unique(LogicalType::deserialize(deserializer)); } -void VarListTypeInfo::serializeInternal(FileInfo* fileInfo, uint64_t& offset) const { - childType->serialize(fileInfo, offset); +void VarListTypeInfo::serializeInternal(Serializer& serializer) const { + childType->serialize(serializer); } bool FixedListTypeInfo::operator==(const FixedListTypeInfo& other) const { return *childType == *other.childType && fixedNumElementsInList == other.fixedNumElementsInList; } -std::unique_ptr FixedListTypeInfo::deserialize( - FileInfo* fileInfo, uint64_t& offset) { - auto childType = LogicalType::deserialize(fileInfo, offset); +std::unique_ptr FixedListTypeInfo::deserialize(Deserializer& deserializer) { + auto childType = LogicalType::deserialize(deserializer); uint64_t fixedNumElementsInList; - SerDeser::deserializeValue(fixedNumElementsInList, fileInfo, offset); + deserializer.deserializeValue(fixedNumElementsInList); return std::make_unique(std::move(childType), fixedNumElementsInList); } @@ -121,24 +121,24 @@ std::unique_ptr FixedListTypeInfo::copy() const { return std::make_unique(childType->copy(), fixedNumElementsInList); } -void FixedListTypeInfo::serializeInternal(FileInfo* fileInfo, uint64_t& offset) const { - VarListTypeInfo::serializeInternal(fileInfo, offset); - SerDeser::serializeValue(fixedNumElementsInList, fileInfo, offset); +void FixedListTypeInfo::serializeInternal(Serializer& serializer) const { + VarListTypeInfo::serializeInternal(serializer); + serializer.serializeValue(fixedNumElementsInList); } bool StructField::operator==(const StructField& other) const { return *type == *other.type; } -void StructField::serialize(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(name, fileInfo, offset); - type->serialize(fileInfo, offset); +void StructField::serialize(Serializer& serializer) const { + serializer.serializeValue(name); + type->serialize(serializer); } -std::unique_ptr StructField::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr StructField::deserialize(Deserializer& deserializer) { std::string name; - SerDeser::deserializeValue(name, fileInfo, offset); - auto type = LogicalType::deserialize(fileInfo, offset); + deserializer.deserializeValue(name); + auto type = LogicalType::deserialize(deserializer); return std::make_unique(std::move(name), std::move(type)); } @@ -226,9 +226,9 @@ bool StructTypeInfo::operator==(const StructTypeInfo& other) const { return true; } -std::unique_ptr StructTypeInfo::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr StructTypeInfo::deserialize(Deserializer& deserializer) { std::vector> fields; - SerDeser::deserializeVectorOfPtrs(fields, fileInfo, offset); + deserializer.deserializeVectorOfPtrs(fields); return std::make_unique(std::move(fields)); } @@ -240,8 +240,8 @@ std::unique_ptr StructTypeInfo::copy() const { return std::make_unique(std::move(structFields)); } -void StructTypeInfo::serializeInternal(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeVectorOfPtrs(fields, fileInfo, offset); +void StructTypeInfo::serializeInternal(Serializer& serializer) const { + serializer.serializeVectorOfPtrs(fields); } LogicalType::LogicalType(LogicalTypeID typeID) : typeID{typeID}, extraTypeInfo{nullptr} { @@ -303,34 +303,34 @@ LogicalType& LogicalType::operator=(LogicalType&& other) noexcept { return *this; } -void LogicalType::serialize(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(typeID, fileInfo, offset); - SerDeser::serializeValue(physicalType, fileInfo, offset); +void LogicalType::serialize(Serializer& serializer) const { + serializer.serializeValue(typeID); + serializer.serializeValue(physicalType); switch (physicalType) { case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::FIXED_LIST: case PhysicalTypeID::STRUCT: - extraTypeInfo->serialize(fileInfo, offset); + extraTypeInfo->serialize(serializer); default: break; } } -std::unique_ptr LogicalType::deserialize(FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr LogicalType::deserialize(Deserializer& deserializer) { LogicalTypeID typeID; - SerDeser::deserializeValue(typeID, fileInfo, offset); + deserializer.deserializeValue(typeID); PhysicalTypeID physicalType; - SerDeser::deserializeValue(physicalType, fileInfo, offset); + deserializer.deserializeValue(physicalType); std::unique_ptr extraTypeInfo; switch (physicalType) { case PhysicalTypeID::VAR_LIST: { - extraTypeInfo = VarListTypeInfo::deserialize(fileInfo, offset); + extraTypeInfo = VarListTypeInfo::deserialize(deserializer); } break; case PhysicalTypeID::FIXED_LIST: { - extraTypeInfo = FixedListTypeInfo::deserialize(fileInfo, offset); + extraTypeInfo = FixedListTypeInfo::deserialize(deserializer); } break; case PhysicalTypeID::STRUCT: { - extraTypeInfo = StructTypeInfo::deserialize(fileInfo, offset); + extraTypeInfo = StructTypeInfo::deserialize(deserializer); } break; default: extraTypeInfo = nullptr; diff --git a/src/common/types/value/value.cpp b/src/common/types/value/value.cpp index 90385dfa2c..d3dddb7211 100644 --- a/src/common/types/value/value.cpp +++ b/src/common/types/value/value.cpp @@ -1,7 +1,8 @@ #include "common/types/value/value.h" #include "common/null_buffer.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/types/blob.h" #include "storage/storage_utils.h" @@ -550,124 +551,124 @@ void Value::copyFromUnion(const uint8_t* kuUnion) { } } -void Value::serialize(FileInfo* fileInfo, uint64_t& offset) const { - dataType->serialize(fileInfo, offset); - SerDeser::serializeValue(isNull_, fileInfo, offset); +void Value::serialize(Serializer& serializer) const { + dataType->serialize(serializer); + serializer.serializeValue(isNull_); switch (dataType->getPhysicalType()) { case PhysicalTypeID::BOOL: { - SerDeser::serializeValue(val.booleanVal, fileInfo, offset); + serializer.serializeValue(val.booleanVal); } break; case PhysicalTypeID::INT64: { - SerDeser::serializeValue(val.int64Val, fileInfo, offset); + serializer.serializeValue(val.int64Val); } break; case PhysicalTypeID::INT32: { - SerDeser::serializeValue(val.int32Val, fileInfo, offset); + serializer.serializeValue(val.int32Val); } break; case PhysicalTypeID::INT16: { - SerDeser::serializeValue(val.int16Val, fileInfo, offset); + serializer.serializeValue(val.int16Val); } break; case PhysicalTypeID::INT8: { - SerDeser::serializeValue(val.int8Val, fileInfo, offset); + serializer.serializeValue(val.int8Val); } break; case PhysicalTypeID::UINT64: { - SerDeser::serializeValue(val.uint64Val, fileInfo, offset); + serializer.serializeValue(val.uint64Val); } break; case PhysicalTypeID::UINT32: { - SerDeser::serializeValue(val.uint32Val, fileInfo, offset); + serializer.serializeValue(val.uint32Val); } break; case PhysicalTypeID::UINT16: { - SerDeser::serializeValue(val.uint16Val, fileInfo, offset); + serializer.serializeValue(val.uint16Val); } break; case PhysicalTypeID::UINT8: { - SerDeser::serializeValue(val.uint8Val, fileInfo, offset); + serializer.serializeValue(val.uint8Val); } break; case PhysicalTypeID::DOUBLE: { - SerDeser::serializeValue(val.doubleVal, fileInfo, offset); + serializer.serializeValue(val.doubleVal); } break; case PhysicalTypeID::FLOAT: { - SerDeser::serializeValue(val.floatVal, fileInfo, offset); + serializer.serializeValue(val.floatVal); } break; case PhysicalTypeID::INTERVAL: { - SerDeser::serializeValue(val.intervalVal, fileInfo, offset); + serializer.serializeValue(val.intervalVal); } break; case PhysicalTypeID::INTERNAL_ID: { - SerDeser::serializeValue(val.internalIDVal, fileInfo, offset); + serializer.serializeValue(val.internalIDVal); } break; case PhysicalTypeID::STRING: { - SerDeser::serializeValue(strVal, fileInfo, offset); + serializer.serializeValue(strVal); } break; case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::FIXED_LIST: case PhysicalTypeID::STRUCT: { for (auto i = 0u; i < childrenSize; ++i) { - children[i]->serialize(fileInfo, offset); + children[i]->serialize(serializer); } } break; default: { throw NotImplementedException{"Value::serialize"}; } } - SerDeser::serializeValue(childrenSize, fileInfo, offset); + serializer.serializeValue(childrenSize); } -std::unique_ptr Value::deserialize(FileInfo* fileInfo, uint64_t& offset) { - LogicalType dataType = *LogicalType::deserialize(fileInfo, offset); +std::unique_ptr Value::deserialize(Deserializer& deserializer) { + LogicalType dataType = *LogicalType::deserialize(deserializer); bool isNull; - SerDeser::deserializeValue(isNull, fileInfo, offset); + deserializer.deserializeValue(isNull); std::unique_ptr val = std::make_unique(createDefaultValue(dataType)); switch (dataType.getPhysicalType()) { case PhysicalTypeID::BOOL: { - SerDeser::deserializeValue(val->val.booleanVal, fileInfo, offset); + deserializer.deserializeValue(val->val.booleanVal); } break; case PhysicalTypeID::INT64: { - SerDeser::deserializeValue(val->val.int64Val, fileInfo, offset); + deserializer.deserializeValue(val->val.int64Val); } break; case PhysicalTypeID::INT32: { - SerDeser::deserializeValue(val->val.int32Val, fileInfo, offset); + deserializer.deserializeValue(val->val.int32Val); } break; case PhysicalTypeID::INT16: { - SerDeser::deserializeValue(val->val.int16Val, fileInfo, offset); + deserializer.deserializeValue(val->val.int16Val); } break; case PhysicalTypeID::INT8: { - SerDeser::deserializeValue(val->val.int8Val, fileInfo, offset); + deserializer.deserializeValue(val->val.int8Val); } break; case PhysicalTypeID::UINT64: { - SerDeser::deserializeValue(val->val.uint64Val, fileInfo, offset); + deserializer.deserializeValue(val->val.uint64Val); } break; case PhysicalTypeID::UINT32: { - SerDeser::deserializeValue(val->val.uint32Val, fileInfo, offset); + deserializer.deserializeValue(val->val.uint32Val); } break; case PhysicalTypeID::UINT16: { - SerDeser::deserializeValue(val->val.uint16Val, fileInfo, offset); + deserializer.deserializeValue(val->val.uint16Val); } break; case PhysicalTypeID::UINT8: { - SerDeser::deserializeValue(val->val.uint8Val, fileInfo, offset); + deserializer.deserializeValue(val->val.uint8Val); } break; case PhysicalTypeID::DOUBLE: { - SerDeser::deserializeValue(val->val.doubleVal, fileInfo, offset); + deserializer.deserializeValue(val->val.doubleVal); } break; case PhysicalTypeID::FLOAT: { - SerDeser::deserializeValue(val->val.floatVal, fileInfo, offset); + deserializer.deserializeValue(val->val.floatVal); } break; case PhysicalTypeID::INTERVAL: { - SerDeser::deserializeValue(val->val.intervalVal, fileInfo, offset); + deserializer.deserializeValue(val->val.intervalVal); } break; case PhysicalTypeID::INTERNAL_ID: { - SerDeser::deserializeValue(val->val.internalIDVal, fileInfo, offset); + deserializer.deserializeValue(val->val.internalIDVal); } break; case PhysicalTypeID::STRING: { - SerDeser::deserializeValue(val->strVal, fileInfo, offset); + deserializer.deserializeValue(val->strVal); } break; case PhysicalTypeID::VAR_LIST: case PhysicalTypeID::FIXED_LIST: case PhysicalTypeID::STRUCT: { - SerDeser::deserializeVectorOfPtrs(val->children, fileInfo, offset); + deserializer.deserializeVectorOfPtrs(val->children); } break; default: { throw NotImplementedException{"Value::deserializeValue"}; } } - SerDeser::deserializeValue(val->childrenSize, fileInfo, offset); + deserializer.deserializeValue(val->childrenSize); val->setNull(isNull); return val; } diff --git a/src/function/scalar_macro_function.cpp b/src/function/scalar_macro_function.cpp index a8be482ba2..80529713f6 100644 --- a/src/function/scalar_macro_function.cpp +++ b/src/function/scalar_macro_function.cpp @@ -1,6 +1,7 @@ #include "function/scalar_macro_function.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; using namespace kuzu::parser; @@ -25,30 +26,29 @@ std::unique_ptr ScalarMacroFunction::copy() const { expression->copy(), positionalArgs, std::move(defaultArgsCopy)); } -void ScalarMacroFunction::serialize(FileInfo* fileInfo, uint64_t& offset) const { - expression->serialize(fileInfo, offset); - SerDeser::serializeVector(positionalArgs, fileInfo, offset); +void ScalarMacroFunction::serialize(Serializer& serializer) const { + expression->serialize(serializer); + serializer.serializeVector(positionalArgs); auto vectorSize = defaultArgs.size(); - SerDeser::serializeValue(vectorSize, fileInfo, offset); + serializer.serializeValue(vectorSize); for (auto& defaultArg : defaultArgs) { - SerDeser::serializeValue(defaultArg.first, fileInfo, offset); - defaultArg.second->serialize(fileInfo, offset); + serializer.serializeValue(defaultArg.first); + defaultArg.second->serialize(serializer); } } -std::unique_ptr ScalarMacroFunction::deserialize( - FileInfo* fileInfo, uint64_t& offset) { - auto expression = ParsedExpression::deserialize(fileInfo, offset); +std::unique_ptr ScalarMacroFunction::deserialize(Deserializer& deserializer) { + auto expression = ParsedExpression::deserialize(deserializer); std::vector positionalArgs; - SerDeser::deserializeVector(positionalArgs, fileInfo, offset); + deserializer.deserializeVector(positionalArgs); default_macro_args defaultArgs; uint64_t vectorSize; - SerDeser::deserializeValue(vectorSize, fileInfo, offset); + deserializer.deserializeValue(vectorSize); defaultArgs.reserve(vectorSize); for (auto i = 0u; i < vectorSize; i++) { std::string key; - SerDeser::deserializeValue(key, fileInfo, offset); - auto val = ParsedExpression::deserialize(fileInfo, offset); + deserializer.deserializeValue(key); + auto val = ParsedExpression::deserialize(deserializer); defaultArgs.emplace_back(std::move(key), std::move(val)); } return std::make_unique( diff --git a/src/include/catalog/catalog_content.h b/src/include/catalog/catalog_content.h index 9efbcf36e0..358f8a9c55 100644 --- a/src/include/catalog/catalog_content.h +++ b/src/include/catalog/catalog_content.h @@ -9,6 +9,10 @@ #include "table_schema.h" namespace kuzu { +namespace common { +class Serializer; +class Deserializer; +} // namespace common namespace catalog { class CatalogContent { @@ -117,9 +121,9 @@ class CatalogContent { static void validateStorageVersion(storage::storage_version_t savedStorageVersion); - static void validateMagicBytes(common::FileInfo* fileInfo, common::offset_t& offset); + static void validateMagicBytes(common::Deserializer& deserializer); - static void writeMagicBytes(common::FileInfo* fileInfo, common::offset_t& offset); + static void writeMagicBytes(common::Serializer& serializer); void registerBuiltInFunctions(); diff --git a/src/include/catalog/node_table_schema.h b/src/include/catalog/node_table_schema.h index 0b96f3b122..0c21499899 100644 --- a/src/include/catalog/node_table_schema.h +++ b/src/include/catalog/node_table_schema.h @@ -34,8 +34,7 @@ class NodeTableSchema : public TableSchema { inline Property* getPrimaryKey() const { return properties[primaryKeyPropertyID].get(); } - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline common::property_id_t getPrimaryKeyPropertyID() const { return primaryKeyPropertyID; } @@ -53,7 +52,7 @@ class NodeTableSchema : public TableSchema { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) final; + void serializeInternal(common::Serializer& serializer) final; private: // TODO(Semih): When we support updating the schemas, we need to update this or, we need diff --git a/src/include/catalog/property.h b/src/include/catalog/property.h index 7e5fff6d6b..82aeee4b37 100644 --- a/src/include/catalog/property.h +++ b/src/include/catalog/property.h @@ -3,6 +3,10 @@ #include "common/types/types.h" namespace kuzu { +namespace common { +class Serializer; +class Deserializer; +} // namespace common namespace catalog { class Property { @@ -37,8 +41,8 @@ class Property { inline void rename(std::string newName) { this->name = std::move(newName); } - void serialize(common::FileInfo* fileInfo, uint64_t& offset) const; - static std::unique_ptr deserialize(common::FileInfo* fileInfo, uint64_t& offset); + void serialize(common::Serializer& serializer) const; + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline std::unique_ptr copy() const { return std::make_unique(name, dataType->copy(), propertyID, tableID); diff --git a/src/include/catalog/rdf_graph_schema.h b/src/include/catalog/rdf_graph_schema.h index 7ec5ac0a0a..90f33b3006 100644 --- a/src/include/catalog/rdf_graph_schema.h +++ b/src/include/catalog/rdf_graph_schema.h @@ -20,15 +20,14 @@ class RdfGraphSchema : public TableSchema { inline common::table_id_t getNodeTableID() const { return nodeTableID; } inline common::table_id_t getRelTableID() const { return relTableID; } - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline std::unique_ptr copy() const final { return std::make_unique(tableName, tableID, nodeTableID, relTableID); } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) final; + void serializeInternal(common::Serializer& serializer) final; private: common::table_id_t nodeTableID; diff --git a/src/include/catalog/rel_table_group_schema.h b/src/include/catalog/rel_table_group_schema.h index 011f6f7cb0..274f635c17 100644 --- a/src/include/catalog/rel_table_group_schema.h +++ b/src/include/catalog/rel_table_group_schema.h @@ -19,15 +19,14 @@ class RelTableGroupSchema : public TableSchema { inline std::vector getRelTableIDs() const { return relTableIDs; } - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline std::unique_ptr copy() const final { return std::make_unique(tableName, tableID, relTableIDs); } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) override; + void serializeInternal(common::Serializer& serializer) override; private: std::vector relTableIDs; diff --git a/src/include/catalog/rel_table_schema.h b/src/include/catalog/rel_table_schema.h index 198e7c2db7..80c7db3f39 100644 --- a/src/include/catalog/rel_table_schema.h +++ b/src/include/catalog/rel_table_schema.h @@ -57,8 +57,7 @@ class RelTableSchema : public TableSchema { inline common::table_id_t getDstTableID() const { return dstTableID; } - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline std::unique_ptr copy() const override { return std::make_unique(tableName, tableID, Property::copy(properties), @@ -66,7 +65,7 @@ class RelTableSchema : public TableSchema { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) final; + void serializeInternal(common::Serializer& serializer) final; private: RelMultiplicity relMultiplicity; diff --git a/src/include/catalog/table_schema.h b/src/include/catalog/table_schema.h index 5914d4218a..ea3b28528b 100644 --- a/src/include/catalog/table_schema.h +++ b/src/include/catalog/table_schema.h @@ -9,6 +9,10 @@ #include "property.h" namespace kuzu { +namespace common { +class Serializer; +class Deserializer; +} // namespace common namespace catalog { class TableSchema { @@ -69,8 +73,8 @@ class TableSchema { void renameProperty(common::property_id_t propertyID, const std::string& newName); - void serialize(common::FileInfo* fileInfo, uint64_t& offset); - static std::unique_ptr deserialize(common::FileInfo* fileInfo, uint64_t& offset); + void serialize(common::Serializer& serializer); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline common::TableType getTableType() const { return tableType; } @@ -83,7 +87,7 @@ class TableSchema { private: inline common::property_id_t increaseNextPropertyID() { return nextPropertyID++; } - virtual void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) = 0; + virtual void serializeInternal(common::Serializer& serializer) = 0; public: common::TableType tableType; diff --git a/src/include/common/ser_deser.h b/src/include/common/ser_deser.h deleted file mode 100644 index adef739395..0000000000 --- a/src/include/common/ser_deser.h +++ /dev/null @@ -1,148 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include - -#include "common/file_utils.h" - -namespace kuzu { -namespace common { - -class LogicalType; - -class SerDeser { - -public: - template - static void serializeValue(const T& value, FileInfo* fileInfo, uint64_t& offset) { - static_assert(std::is_trivially_destructible(), "value must be a trivial type"); - FileUtils::writeToFile(fileInfo, (uint8_t*)&value, sizeof(T), offset); - offset += sizeof(T); - } - - template - static void serializeOptionalValue( - const std::unique_ptr& value, FileInfo* fileInfo, uint64_t& offset) { - serializeValue(value == nullptr, fileInfo, offset); - if (value != nullptr) { - value->serialize(fileInfo, offset); - } - } - - template - static void deserializeValue(T& value, FileInfo* fileInfo, uint64_t& offset) { - static_assert(std::is_trivially_destructible(), "value must be a trivial type"); - FileUtils::readFromFile(fileInfo, (uint8_t*)&value, sizeof(T), offset); - offset += sizeof(T); - } - - template - static void deserializeOptionalValue( - std::unique_ptr& value, FileInfo* fileInfo, uint64_t& offset) { - bool isNull; - deserializeValue(isNull, fileInfo, offset); - if (!isNull) { - value = T::deserialize(fileInfo, offset); - } - } - - template - static void serializeUnorderedMap(const std::unordered_map>& values, - FileInfo* fileInfo, uint64_t& offset) { - uint64_t mapSize = values.size(); - serializeValue(mapSize, fileInfo, offset); - for (auto& value : values) { - serializeValue(value.first, fileInfo, offset); - value.second->serialize(fileInfo, offset); - } - } - - template - static void serializeVector( - const std::vector& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t vectorSize = values.size(); - serializeValue(vectorSize, fileInfo, offset); - for (auto& value : values) { - serializeValue(value, fileInfo, offset); - } - } - - template - static void serializeVectorOfPtrs( - const std::vector>& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t vectorSize = values.size(); - serializeValue(vectorSize, fileInfo, offset); - for (auto& value : values) { - value->serialize(fileInfo, offset); - } - } - - template - static void deserializeUnorderedMap( - std::unordered_map>& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t mapSize; - deserializeValue(mapSize, fileInfo, offset); - values.reserve(mapSize); - for (auto i = 0u; i < mapSize; i++) { - T1 key; - deserializeValue(key, fileInfo, offset); - auto val = T2::deserialize(fileInfo, offset); - values.emplace(key, std::move(val)); - } - } - - template - static void deserializeVector(std::vector& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t vectorSize; - deserializeValue(vectorSize, fileInfo, offset); - values.resize(vectorSize); - for (auto& value : values) { - deserializeValue(value, fileInfo, offset); - } - } - - template - static void deserializeVectorOfPtrs( - std::vector>& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t vectorSize; - deserializeValue(vectorSize, fileInfo, offset); - values.reserve(vectorSize); - for (auto i = 0u; i < vectorSize; i++) { - values.push_back(T::deserialize(fileInfo, offset)); - } - } - - template - static void serializeUnorderedSet( - const std::unordered_set& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t setSize = values.size(); - serializeValue(setSize, fileInfo, offset); - for (auto& value : values) { - serializeValue(value, fileInfo, offset); - } - } - - template - static void deserializeUnorderedSet( - std::unordered_set& values, FileInfo* fileInfo, uint64_t& offset) { - uint64_t setSize; - deserializeValue(setSize, fileInfo, offset); - for (auto i = 0u; i < setSize; i++) { - T value; - deserializeValue(value, fileInfo, offset); - values.insert(value); - } - } -}; - -template<> -void SerDeser::serializeValue(const std::string& value, FileInfo* fileInfo, uint64_t& offset); - -template<> -void SerDeser::deserializeValue(std::string& value, FileInfo* fileInfo, uint64_t& offset); - -} // namespace common -} // namespace kuzu diff --git a/src/include/common/serializer/buffered_file.h b/src/include/common/serializer/buffered_file.h new file mode 100644 index 0000000000..e507807155 --- /dev/null +++ b/src/include/common/serializer/buffered_file.h @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/serializer/reader.h" +#include "common/serializer/writer.h" + +namespace kuzu { +namespace common { + +struct FileInfo; + +class BufferedFileWriter : public Writer { +public: + ~BufferedFileWriter() { flush(); } + explicit BufferedFileWriter(std::unique_ptr fileInfo) + : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), + fileInfo(std::move(fileInfo)) {} + + inline void write(const uint8_t* data, uint64_t size) final { + if (bufferOffset + size <= BUFFER_SIZE) { + memcpy(&buffer[bufferOffset], data, size); + bufferOffset += size; + } else { + auto toCopy = BUFFER_SIZE - bufferOffset; + memcpy(&buffer[bufferOffset], data, toCopy); + bufferOffset += toCopy; + flush(); + auto remaining = size - toCopy; + memcpy(buffer.get(), data + toCopy, remaining); + bufferOffset += remaining; + } + } + +protected: + std::unique_ptr buffer; + uint64_t fileOffset, bufferOffset; + std::unique_ptr fileInfo; + static const uint64_t BUFFER_SIZE = 4096; + + void flush(); +}; + +class BufferedFileReader : public Reader { +public: + explicit BufferedFileReader(std::unique_ptr fileInfo) + : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), + fileInfo(std::move(fileInfo)) { + readNextPage(); + } + + inline void read(uint8_t* data, uint64_t size) final { + if (bufferOffset + size <= BUFFER_SIZE) { + memcpy(data, &buffer[bufferOffset], size); + bufferOffset += size; + } else { + auto toCopy = BUFFER_SIZE - bufferOffset; + memcpy(data, &buffer[bufferOffset], toCopy); + bufferOffset += toCopy; + readNextPage(); + auto remaining = size - toCopy; + memcpy(data + toCopy, buffer.get(), remaining); + bufferOffset += remaining; + } + } + +private: + std::unique_ptr buffer; + uint64_t fileOffset, bufferOffset; + std::unique_ptr fileInfo; + static const uint64_t BUFFER_SIZE = 4096; + + void readNextPage(); +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h b/src/include/common/serializer/buffered_serializer.h similarity index 82% rename from src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h rename to src/include/common/serializer/buffered_serializer.h index 96abdc46bf..04b984326e 100644 --- a/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h +++ b/src/include/common/serializer/buffered_serializer.h @@ -2,8 +2,10 @@ #include +#include "common/serializer/writer.h" + namespace kuzu { -namespace processor { +namespace common { // TODO(Ziyi): Move this to constants.h once we have a unified serializer design. static constexpr uint64_t SERIALIZER_DEFAULT_SIZE = 1024; @@ -13,7 +15,7 @@ struct BinaryData { uint64_t size; }; -class BufferedSerializer { +class BufferedSerializer : public Writer { public: // Serializes to a buffer allocated by the serializer, will expand when // writing past the initial threshold. @@ -34,10 +36,10 @@ class BufferedSerializer { void write(T element) { static_assert( std::is_trivially_destructible(), "Write element must be trivially destructible"); - writeData(reinterpret_cast(&element), sizeof(T)); + write(reinterpret_cast(&element), sizeof(T)); } - void writeData(const uint8_t* buffer, uint64_t len); + void write(const uint8_t* buffer, uint64_t len) final; private: uint64_t maximumSize; @@ -46,5 +48,5 @@ class BufferedSerializer { BinaryData blob; }; -} // namespace processor +} // namespace common } // namespace kuzu diff --git a/src/include/common/serializer/deserializer.h b/src/include/common/serializer/deserializer.h new file mode 100644 index 0000000000..047493c23d --- /dev/null +++ b/src/include/common/serializer/deserializer.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "common/serializer/reader.h" + +namespace kuzu { +namespace common { + +class Deserializer { +public: + explicit Deserializer(std::unique_ptr reader) : reader(std::move(reader)) {} + + template + void deserializeValue(T& value) { + static_assert(std::is_trivially_destructible(), "value must be a trivial type"); + reader->read((uint8_t*)&value, sizeof(T)); + } + + template + void deserializeOptionalValue(std::unique_ptr& value) { + bool isNull; + deserializeValue(isNull); + if (!isNull) { + value = T::deserialize(*this); + } + } + + template + void deserializeUnorderedMap(std::unordered_map>& values) { + uint64_t mapSize; + deserializeValue(mapSize); + values.reserve(mapSize); + for (auto i = 0u; i < mapSize; i++) { + T1 key; + deserializeValue(key); + auto val = T2::deserialize(*this); + values.emplace(key, std::move(val)); + } + } + + template + void deserializeVector(std::vector& values) { + uint64_t vectorSize; + deserializeValue(vectorSize); + values.resize(vectorSize); + for (auto& value : values) { + deserializeValue(value); + } + } + + template + void deserializeVectorOfPtrs(std::vector>& values) { + uint64_t vectorSize; + deserializeValue(vectorSize); + values.reserve(vectorSize); + for (auto i = 0u; i < vectorSize; i++) { + values.push_back(T::deserialize(*this)); + } + } + + template + void deserializeUnorderedSet(std::unordered_set& values) { + uint64_t setSize; + deserializeValue(setSize); + for (auto i = 0u; i < setSize; i++) { + T value; + deserializeValue(value); + values.insert(value); + } + } + +private: + std::unique_ptr reader; +}; + +template<> +void Deserializer::deserializeValue(std::string& value); + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/reader.h b/src/include/common/serializer/reader.h new file mode 100644 index 0000000000..7a63207a3e --- /dev/null +++ b/src/include/common/serializer/reader.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace kuzu { +namespace common { + +class Reader { +public: + virtual void read(uint8_t* data, uint64_t size) = 0; + virtual ~Reader(){}; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/serializer.h b/src/include/common/serializer/serializer.h new file mode 100644 index 0000000000..31e7c9a8ac --- /dev/null +++ b/src/include/common/serializer/serializer.h @@ -0,0 +1,86 @@ +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "common/serializer/writer.h" + +namespace kuzu { +namespace common { + +class Serializer { +public: + explicit Serializer(std::shared_ptr writer) : writer(std::move(writer)) {} + + template + void serializeValue(const T& value) { + static_assert(std::is_trivially_destructible(), "value must be a trivial type"); + writer->write((uint8_t*)&value, sizeof(T)); + } + + // Alias for serializeValue + template + void write(const T& value) { + serializeValue(value); + } + + void write(const uint8_t* value, uint64_t len) { writer->write(value, len); } + + template + void serializeOptionalValue(const std::unique_ptr& value) { + serializeValue(value == nullptr); + if (value != nullptr) { + value->serialize(*this); + } + } + + template + void serializeUnorderedMap(const std::unordered_map>& values) { + uint64_t mapSize = values.size(); + serializeValue(mapSize); + for (auto& value : values) { + serializeValue(value.first); + value.second->serialize(*this); + } + } + + template + void serializeVector(const std::vector& values) { + uint64_t vectorSize = values.size(); + serializeValue(vectorSize); + for (auto& value : values) { + serializeValue(value); + } + } + + template + void serializeVectorOfPtrs(const std::vector>& values) { + uint64_t vectorSize = values.size(); + serializeValue(vectorSize); + for (auto& value : values) { + value->serialize(*this); + } + } + + template + void serializeUnorderedSet(const std::unordered_set& values) { + uint64_t setSize = values.size(); + serializeValue(setSize); + for (const auto& value : values) { + serializeValue(value); + } + } + +private: + std::shared_ptr writer; +}; + +template<> +void Serializer::serializeValue(const std::string& value); + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/writer.h b/src/include/common/serializer/writer.h new file mode 100644 index 0000000000..95eb77d961 --- /dev/null +++ b/src/include/common/serializer/writer.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace kuzu { +namespace common { + +class Writer { +public: + virtual void write(const uint8_t* data, uint64_t size) = 0; + virtual ~Writer() = default; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/types/types.h b/src/include/common/types/types.h index 715a5b25e7..1d5441d2af 100644 --- a/src/include/common/types/types.h +++ b/src/include/common/types/types.h @@ -15,6 +15,8 @@ namespace kuzu { namespace common { +class Serializer; +class Deserializer; class FileInfo; using sel_t = uint16_t; @@ -134,18 +136,15 @@ class ExtraTypeInfo { public: virtual ~ExtraTypeInfo() = default; - inline void serialize(FileInfo* fileInfo, uint64_t& offset) const { - serializeInternal(fileInfo, offset); - } + inline void serialize(Serializer& serializer) const { serializeInternal(serializer); } virtual std::unique_ptr copy() const = 0; protected: - virtual void serializeInternal(FileInfo* fileInfo, uint64_t& offset) const = 0; + virtual void serializeInternal(Serializer& serializer) const = 0; }; class VarListTypeInfo : public ExtraTypeInfo { - friend class SerDeser; public: VarListTypeInfo() = default; @@ -157,17 +156,16 @@ class VarListTypeInfo : public ExtraTypeInfo { bool operator==(const VarListTypeInfo& other) const; std::unique_ptr copy() const override; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); protected: - void serializeInternal(FileInfo* fileInfo, uint64_t& offset) const override; + void serializeInternal(Serializer& serializer) const override; protected: std::unique_ptr childType; }; class FixedListTypeInfo : public VarListTypeInfo { - friend class SerDeser; public: FixedListTypeInfo() = default; @@ -176,18 +174,17 @@ class FixedListTypeInfo : public VarListTypeInfo { : VarListTypeInfo{std::move(childType)}, fixedNumElementsInList{fixedNumElementsInList} {} inline uint64_t getNumElementsInList() const { return fixedNumElementsInList; } bool operator==(const FixedListTypeInfo& other) const; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); std::unique_ptr copy() const override; private: - void serializeInternal(FileInfo* fileInfo, uint64_t& offset) const override; + void serializeInternal(Serializer& serializer) const override; private: uint64_t fixedNumElementsInList; }; class StructField { - friend class SerDeser; public: StructField() : type{std::make_unique()} {} @@ -200,9 +197,9 @@ class StructField { bool operator==(const StructField& other) const; - void serialize(FileInfo* fileInfo, uint64_t& offset) const; + void serialize(Serializer& serializer) const; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); std::unique_ptr copy() const; @@ -212,7 +209,6 @@ class StructField { }; class StructTypeInfo : public ExtraTypeInfo { - friend class SerDeser; public: StructTypeInfo() = default; @@ -229,11 +225,11 @@ class StructTypeInfo : public ExtraTypeInfo { std::vector getStructFields() const; bool operator==(const kuzu::common::StructTypeInfo& other) const; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); std::unique_ptr copy() const override; private: - void serializeInternal(FileInfo* fileInfo, uint64_t& offset) const override; + void serializeInternal(Serializer& serializer) const override; private: std::vector> fields; @@ -241,7 +237,6 @@ class StructTypeInfo : public ExtraTypeInfo { }; class LogicalType { - friend class SerDeser; friend class LogicalTypeUtils; friend class StructType; friend class VarListType; @@ -274,9 +269,9 @@ class LogicalType { extraTypeInfo = std::move(typeInfo); } - void serialize(FileInfo* fileInfo, uint64_t& offset) const; + void serialize(Serializer& serializer) const; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); std::unique_ptr copy() const; diff --git a/src/include/common/types/value/value.h b/src/include/common/types/value/value.h index faf170f3f8..0dd96ea0db 100644 --- a/src/include/common/types/value/value.h +++ b/src/include/common/types/value/value.h @@ -20,6 +20,8 @@ class NestedVal; class RecursiveRelVal; class ArrowRowBatch; class ValueVector; +class Serializer; +class Deserializer; class Value { friend class NodeVal; @@ -207,9 +209,9 @@ class Value { */ KUZU_API std::string toString() const; - void serialize(FileInfo* fileInfo, uint64_t& offset) const; + void serialize(Serializer& serializer) const; - static std::unique_ptr deserialize(FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(Deserializer& deserializer); private: Value(); diff --git a/src/include/function/scalar_macro_function.h b/src/include/function/scalar_macro_function.h index 48baf474db..15e1a675db 100644 --- a/src/include/function/scalar_macro_function.h +++ b/src/include/function/scalar_macro_function.h @@ -31,10 +31,9 @@ struct ScalarMacroFunction { std::unique_ptr copy() const; - void serialize(common::FileInfo* fileInfo, uint64_t& offset) const; + void serialize(common::Serializer& serializer) const; - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); }; } // namespace function diff --git a/src/include/parser/expression/parsed_case_expression.h b/src/include/parser/expression/parsed_case_expression.h index 14b1b731ee..714f6895ea 100644 --- a/src/include/parser/expression/parsed_case_expression.h +++ b/src/include/parser/expression/parsed_case_expression.h @@ -13,9 +13,8 @@ struct ParsedCaseAlternative { std::unique_ptr thenExpression) : whenExpression{std::move(whenExpression)}, thenExpression{std::move(thenExpression)} {} - void serialize(common::FileInfo* fileInfo, uint64_t& offset) const; - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + void serialize(common::Serializer& serializer) const; + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline std::unique_ptr copy() const { return std::make_unique( @@ -71,13 +70,12 @@ class ParsedCaseExpression : public ParsedExpression { inline bool hasElseExpression() const { return elseExpression != nullptr; } inline ParsedExpression* getElseExpression() const { return elseExpression.get(); } - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); std::unique_ptr copy() const override; private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override; + void serializeInternal(common::Serializer& serializer) const override; private: // Optional. If not specified, directly check next whenExpression diff --git a/src/include/parser/expression/parsed_expression.h b/src/include/parser/expression/parsed_expression.h index e3a94ce42d..6a75debdfd 100644 --- a/src/include/parser/expression/parsed_expression.h +++ b/src/include/parser/expression/parsed_expression.h @@ -11,7 +11,9 @@ namespace kuzu { namespace common { class FileInfo; -} +class Serializer; +class Deserializer; +} // namespace common namespace parser { @@ -62,16 +64,15 @@ class ParsedExpression { return std::make_unique(type, alias, rawName, copyChildren()); } - void serialize(common::FileInfo* fileInfo, uint64_t& offset) const; + void serialize(common::Serializer& serializer) const; - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + static std::unique_ptr deserialize(common::Deserializer& deserializer); protected: parsed_expression_vector copyChildren() const; private: - virtual inline void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const {} + virtual inline void serializeInternal(common::Serializer& serializer) const {} protected: common::ExpressionType type; diff --git a/src/include/parser/expression/parsed_function_expression.h b/src/include/parser/expression/parsed_function_expression.h index 0ad741b7a3..601dc9b7b3 100644 --- a/src/include/parser/expression/parsed_function_expression.h +++ b/src/include/parser/expression/parsed_function_expression.h @@ -41,7 +41,7 @@ class ParsedFunctionExpression : public ParsedExpression { } static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + common::Deserializer& deserializer); inline std::unique_ptr copy() const override { return std::make_unique( @@ -49,7 +49,7 @@ class ParsedFunctionExpression : public ParsedExpression { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override; + void serializeInternal(common::Serializer& serializer) const override; private: bool isDistinct; diff --git a/src/include/parser/expression/parsed_literal_expression.h b/src/include/parser/expression/parsed_literal_expression.h index 0702d4e92b..b5d197bf13 100644 --- a/src/include/parser/expression/parsed_literal_expression.h +++ b/src/include/parser/expression/parsed_literal_expression.h @@ -1,6 +1,5 @@ #pragma once -#include "common/ser_deser.h" #include "common/types/value/value.h" #include "parsed_expression.h" @@ -24,9 +23,8 @@ class ParsedLiteralExpression : public ParsedExpression { inline common::Value* getValue() const { return value.get(); } static inline std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset) { - return std::make_unique( - common::Value::deserialize(fileInfo, offset)); + common::Deserializer& deserializer) { + return std::make_unique(common::Value::deserialize(deserializer)); } inline std::unique_ptr copy() const override { @@ -35,8 +33,8 @@ class ParsedLiteralExpression : public ParsedExpression { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override { - value->serialize(fileInfo, offset); + void serializeInternal(common::Serializer& serializer) const override { + value->serialize(serializer); } private: diff --git a/src/include/parser/expression/parsed_parameter_expression.h b/src/include/parser/expression/parsed_parameter_expression.h index c787ab4459..f856dad6de 100644 --- a/src/include/parser/expression/parsed_parameter_expression.h +++ b/src/include/parser/expression/parsed_parameter_expression.h @@ -15,7 +15,7 @@ class ParsedParameterExpression : public ParsedExpression { inline std::string getParameterName() const { return parameterName; } static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset) { + common::Deserializer& deserializer) { throw common::NotImplementedException{"ParsedParameterExpression::deserialize()"}; } @@ -24,7 +24,7 @@ class ParsedParameterExpression : public ParsedExpression { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override { + void serializeInternal(common::Serializer& serializer) const override { throw common::NotImplementedException{"ParsedParameterExpression::serializeInternal()"}; } diff --git a/src/include/parser/expression/parsed_property_expression.h b/src/include/parser/expression/parsed_property_expression.h index 85cebb2415..60e2698f3a 100644 --- a/src/include/parser/expression/parsed_property_expression.h +++ b/src/include/parser/expression/parsed_property_expression.h @@ -1,7 +1,7 @@ #pragma once #include "common/constants.h" -#include "common/ser_deser.h" +#include "common/serializer/serializer.h" #include "parsed_expression.h" namespace kuzu { @@ -28,7 +28,7 @@ class ParsedPropertyExpression : public ParsedExpression { inline bool isStar() const { return propertyName == common::InternalKeyword::STAR; } static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + common::Deserializer& deserializer); inline std::unique_ptr copy() const override { return std::make_unique( @@ -36,8 +36,8 @@ class ParsedPropertyExpression : public ParsedExpression { } private: - inline void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override { - common::SerDeser::serializeValue(propertyName, fileInfo, offset); + inline void serializeInternal(common::Serializer& serializer) const override { + serializer.serializeValue(propertyName); } private: diff --git a/src/include/parser/expression/parsed_subquery_expression.h b/src/include/parser/expression/parsed_subquery_expression.h index 2297442135..a9c81d2426 100644 --- a/src/include/parser/expression/parsed_subquery_expression.h +++ b/src/include/parser/expression/parsed_subquery_expression.h @@ -32,7 +32,7 @@ class ParsedSubqueryExpression : public ParsedExpression { inline ParsedExpression* getWhereClause() const { return whereClause.get(); } static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset) { + common::Deserializer& deserializer) { throw common::NotImplementedException{"ParsedSubqueryExpression::deserialize()"}; } @@ -41,7 +41,7 @@ class ParsedSubqueryExpression : public ParsedExpression { } private: - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override { + void serializeInternal(common::Serializer& serializer) const override { throw common::NotImplementedException{"ParsedSubqueryExpression::serializeInternal()"}; } diff --git a/src/include/parser/expression/parsed_variable_expression.h b/src/include/parser/expression/parsed_variable_expression.h index dd645508f4..0255269cb5 100644 --- a/src/include/parser/expression/parsed_variable_expression.h +++ b/src/include/parser/expression/parsed_variable_expression.h @@ -1,6 +1,7 @@ #pragma once -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "parsed_expression.h" namespace kuzu { @@ -24,7 +25,7 @@ class ParsedVariableExpression : public ParsedExpression { inline std::string getVariableName() const { return variableName; } static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + common::Deserializer& deserializer); inline std::unique_ptr copy() const override { return std::make_unique( @@ -32,8 +33,8 @@ class ParsedVariableExpression : public ParsedExpression { } private: - inline void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) const override { - common::SerDeser::serializeValue(variableName, fileInfo, offset); + inline void serializeInternal(common::Serializer& serializer) const override { + serializer.serializeValue(variableName); } private: diff --git a/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h index fdb6a94346..527e6d755e 100644 --- a/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h @@ -39,7 +39,7 @@ class BasicColumnWriter : public ColumnWriter { void finalizeWrite(ColumnWriterState& state) override; protected: - void writeLevels(BufferedSerializer& bufferedSerializer, const std::vector& levels, + void writeLevels(common::Serializer& bufferedSerializer, const std::vector& levels, uint64_t maxValue, uint64_t startOffset, uint64_t count); virtual kuzu_parquet::format::Encoding::type getEncoding(BasicColumnWriterState& state) { @@ -62,7 +62,7 @@ class BasicColumnWriter : public ColumnWriter { // Flushes the writer for a specific page. Only used for scalar types. virtual void flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* state) {} + common::Serializer& bufferedSerializer, ColumnWriterPageState* state) {} // Retrieves the row size of a vector at the specified location. Only used for scalar types. virtual uint64_t getRowSize( @@ -70,7 +70,7 @@ class BasicColumnWriter : public ColumnWriter { throw common::NotImplementedException{"BasicColumnWriter::getRowSize"}; } // Writes a (subset of a) vector to the specified serializer. Only used for scalar types. - virtual void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats, + virtual void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* stats, ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) = 0; @@ -80,7 +80,7 @@ class BasicColumnWriter : public ColumnWriter { throw common::NotImplementedException{"BasicColumnWriter::dictionarySize"}; } void writeDictionary(BasicColumnWriterState& state, - std::unique_ptr bufferedSerializer, uint64_t rowCount); + std::unique_ptr bufferedSerializer, uint64_t rowCount); virtual void flushDictionary(BasicColumnWriterState& state, ColumnWriterStatistics* stats) { throw common::NotImplementedException{"BasicColumnWriter::flushDictionary"}; } diff --git a/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h index 73942345c0..965254033f 100644 --- a/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h @@ -55,12 +55,12 @@ class BooleanColumnWriter : public BasicColumnWriter { return std::make_unique(); } - void writeVector(BufferedSerializer& bufferedSerializer, + void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override; void flushPageState( - BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) override; + common::Serializer& temp_writer, ColumnWriterPageState* writerPageState) override; }; } // namespace processor diff --git a/src/include/processor/operator/persistent/writer/parquet/column_writer.h b/src/include/processor/operator/persistent/writer/parquet/column_writer.h index 2827a9b20c..afa410ee80 100644 --- a/src/include/processor/operator/persistent/writer/parquet/column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/column_writer.h @@ -1,7 +1,7 @@ #pragma once -#include "buffered_serializer.h" #include "common/exception/not_implemented.h" +#include "common/serializer/buffered_serializer.h" #include "common/types/types.h" #include "common/vector/value_vector.h" #include "parquet/parquet_types.h" @@ -24,7 +24,8 @@ class ColumnWriterPageState { struct PageWriteInformation { kuzu_parquet::format::PageHeader pageHeader; - std::unique_ptr bufferWriter; + std::shared_ptr bufferWriter; + std::unique_ptr writer; std::unique_ptr pageState; uint64_t writePageIdx = 0; uint64_t writeCount = 0; @@ -103,7 +104,7 @@ class ColumnWriter { void handleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector, uint64_t count, uint16_t defineValue, uint16_t nullValue); void handleRepeatLevels(ColumnWriterState& stateToHandle, ColumnWriterState* parent); - void compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, + void compressPage(common::BufferedSerializer& bufferedSerializer, size_t& compressedSize, uint8_t*& compressedData, std::unique_ptr& compressedBuf); }; diff --git a/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h index 379a2d10dc..69578ebae3 100644 --- a/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h +++ b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h @@ -1,6 +1,6 @@ #pragma once -#include "buffered_serializer.h" +#include "common/serializer/serializer.h" namespace kuzu { namespace processor { @@ -17,9 +17,9 @@ class RleBpEncoder { void prepareValue(uint32_t value); void finishPrepare(); - void beginWrite(BufferedSerializer& writer, uint32_t first_value); - void writeValue(BufferedSerializer& writer, uint32_t value); - void finishWrite(BufferedSerializer& writer); + void beginWrite(common::Serializer& writer, uint32_t first_value); + void writeValue(common::Serializer& writer, uint32_t value); + void finishWrite(common::Serializer& writer); uint64_t getByteCount(); @@ -36,7 +36,7 @@ class RleBpEncoder { private: void finishRun(); - void writeRun(BufferedSerializer& bufferedSerializer); + void writeRun(common::Serializer& bufferedSerializer); }; } // namespace processor diff --git a/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h index f7461cfaf7..5190138595 100644 --- a/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h @@ -1,6 +1,7 @@ #pragma once #include "basic_column_writer.h" +#include "common/serializer/serializer.h" #include "function/cast/numeric_limits.h" #include "function/comparison/comparison_functions.h" @@ -77,7 +78,7 @@ class StandardColumnWriter : public BasicColumnWriter { } void templatedWritePlain(common::ValueVector* vector, ColumnWriterStatistics* stats, - uint64_t chunkStart, uint64_t chunkEnd, BufferedSerializer& ser) { + uint64_t chunkStart, uint64_t chunkEnd, common::Serializer& ser) { for (auto r = chunkStart; r < chunkEnd; r++) { auto pos = getVectorPos(vector, r); if (!vector->isNull(pos)) { @@ -88,7 +89,7 @@ class StandardColumnWriter : public BasicColumnWriter { } } - inline void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats, + inline void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* stats, ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override { templatedWritePlain(vector, stats, chunkStart, chunkEnd, bufferedSerializer); diff --git a/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h index 74c4fa42b8..7b373daf6c 100644 --- a/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h @@ -123,12 +123,12 @@ class StringColumnWriter : public BasicColumnWriter { void finalizeAnalyze(ColumnWriterState& writerState) override; - void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* statsToWrite, + void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* statsToWrite, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override; void flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override; + common::Serializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override; void flushDictionary( BasicColumnWriterState& writerState, ColumnWriterStatistics* writerStats) override; diff --git a/src/include/storage/stats/metadata_dah_info.h b/src/include/storage/stats/metadata_dah_info.h index eb58618bec..927dad3c86 100644 --- a/src/include/storage/stats/metadata_dah_info.h +++ b/src/include/storage/stats/metadata_dah_info.h @@ -19,9 +19,8 @@ struct MetadataDAHInfo { std::unique_ptr copy(); - void serialize(common::FileInfo* fileInfo, uint64_t& offset) const; - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + void serialize(common::Serializer& serializer) const; + static std::unique_ptr deserialize(common::Deserializer& deserializer); }; } // namespace storage diff --git a/src/include/storage/stats/node_table_statistics.h b/src/include/storage/stats/node_table_statistics.h index dd5ef8ff87..d417ea0653 100644 --- a/src/include/storage/stats/node_table_statistics.h +++ b/src/include/storage/stats/node_table_statistics.h @@ -8,6 +8,10 @@ #include "storage/store/rels_store.h" namespace kuzu { +namespace common { +class Serializer; +class Deserializer; +} // namespace common namespace storage { class NodeTableStatsAndDeletedIDs : public TableStatistics { @@ -64,9 +68,9 @@ class NodeTableStatsAndDeletedIDs : public TableStatistics { return metadataDAHInfos[columnID].get(); } - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) final; + void serializeInternal(common::Serializer& serializer) final; static std::unique_ptr deserialize(common::table_id_t tableID, - common::offset_t maxNodeOffset, common::FileInfo* fileInfo, uint64_t& offset); + common::offset_t maxNodeOffset, common::Deserializer& deserializer); std::unique_ptr copy() final { return std::make_unique(*this); diff --git a/src/include/storage/stats/property_statistics.h b/src/include/storage/stats/property_statistics.h index f6985fcb75..2438d858e7 100644 --- a/src/include/storage/stats/property_statistics.h +++ b/src/include/storage/stats/property_statistics.h @@ -17,9 +17,8 @@ class PropertyStatistics { inline bool mayHaveNull() const { return mayHaveNullValue; } PropertyStatistics(PropertyStatistics& other) = default; - void serialize(common::FileInfo* fileInfo, uint64_t& offset); - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); + void serialize(common::Serializer& serializer); + static std::unique_ptr deserialize(common::Deserializer& deserializer); inline void setHasNull() { mayHaveNullValue = true; } diff --git a/src/include/storage/stats/rel_table_statistics.h b/src/include/storage/stats/rel_table_statistics.h index 1bdd75b201..3b678bea0c 100644 --- a/src/include/storage/stats/rel_table_statistics.h +++ b/src/include/storage/stats/rel_table_statistics.h @@ -27,9 +27,9 @@ class RelTableStats : public TableStatistics { inline common::offset_t getNextRelOffset() const { return nextRelOffset; } - void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) final; + void serializeInternal(common::Serializer& serializer) final; static std::unique_ptr deserialize( - uint64_t numRels, common::table_id_t tableID, common::FileInfo* fileInfo, uint64_t& offset); + uint64_t numRels, common::table_id_t tableID, common::Deserializer& deserializer); inline std::unique_ptr copy() final { return std::make_unique(*this); diff --git a/src/include/storage/stats/table_statistics.h b/src/include/storage/stats/table_statistics.h index 2e78ab14e5..d79c10fc4f 100644 --- a/src/include/storage/stats/table_statistics.h +++ b/src/include/storage/stats/table_statistics.h @@ -10,6 +10,10 @@ namespace kuzu { namespace catalog { class TableSchema; } +namespace common { +class Serializer; +class Deserializer; +} // namespace common namespace storage { class TableStatistics { @@ -38,10 +42,9 @@ class TableStatistics { propertyStatistics[propertyID] = std::make_unique(newStats); } - void serialize(common::FileInfo* fileInfo, uint64_t& offset); - static std::unique_ptr deserialize( - common::FileInfo* fileInfo, uint64_t& offset); - virtual void serializeInternal(common::FileInfo* fileInfo, uint64_t& offset) = 0; + void serialize(common::Serializer& serializer); + static std::unique_ptr deserialize(common::Deserializer& deserializer); + virtual void serializeInternal(common::Serializer& serializer) = 0; virtual std::unique_ptr copy() = 0; diff --git a/src/parser/expression/parsed_case_expression.cpp b/src/parser/expression/parsed_case_expression.cpp index 94a94a31b4..32be138525 100644 --- a/src/parser/expression/parsed_case_expression.cpp +++ b/src/parser/expression/parsed_case_expression.cpp @@ -1,33 +1,34 @@ #include "parser/expression/parsed_case_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace parser { -void ParsedCaseAlternative::serialize(FileInfo* fileInfo, uint64_t& offset) const { - whenExpression->serialize(fileInfo, offset); - thenExpression->serialize(fileInfo, offset); +void ParsedCaseAlternative::serialize(Serializer& serializer) const { + whenExpression->serialize(serializer); + thenExpression->serialize(serializer); } std::unique_ptr ParsedCaseAlternative::deserialize( - FileInfo* fileInfo, uint64_t& offset) { - auto whenExpression = ParsedExpression::deserialize(fileInfo, offset); - auto thenExpression = ParsedExpression::deserialize(fileInfo, offset); + Deserializer& deserializer) { + auto whenExpression = ParsedExpression::deserialize(deserializer); + auto thenExpression = ParsedExpression::deserialize(deserializer); return std::make_unique( std::move(whenExpression), std::move(thenExpression)); } std::unique_ptr ParsedCaseExpression::deserialize( - FileInfo* fileInfo, uint64_t& offset) { + Deserializer& deserializer) { std::unique_ptr caseExpression; - SerDeser::deserializeOptionalValue(caseExpression, fileInfo, offset); + deserializer.deserializeOptionalValue(caseExpression); std::vector> caseAlternatives; - SerDeser::deserializeVectorOfPtrs(caseAlternatives, fileInfo, offset); + deserializer.deserializeVectorOfPtrs(caseAlternatives); std::unique_ptr elseExpression; - SerDeser::deserializeOptionalValue(elseExpression, fileInfo, offset); + deserializer.deserializeOptionalValue(elseExpression); return std::make_unique( std::move(caseExpression), std::move(caseAlternatives), std::move(elseExpression)); } @@ -43,10 +44,10 @@ std::unique_ptr ParsedCaseExpression::copy() const { elseExpression ? elseExpression->copy() : nullptr); } -void ParsedCaseExpression::serializeInternal(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeOptionalValue(caseExpression, fileInfo, offset); - SerDeser::serializeVectorOfPtrs(caseAlternatives, fileInfo, offset); - SerDeser::serializeOptionalValue(elseExpression, fileInfo, offset); +void ParsedCaseExpression::serializeInternal(Serializer& serializer) const { + serializer.serializeOptionalValue(caseExpression); + serializer.serializeVectorOfPtrs(caseAlternatives); + serializer.serializeOptionalValue(elseExpression); } } // namespace parser diff --git a/src/parser/expression/parsed_expression.cpp b/src/parser/expression/parsed_expression.cpp index df75f8d099..ce9f069f4d 100644 --- a/src/parser/expression/parsed_expression.cpp +++ b/src/parser/expression/parsed_expression.cpp @@ -1,7 +1,8 @@ #include "parser/expression/parsed_expression.h" #include "common/exception/not_implemented.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "parser/expression/parsed_case_expression.h" #include "parser/expression/parsed_function_expression.h" #include "parser/expression/parsed_literal_expression.h" @@ -37,46 +38,45 @@ parsed_expression_vector ParsedExpression::copyChildren() const { return childrenCopy; } -void ParsedExpression::serialize(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(type, fileInfo, offset); - SerDeser::serializeValue(alias, fileInfo, offset); - SerDeser::serializeValue(rawName, fileInfo, offset); - SerDeser::serializeVectorOfPtrs(children, fileInfo, offset); - serializeInternal(fileInfo, offset); +void ParsedExpression::serialize(Serializer& serializer) const { + serializer.serializeValue(type); + serializer.serializeValue(alias); + serializer.serializeValue(rawName); + serializer.serializeVectorOfPtrs(children); + serializeInternal(serializer); } -std::unique_ptr ParsedExpression::deserialize( - FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr ParsedExpression::deserialize(Deserializer& deserializer) { ExpressionType type; std::string alias; std::string rawName; parsed_expression_vector children; - SerDeser::deserializeValue(type, fileInfo, offset); - SerDeser::deserializeValue(alias, fileInfo, offset); - SerDeser::deserializeValue(rawName, fileInfo, offset); - SerDeser::deserializeVectorOfPtrs(children, fileInfo, offset); + deserializer.deserializeValue(type); + deserializer.deserializeValue(alias); + deserializer.deserializeValue(rawName); + deserializer.deserializeVectorOfPtrs(children); std::unique_ptr parsedExpression; switch (type) { case ExpressionType::CASE_ELSE: { - parsedExpression = ParsedCaseExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedCaseExpression::deserialize(deserializer); } break; case ExpressionType::FUNCTION: { - parsedExpression = ParsedFunctionExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedFunctionExpression::deserialize(deserializer); } break; case ExpressionType::LITERAL: { - parsedExpression = ParsedLiteralExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedLiteralExpression::deserialize(deserializer); } break; case ExpressionType::PARAMETER: { - parsedExpression = ParsedParameterExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedParameterExpression::deserialize(deserializer); } break; case ExpressionType::PROPERTY: { - parsedExpression = ParsedPropertyExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedPropertyExpression::deserialize(deserializer); } break; case ExpressionType::EXISTENTIAL_SUBQUERY: { - parsedExpression = ParsedSubqueryExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedSubqueryExpression::deserialize(deserializer); } break; case ExpressionType::VARIABLE: { - parsedExpression = ParsedVariableExpression::deserialize(fileInfo, offset); + parsedExpression = ParsedVariableExpression::deserialize(deserializer); } break; default: throw NotImplementedException{"ParsedExpression::deserialize"}; diff --git a/src/parser/expression/parsed_function_expression.cpp b/src/parser/expression/parsed_function_expression.cpp index be88c7624b..29a2d9629d 100644 --- a/src/parser/expression/parsed_function_expression.cpp +++ b/src/parser/expression/parsed_function_expression.cpp @@ -1,6 +1,7 @@ #include "parser/expression/parsed_function_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; @@ -8,17 +9,17 @@ namespace kuzu { namespace parser { std::unique_ptr ParsedFunctionExpression::deserialize( - FileInfo* fileInfo, uint64_t& offset) { + Deserializer& deserializer) { bool isDistinct; - SerDeser::deserializeValue(isDistinct, fileInfo, offset); + deserializer.deserializeValue(isDistinct); std::string functionName; - SerDeser::deserializeValue(functionName, fileInfo, offset); + deserializer.deserializeValue(functionName); return std::make_unique(isDistinct, std::move(functionName)); } -void ParsedFunctionExpression::serializeInternal(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(isDistinct, fileInfo, offset); - SerDeser::serializeValue(functionName, fileInfo, offset); +void ParsedFunctionExpression::serializeInternal(Serializer& serializer) const { + serializer.serializeValue(isDistinct); + serializer.serializeValue(functionName); } } // namespace parser diff --git a/src/parser/expression/parsed_property_expression.cpp b/src/parser/expression/parsed_property_expression.cpp index bbaa580d82..42bf6f39c3 100644 --- a/src/parser/expression/parsed_property_expression.cpp +++ b/src/parser/expression/parsed_property_expression.cpp @@ -1,6 +1,6 @@ #include "parser/expression/parsed_property_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" using namespace kuzu::common; @@ -8,9 +8,9 @@ namespace kuzu { namespace parser { std::unique_ptr ParsedPropertyExpression::deserialize( - FileInfo* fileInfo, uint64_t& offset) { + Deserializer& deserializer) { std::string propertyName; - SerDeser::deserializeValue(propertyName, fileInfo, offset); + deserializer.deserializeValue(propertyName); return std::make_unique(std::move(propertyName)); } diff --git a/src/parser/expression/parsed_variable_expression.cpp b/src/parser/expression/parsed_variable_expression.cpp index 696d71f446..d2aa5c5146 100644 --- a/src/parser/expression/parsed_variable_expression.cpp +++ b/src/parser/expression/parsed_variable_expression.cpp @@ -1,6 +1,6 @@ #include "parser/expression/parsed_variable_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" using namespace kuzu::common; @@ -8,9 +8,9 @@ namespace kuzu { namespace parser { std::unique_ptr ParsedVariableExpression::deserialize( - FileInfo* fileInfo, uint64_t& offset) { + Deserializer& deserializer) { std::string variableName; - SerDeser::deserializeValue(variableName, fileInfo, offset); + deserializer.deserializeValue(variableName); return std::make_unique(std::move(variableName)); } diff --git a/src/processor/operator/persistent/writer/parquet/CMakeLists.txt b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt index fb8740652c..62b886c4a7 100644 --- a/src/processor/operator/persistent/writer/parquet/CMakeLists.txt +++ b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt @@ -2,7 +2,6 @@ add_library(kuzu_processor_operator_parquet_writer OBJECT basic_column_writer.cpp boolean_column_writer.cpp - buffered_serializer.cpp column_writer.cpp struct_column_writer.cpp string_column_writer.cpp diff --git a/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp index 8ec6836d72..831b0a0ed8 100644 --- a/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp @@ -75,7 +75,8 @@ void BasicColumnWriter::beginWrite(ColumnWriterState& writerState) { hdr.data_page_header.definition_level_encoding = Encoding::RLE; hdr.data_page_header.repetition_level_encoding = Encoding::RLE; - writeInfo.bufferWriter = std::make_unique(); + writeInfo.bufferWriter = std::make_shared(); + writeInfo.writer = std::make_unique(writeInfo.bufferWriter); writeInfo.writeCount = pageInfo.emptyCount; writeInfo.maxWriteCount = pageInfo.rowCount; writeInfo.pageState = initializePageState(state); @@ -101,8 +102,8 @@ void BasicColumnWriter::write( auto writeCount = std::min(remaining, writeInfo.maxWriteCount - writeInfo.writeCount); - writeVector(*writeInfo.bufferWriter, state.statsState.get(), writeInfo.pageState.get(), - vector, offset, offset + writeCount); + writeVector(*writeInfo.writer, state.statsState.get(), writeInfo.pageState.get(), vector, + offset, offset + writeCount); writeInfo.writeCount += writeCount; if (writeInfo.writeCount == writeInfo.maxWriteCount) { @@ -151,8 +152,8 @@ void BasicColumnWriter::finalizeWrite(ColumnWriterState& writerState) { columnChunk.meta_data.total_uncompressed_size = totalUncompressedSize; } -void BasicColumnWriter::writeLevels(BufferedSerializer& bufferedSerializer, - const std::vector& levels, uint64_t maxValue, uint64_t startOffset, uint64_t count) { +void BasicColumnWriter::writeLevels(Serializer& serializer, const std::vector& levels, + uint64_t maxValue, uint64_t startOffset, uint64_t count) { if (levels.empty() || count == 0) { return; } @@ -168,12 +169,12 @@ void BasicColumnWriter::writeLevels(BufferedSerializer& bufferedSerializer, rleEncoder.finishPrepare(); // Start off by writing the byte count as a uint32_t. - bufferedSerializer.write(rleEncoder.getByteCount()); - rleEncoder.beginWrite(bufferedSerializer, levels[startOffset]); + serializer.write(rleEncoder.getByteCount()); + rleEncoder.beginWrite(serializer, levels[startOffset]); for (auto i = startOffset + 1; i < startOffset + count; i++) { - rleEncoder.writeValue(bufferedSerializer, levels[i]); + rleEncoder.writeValue(serializer, levels[i]); } - rleEncoder.finishWrite(bufferedSerializer); + rleEncoder.finishWrite(serializer); } void BasicColumnWriter::nextPage(BasicColumnWriterState& state) { @@ -190,12 +191,12 @@ void BasicColumnWriter::nextPage(BasicColumnWriterState& state) { state.currentPage++; // write the repetition levels - writeLevels(*writeInfo.bufferWriter, state.repetitionLevels, maxRepeat, pageInfo.offset, - pageInfo.rowCount); + writeLevels( + *writeInfo.writer, state.repetitionLevels, maxRepeat, pageInfo.offset, pageInfo.rowCount); // write the definition levels - writeLevels(*writeInfo.bufferWriter, state.definitionLevels, maxDefine, pageInfo.offset, - pageInfo.rowCount); + writeLevels( + *writeInfo.writer, state.definitionLevels, maxDefine, pageInfo.offset, pageInfo.rowCount); } void BasicColumnWriter::flushPage(BasicColumnWriterState& state) { @@ -209,7 +210,7 @@ void BasicColumnWriter::flushPage(BasicColumnWriterState& state) { auto& bufferedWriter = *writeInfo.bufferWriter; auto& hdr = writeInfo.pageHeader; - flushPageState(bufferedWriter, writeInfo.pageState.get()); + flushPageState(*writeInfo.writer, writeInfo.pageState.get()); // now that we have finished writing the data we know the uncompressed size if (bufferedWriter.getSize() > uint64_t(function::NumericLimits::maximum())) { @@ -251,6 +252,7 @@ void BasicColumnWriter::writeDictionary(BasicColumnWriterState& state, hdr.dictionary_page_header.num_values = rowCount; writeInfo.bufferWriter = std::move(bufferedSerializer); + writeInfo.writer = std::make_unique(writeInfo.bufferWriter); writeInfo.writeCount = 0; writeInfo.maxWriteCount = 0; diff --git a/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp index fcf383db93..2565bc4789 100644 --- a/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp @@ -1,9 +1,11 @@ #include "processor/operator/persistent/writer/parquet/boolean_column_writer.h" +#include "common/serializer/serializer.h" + namespace kuzu { namespace processor { -void BooleanColumnWriter::writeVector(BufferedSerializer& temp_writer, +void BooleanColumnWriter::writeVector(common::Serializer& temp_writer, ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { auto stats = reinterpret_cast(writerStatistics); @@ -30,7 +32,7 @@ void BooleanColumnWriter::writeVector(BufferedSerializer& temp_writer, } void BooleanColumnWriter::flushPageState( - BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) { + common::Serializer& temp_writer, ColumnWriterPageState* writerPageState) { auto state = reinterpret_cast(writerPageState); if (state->bytePos > 0) { temp_writer.write(state->byte); diff --git a/src/processor/operator/persistent/writer/parquet/column_writer.cpp b/src/processor/operator/persistent/writer/parquet/column_writer.cpp index 9d5c07aa4d..c22d18a5c4 100644 --- a/src/processor/operator/persistent/writer/parquet/column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/column_writer.cpp @@ -194,8 +194,8 @@ void ColumnWriter::handleDefineLevels(ColumnWriterState& state, ColumnWriterStat } } -void ColumnWriter::compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, - uint8_t*& compressedData, std::unique_ptr& compressedBuf) { +void ColumnWriter::compressPage(common::BufferedSerializer& bufferedSerializer, + size_t& compressedSize, uint8_t*& compressedData, std::unique_ptr& compressedBuf) { switch (writer.getCodec()) { case CompressionCodec::UNCOMPRESSED: { compressedSize = bufferedSerializer.getSize(); diff --git a/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp index d99afce209..24866e5123 100644 --- a/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp +++ b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp @@ -7,7 +7,7 @@ namespace kuzu { namespace processor { -static void varintEncode(uint32_t val, BufferedSerializer& ser) { +static void varintEncode(uint32_t val, common::Serializer& ser) { do { uint8_t byte = val & 127; val >>= 7; @@ -64,13 +64,13 @@ uint64_t RleBpEncoder::getByteCount() { return byteCount; } -void RleBpEncoder::beginWrite(BufferedSerializer& writer, uint32_t first_value) { +void RleBpEncoder::beginWrite(common::Serializer& writer, uint32_t first_value) { // start the RLE runs lastValue = first_value; currentRunCount = 1; } -void RleBpEncoder::writeRun(BufferedSerializer& writer) { +void RleBpEncoder::writeRun(common::Serializer& writer) { // write the header of the run varintEncode(currentRunCount << 1, writer); // now write the value @@ -96,7 +96,7 @@ void RleBpEncoder::writeRun(BufferedSerializer& writer) { currentRunCount = 1; } -void RleBpEncoder::writeValue(BufferedSerializer& writer, uint32_t value) { +void RleBpEncoder::writeValue(common::Serializer& writer, uint32_t value) { if (value != lastValue) { writeRun(writer); lastValue = value; @@ -105,7 +105,7 @@ void RleBpEncoder::writeValue(BufferedSerializer& writer, uint32_t value) { } } -void RleBpEncoder::finishWrite(BufferedSerializer& writer) { +void RleBpEncoder::finishWrite(common::Serializer& writer) { writeRun(writer); } diff --git a/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp index c099cb3bd7..04a84b248a 100644 --- a/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp @@ -115,7 +115,7 @@ void StringColumnWriter::finalizeAnalyze(ColumnWriterState& writerState) { } } -void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, +void StringColumnWriter::writeVector(common::Serializer& serializer, ColumnWriterStatistics* statsToWrite, ColumnWriterPageState* writerPageState, ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { auto pageState = reinterpret_cast(writerPageState); @@ -131,12 +131,12 @@ void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, auto value_index = pageState->dictionary.at(vector->getValue(pos)); if (!pageState->writtenValue) { // Write the bit-width as a one-byte entry. - bufferedSerializer.write(pageState->bitWidth); + serializer.write(pageState->bitWidth); // Now begin writing the actual value. - pageState->encoder.beginWrite(bufferedSerializer, value_index); + pageState->encoder.beginWrite(serializer, value_index); pageState->writtenValue = true; } else { - pageState->encoder.writeValue(bufferedSerializer, value_index); + pageState->encoder.writeValue(serializer, value_index); } } } else { @@ -147,23 +147,23 @@ void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, } auto& str = vector->getValue(pos); stats->update(str); - bufferedSerializer.write(str.len); - bufferedSerializer.writeData(str.getData(), str.len); + serializer.write(str.len); + serializer.write(str.getData(), str.len); } } } void StringColumnWriter::flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) { + common::Serializer& serializer, ColumnWriterPageState* writerPageState) { auto pageState = reinterpret_cast(writerPageState); if (pageState->bitWidth != 0) { if (!pageState->writtenValue) { // all values are null // just write the bit width - bufferedSerializer.write(pageState->bitWidth); + serializer.write(pageState->bitWidth); return; } - pageState->encoder.finishWrite(bufferedSerializer); + pageState->encoder.finishWrite(serializer); } } @@ -181,14 +181,14 @@ void StringColumnWriter::flushDictionary( values[entry.second] = entry.first; } // First write the contents of the dictionary page to a temporary buffer. - auto bufferedSerializer = std::make_unique(); + auto bufferedSerializer = std::make_unique(); for (auto r = 0u; r < values.size(); r++) { auto& value = values[r]; // Update the statistics. stats->update(value); // Write this string value to the dictionary. bufferedSerializer->write(value.len); - bufferedSerializer->writeData(value.getData(), value.len); + bufferedSerializer->write(value.getData(), value.len); } // Flush the dictionary page and add it to the to-be-written pages. writeDictionary(state, std::move(bufferedSerializer), values.size()); diff --git a/src/storage/stats/metadata_dah_info.cpp b/src/storage/stats/metadata_dah_info.cpp index 511d88a3ae..bce07419df 100644 --- a/src/storage/stats/metadata_dah_info.cpp +++ b/src/storage/stats/metadata_dah_info.cpp @@ -1,6 +1,7 @@ #include "storage/stats/metadata_dah_info.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; @@ -16,18 +17,17 @@ std::unique_ptr MetadataDAHInfo::copy() { return result; } -void MetadataDAHInfo::serialize(FileInfo* fileInfo, uint64_t& offset) const { - SerDeser::serializeValue(dataDAHPageIdx, fileInfo, offset); - SerDeser::serializeValue(nullDAHPageIdx, fileInfo, offset); - SerDeser::serializeVectorOfPtrs(childrenInfos, fileInfo, offset); +void MetadataDAHInfo::serialize(Serializer& serializer) const { + serializer.serializeValue(dataDAHPageIdx); + serializer.serializeValue(nullDAHPageIdx); + serializer.serializeVectorOfPtrs(childrenInfos); } -std::unique_ptr MetadataDAHInfo::deserialize( - FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr MetadataDAHInfo::deserialize(Deserializer& deserializer) { auto metadataDAHInfo = std::make_unique(); - SerDeser::deserializeValue(metadataDAHInfo->dataDAHPageIdx, fileInfo, offset); - SerDeser::deserializeValue(metadataDAHInfo->nullDAHPageIdx, fileInfo, offset); - SerDeser::deserializeVectorOfPtrs(metadataDAHInfo->childrenInfos, fileInfo, offset); + deserializer.deserializeValue(metadataDAHInfo->dataDAHPageIdx); + deserializer.deserializeValue(metadataDAHInfo->nullDAHPageIdx); + deserializer.deserializeVectorOfPtrs(metadataDAHInfo->childrenInfos); return metadataDAHInfo; } diff --git a/src/storage/stats/node_table_statistics.cpp b/src/storage/stats/node_table_statistics.cpp index f290d5f0ae..00c1461ab2 100644 --- a/src/storage/stats/node_table_statistics.cpp +++ b/src/storage/stats/node_table_statistics.cpp @@ -1,7 +1,9 @@ #include "storage/stats/node_table_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_format.h" +#include "common/string_utils.h" #include "storage/stats/table_statistics_collection.h" using namespace kuzu::common; @@ -148,17 +150,17 @@ std::vector NodeTableStatsAndDeletedIDs::getDeletedNodeOffsets() const return retVal; } -void NodeTableStatsAndDeletedIDs::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeVector(getDeletedNodeOffsets(), fileInfo, offset); - SerDeser::serializeVectorOfPtrs(metadataDAHInfos, fileInfo, offset); +void NodeTableStatsAndDeletedIDs::serializeInternal(Serializer& serializer) { + serializer.serializeVector(getDeletedNodeOffsets()); + serializer.serializeVectorOfPtrs(metadataDAHInfos); } std::unique_ptr NodeTableStatsAndDeletedIDs::deserialize( - table_id_t tableID, offset_t maxNodeOffset, FileInfo* fileInfo, uint64_t& offset) { + table_id_t tableID, offset_t maxNodeOffset, Deserializer& deserializer) { std::vector deletedNodeOffsets; std::vector> metadataDAHInfos; - SerDeser::deserializeVector(deletedNodeOffsets, fileInfo, offset); - SerDeser::deserializeVectorOfPtrs(metadataDAHInfos, fileInfo, offset); + deserializer.deserializeVector(deletedNodeOffsets); + deserializer.deserializeVectorOfPtrs(metadataDAHInfos); auto result = std::make_unique(tableID, maxNodeOffset, deletedNodeOffsets); result->metadataDAHInfos = std::move(metadataDAHInfos); diff --git a/src/storage/stats/property_statistics.cpp b/src/storage/stats/property_statistics.cpp index ff02316dd9..9535eedd38 100644 --- a/src/storage/stats/property_statistics.cpp +++ b/src/storage/stats/property_statistics.cpp @@ -1,13 +1,14 @@ #include "storage/stats/property_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/stats/table_statistics_collection.h" namespace kuzu { namespace storage { -void PropertyStatistics::serialize(common::FileInfo* fileInfo, uint64_t& offset) { - common::SerDeser::serializeValue(mayHaveNullValue, fileInfo, offset); +void PropertyStatistics::serialize(common::Serializer& serializer) { + serializer.serializeValue(mayHaveNullValue); } RWPropertyStats::RWPropertyStats(TablesStatistics* tablesStatistics, common::table_id_t tableID, @@ -15,9 +16,9 @@ RWPropertyStats::RWPropertyStats(TablesStatistics* tablesStatistics, common::tab : tablesStatistics{tablesStatistics}, tableID{tableID}, propertyID{propertyID} {} std::unique_ptr PropertyStatistics::deserialize( - common::FileInfo* fileInfo, uint64_t& offset) { + common::Deserializer& deserializer) { bool hasNull; - common::SerDeser::deserializeValue(hasNull, fileInfo, offset); + deserializer.deserializeValue(hasNull); return std::make_unique(hasNull); } diff --git a/src/storage/stats/rel_table_statistics.cpp b/src/storage/stats/rel_table_statistics.cpp index 0e8b820170..0ea76c7b67 100644 --- a/src/storage/stats/rel_table_statistics.cpp +++ b/src/storage/stats/rel_table_statistics.cpp @@ -1,20 +1,21 @@ #include "storage/stats/rel_table_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; namespace kuzu { namespace storage { -void RelTableStats::serializeInternal(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(nextRelOffset, fileInfo, offset); +void RelTableStats::serializeInternal(Serializer& serializer) { + serializer.serializeValue(nextRelOffset); } std::unique_ptr RelTableStats::deserialize( - uint64_t numRels, common::table_id_t tableID, FileInfo* fileInfo, uint64_t& offset) { - common::offset_t nextRelOffset; - SerDeser::deserializeValue(nextRelOffset, fileInfo, offset); + uint64_t numRels, table_id_t tableID, Deserializer& deserializer) { + offset_t nextRelOffset; + deserializer.deserializeValue(nextRelOffset); return std::make_unique(numRels, tableID, nextRelOffset); } diff --git a/src/storage/stats/table_statistics.cpp b/src/storage/stats/table_statistics.cpp index 4e3ea9a272..4a12251cdb 100644 --- a/src/storage/stats/table_statistics.cpp +++ b/src/storage/stats/table_statistics.cpp @@ -1,7 +1,8 @@ #include "storage/stats/table_statistics.h" #include "catalog/table_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/stats/node_table_statistics.h" #include "storage/stats/rel_table_statistics.h" @@ -34,33 +35,31 @@ TableStatistics::TableStatistics(const TableStatistics& other) } } -void TableStatistics::serialize(FileInfo* fileInfo, uint64_t& offset) { - SerDeser::serializeValue(tableType, fileInfo, offset); - SerDeser::serializeValue(numTuples, fileInfo, offset); - SerDeser::serializeValue(tableID, fileInfo, offset); - SerDeser::serializeUnorderedMap(propertyStatistics, fileInfo, offset); - serializeInternal(fileInfo, offset); +void TableStatistics::serialize(Serializer& serializer) { + serializer.serializeValue(tableType); + serializer.serializeValue(numTuples); + serializer.serializeValue(tableID); + serializer.serializeUnorderedMap(propertyStatistics); + serializeInternal(serializer); } -std::unique_ptr TableStatistics::deserialize( - FileInfo* fileInfo, uint64_t& offset) { +std::unique_ptr TableStatistics::deserialize(Deserializer& deserializer) { TableType tableType; uint64_t numTuples; table_id_t tableID; std::unordered_map> propertyStatistics; - SerDeser::deserializeValue(tableType, fileInfo, offset); - SerDeser::deserializeValue(numTuples, fileInfo, offset); - SerDeser::deserializeValue(tableID, fileInfo, offset); - SerDeser::deserializeUnorderedMap(propertyStatistics, fileInfo, offset); + deserializer.deserializeValue(tableType); + deserializer.deserializeValue(numTuples); + deserializer.deserializeValue(tableID); + deserializer.deserializeUnorderedMap(propertyStatistics); std::unique_ptr result; switch (tableType) { case TableType::NODE: { result = NodeTableStatsAndDeletedIDs::deserialize(tableID, - NodeTableStatsAndDeletedIDs::getMaxNodeOffsetFromNumTuples(numTuples), fileInfo, - offset); + NodeTableStatsAndDeletedIDs::getMaxNodeOffsetFromNumTuples(numTuples), deserializer); } break; case TableType::REL: { - result = RelTableStats::deserialize(numTuples, tableID, fileInfo, offset); + result = RelTableStats::deserialize(numTuples, tableID, deserializer); } break; // LCOV_EXCL_START default: { diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 86b40867ed..77742df2ff 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -1,6 +1,8 @@ #include "storage/stats/table_statistics_collection.h" -#include "common/ser_deser.h" +#include "common/serializer/buffered_file.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/storage_structure/disk_array.h" #include "storage/store/column_chunk.h" @@ -19,23 +21,21 @@ void TablesStatistics::readFromFile(const std::string& directory) { void TablesStatistics::readFromFile(const std::string& directory, DBFileType dbFileType) { auto filePath = getTableStatisticsFilePath(directory, dbFileType); - auto fileInfo = FileUtils::openFile(filePath, O_RDONLY); - uint64_t offset = 0; - SerDeser::deserializeUnorderedMap( - tablesStatisticsContentForReadOnlyTrx->tableStatisticPerTable, fileInfo.get(), offset); + auto deser = + Deserializer(std::make_unique(FileUtils::openFile(filePath, O_RDONLY))); + deser.deserializeUnorderedMap(tablesStatisticsContentForReadOnlyTrx->tableStatisticPerTable); } void TablesStatistics::saveToFile(const std::string& directory, DBFileType dbFileType, transaction::TransactionType transactionType) { auto filePath = getTableStatisticsFilePath(directory, dbFileType); - auto fileInfo = FileUtils::openFile(filePath, O_WRONLY | O_CREAT); - uint64_t offset = 0; + auto ser = Serializer( + std::make_unique(FileUtils::openFile(filePath, O_WRONLY | O_CREAT))); auto& tablesStatisticsContent = (transactionType == transaction::TransactionType::READ_ONLY || tablesStatisticsContentForWriteTrx == nullptr) ? tablesStatisticsContentForReadOnlyTrx : tablesStatisticsContentForWriteTrx; - SerDeser::serializeUnorderedMap( - tablesStatisticsContent->tableStatisticPerTable, fileInfo.get(), offset); + ser.serializeUnorderedMap(tablesStatisticsContent->tableStatisticPerTable); } void TablesStatistics::initTableStatisticsForWriteTrx() {