diff --git a/src/catalog/catalog.cpp b/src/catalog/catalog.cpp index 562806c9b09..2b80211bc25 100644 --- a/src/catalog/catalog.cpp +++ b/src/catalog/catalog.cpp @@ -3,7 +3,6 @@ #include "catalog/node_table_schema.h" #include "catalog/rel_table_group_schema.h" #include "catalog/rel_table_schema.h" -#include "common/ser_deser.h" #include "storage/wal/wal.h" #include "transaction/transaction_action.h" diff --git a/src/catalog/catalog_content.cpp b/src/catalog/catalog_content.cpp index c12b24e3e0b..37e9a625178 100644 --- a/src/catalog/catalog_content.cpp +++ b/src/catalog/catalog_content.cpp @@ -6,7 +6,9 @@ #include "catalog/rel_table_schema.h" #include "common/exception/catalog.h" #include "common/exception/runtime.h" -#include "common/ser_deser.h" +#include "common/serializer/buffered_file.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_utils.h" #include "storage/storage_utils.h" diff --git a/src/catalog/node_table_schema.cpp b/src/catalog/node_table_schema.cpp index 0d7fd33d2dc..211e4b1fc9d 100644 --- a/src/catalog/node_table_schema.cpp +++ b/src/catalog/node_table_schema.cpp @@ -1,6 +1,7 @@ #include "catalog/node_table_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/catalog/property.cpp b/src/catalog/property.cpp index cf7e99eb846..59f4a6672bc 100644 --- a/src/catalog/property.cpp +++ b/src/catalog/property.cpp @@ -1,6 +1,7 @@ #include "catalog/property.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/catalog/rdf_graph_schema.cpp b/src/catalog/rdf_graph_schema.cpp index 79dd15b3072..ad15562fd81 100644 --- a/src/catalog/rdf_graph_schema.cpp +++ b/src/catalog/rdf_graph_schema.cpp @@ -1,6 +1,7 @@ #include "catalog/rdf_graph_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/catalog/rel_table_group_schema.cpp b/src/catalog/rel_table_group_schema.cpp index bfc98488c12..1f7a319f966 100644 --- a/src/catalog/rel_table_group_schema.cpp +++ b/src/catalog/rel_table_group_schema.cpp @@ -1,6 +1,7 @@ #include "catalog/rel_table_group_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/catalog/rel_table_schema.cpp b/src/catalog/rel_table_schema.cpp index 186e8ad13f6..3a876ada6a6 100644 --- a/src/catalog/rel_table_schema.cpp +++ b/src/catalog/rel_table_schema.cpp @@ -1,7 +1,8 @@ #include "catalog/rel_table_schema.h" #include "common/exception/catalog.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/catalog/table_schema.cpp b/src/catalog/table_schema.cpp index beda619bbc1..3a074658cc8 100644 --- a/src/catalog/table_schema.cpp +++ b/src/catalog/table_schema.cpp @@ -8,7 +8,8 @@ #include "common/exception/internal.h" #include "common/exception/not_implemented.h" #include "common/exception/runtime.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_utils.h" using namespace kuzu::common; diff --git a/src/common/CMakeLists.txt b/src/common/CMakeLists.txt index 923437dfd21..d78bc7a00bc 100644 --- a/src/common/CMakeLists.txt +++ b/src/common/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectory(arrow) add_subdirectory(copier_config) add_subdirectory(data_chunk) add_subdirectory(exception) +add_subdirectory(serializer) add_subdirectory(task_system) add_subdirectory(types) add_subdirectory(vector) @@ -20,8 +21,7 @@ add_library(kuzu_common type_utils.cpp utils.cpp string_utils.cpp - table_type.cpp - ser_deser.cpp) + table_type.cpp) target_link_libraries(kuzu_common Glob) diff --git a/src/common/ser_deser.cpp b/src/common/ser_deser.cpp deleted file mode 100644 index e5d63be68f6..00000000000 --- a/src/common/ser_deser.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include "common/ser_deser.h" - -namespace kuzu { -namespace common { - -template<> -void Serializer::serializeValue(const std::string& value) { - uint64_t valueLength = value.length(); - writer->write((uint8_t*)&valueLength, sizeof(uint64_t)); - writer->write((uint8_t*)value.data(), valueLength); -} - -template<> -void Deserializer::deserializeValue(std::string& value) { - uint64_t valueLength = 0; - deserializeValue(valueLength); - value.resize(valueLength); - reader->read((uint8_t*)value.data(), valueLength); -} - -} // namespace common -} // namespace kuzu diff --git a/src/common/serializer/CMakeLists.txt b/src/common/serializer/CMakeLists.txt new file mode 100644 index 00000000000..76774aa4b42 --- /dev/null +++ b/src/common/serializer/CMakeLists.txt @@ -0,0 +1,10 @@ +add_library(kuzu_common_serializer + OBJECT + serializer.cpp + deserializer.cpp + buffered_file.cpp + buffered_serializer.cpp) + +set(ALL_OBJECT_FILES + ${ALL_OBJECT_FILES} $ + PARENT_SCOPE) diff --git a/src/common/serializer/buffered_file.cpp b/src/common/serializer/buffered_file.cpp new file mode 100644 index 00000000000..eb1214cfc70 --- /dev/null +++ b/src/common/serializer/buffered_file.cpp @@ -0,0 +1,21 @@ +#include "common/serializer/buffered_file.h" + +#include "common/file_utils.h" + +namespace kuzu { +namespace common { +void BufferedFileWriter::flush() { + FileUtils::writeToFile(fileInfo.get(), buffer.get(), bufferOffset, fileOffset); + fileOffset += bufferOffset; + bufferOffset = 0; + memset(buffer.get(), 0, BUFFER_SIZE); +} + +void BufferedFileReader::readNextPage() { + FileUtils::readFromFile(fileInfo.get(), buffer.get(), BUFFER_SIZE, fileOffset); + fileOffset += BUFFER_SIZE; + bufferOffset = 0; +} + +} // namespace common +} // namespace kuzu diff --git a/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp b/src/common/serializer/buffered_serializer.cpp similarity index 79% rename from src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp rename to src/common/serializer/buffered_serializer.cpp index 7bf87a79eb8..0ea7ad548d2 100644 --- a/src/processor/operator/persistent/writer/parquet/buffered_serializer.cpp +++ b/src/common/serializer/buffered_serializer.cpp @@ -1,9 +1,9 @@ -#include "processor/operator/persistent/writer/parquet/buffered_serializer.h" +#include "common/serializer/buffered_serializer.h" #include namespace kuzu { -namespace processor { +namespace common { BufferedSerializer::BufferedSerializer(uint64_t maximum_size) : BufferedSerializer(std::make_unique(maximum_size), maximum_size) {} @@ -14,7 +14,7 @@ BufferedSerializer::BufferedSerializer(std::unique_ptr data, uint64_t blob.data = std::move(data); } -void BufferedSerializer::writeData(const uint8_t* buffer, uint64_t len) { +void BufferedSerializer::write(const uint8_t* buffer, uint64_t len) { if (blob.size + len >= maximumSize) { do { maximumSize *= 2; @@ -29,5 +29,5 @@ void BufferedSerializer::writeData(const uint8_t* buffer, uint64_t len) { blob.size += len; } -} // namespace processor +} // namespace common } // namespace kuzu diff --git a/src/common/serializer/deserializer.cpp b/src/common/serializer/deserializer.cpp new file mode 100644 index 00000000000..d4d08231bcf --- /dev/null +++ b/src/common/serializer/deserializer.cpp @@ -0,0 +1,15 @@ +#include "common/serializer/deserializer.h" + +namespace kuzu { +namespace common { + +template<> +void Deserializer::deserializeValue(std::string& value) { + uint64_t valueLength = 0; + deserializeValue(valueLength); + value.resize(valueLength); + reader->read((uint8_t*)value.data(), valueLength); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/serializer/file_writer.cpp b/src/common/serializer/file_writer.cpp new file mode 100644 index 00000000000..6c302ea2c04 --- /dev/null +++ b/src/common/serializer/file_writer.cpp @@ -0,0 +1,21 @@ +#include "common/file_utils.h" +#include "common/serializer/buffered_file.h" + +namespace kuzu { +namespace common { + +void BufferedFileWriter::flush() { + FileUtils::writeToFile(fileInfo.get(), buffer.get(), bufferOffset, fileOffset); + fileOffset += bufferOffset; + bufferOffset = 0; + memset(buffer.get(), 0, BUFFER_SIZE); +} + +void BufferedFileReader::readNextPage() { + FileUtils::readFromFile(fileInfo.get(), buffer.get(), BUFFER_SIZE, fileOffset); + fileOffset += BUFFER_SIZE; + bufferOffset = 0; +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/serializer/serializer.cpp b/src/common/serializer/serializer.cpp new file mode 100644 index 00000000000..e2e2d708240 --- /dev/null +++ b/src/common/serializer/serializer.cpp @@ -0,0 +1,14 @@ +#include "common/serializer/serializer.h" + +namespace kuzu { +namespace common { + +template<> +void Serializer::serializeValue(const std::string& value) { + uint64_t valueLength = value.length(); + writer->write((uint8_t*)&valueLength, sizeof(uint64_t)); + writer->write((uint8_t*)value.data(), valueLength); +} + +} // namespace common +} // namespace kuzu diff --git a/src/common/types/types.cpp b/src/common/types/types.cpp index e9a1822d4a5..c02a7d65b94 100644 --- a/src/common/types/types.cpp +++ b/src/common/types/types.cpp @@ -6,7 +6,8 @@ #include "common/exception/binder.h" #include "common/exception/not_implemented.h" #include "common/null_buffer.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_utils.h" #include "common/types/interval_t.h" #include "common/types/ku_list.h" diff --git a/src/common/types/value/value.cpp b/src/common/types/value/value.cpp index ab5cc862966..d3dddb7211f 100644 --- a/src/common/types/value/value.cpp +++ b/src/common/types/value/value.cpp @@ -1,7 +1,8 @@ #include "common/types/value/value.h" #include "common/null_buffer.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/types/blob.h" #include "storage/storage_utils.h" diff --git a/src/function/scalar_macro_function.cpp b/src/function/scalar_macro_function.cpp index c2001ae02b8..80529713f6e 100644 --- a/src/function/scalar_macro_function.cpp +++ b/src/function/scalar_macro_function.cpp @@ -1,6 +1,7 @@ #include "function/scalar_macro_function.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; using namespace kuzu::parser; diff --git a/src/include/common/ser_deser.h b/src/include/common/ser_deser.h deleted file mode 100644 index 0df9cfdcbf8..00000000000 --- a/src/include/common/ser_deser.h +++ /dev/null @@ -1,238 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include "common/file_utils.h" - -namespace kuzu { -namespace common { - -class LogicalType; - -class Writer { -public: - virtual void write(uint8_t* data, uint64_t size) = 0; - virtual ~Writer(){}; -}; - -class Reader { -public: - virtual void read(uint8_t* data, uint64_t size) = 0; - virtual ~Reader(){}; -}; - -// Serializes values to disk via a 4KB buffer -class Serializer { -public: - explicit Serializer(std::unique_ptr writer) : writer(std::move(writer)) {} - - template - void serializeValue(const T& value) { - static_assert(std::is_trivially_destructible(), "value must be a trivial type"); - writer->write((uint8_t*)&value, sizeof(T)); - } - - template - void serializeOptionalValue(const std::unique_ptr& value) { - serializeValue(value == nullptr); - if (value != nullptr) { - value->serialize(*this); - } - } - - template - void serializeUnorderedMap(const std::unordered_map>& values) { - uint64_t mapSize = values.size(); - serializeValue(mapSize); - for (auto& value : values) { - serializeValue(value.first); - value.second->serialize(*this); - } - } - - template - void serializeVector(const std::vector& values) { - uint64_t vectorSize = values.size(); - serializeValue(vectorSize); - for (auto& value : values) { - serializeValue(value); - } - } - - template - void serializeVectorOfPtrs(const std::vector>& values) { - uint64_t vectorSize = values.size(); - serializeValue(vectorSize); - for (auto& value : values) { - value->serialize(*this); - } - } - - template - void serializeUnorderedSet(const std::unordered_set& values) { - uint64_t setSize = values.size(); - serializeValue(setSize); - for (const auto& value : values) { - serializeValue(value); - } - } - -private: - std::unique_ptr writer; -}; - -class BufferedFileWriter : public Writer { -public: - ~BufferedFileWriter() { flush(); } - explicit BufferedFileWriter(std::unique_ptr fileInfo) - : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), - fileInfo(std::move(fileInfo)) {} - - void write(uint8_t* data, uint64_t size) final { - if (bufferOffset + size <= BUFFER_SIZE) { - memcpy(&buffer[bufferOffset], data, size); - bufferOffset += size; - } else { - auto toCopy = BUFFER_SIZE - bufferOffset; - memcpy(&buffer[bufferOffset], data, toCopy); - bufferOffset += toCopy; - flush(); - auto remaining = size - toCopy; - memcpy(buffer.get(), data + toCopy, remaining); - bufferOffset += remaining; - } - } - -protected: - std::unique_ptr buffer; - uint64_t fileOffset, bufferOffset; - std::unique_ptr fileInfo; - static const uint64_t BUFFER_SIZE = 4096; - - void flush() { - FileUtils::writeToFile(fileInfo.get(), buffer.get(), bufferOffset, fileOffset); - fileOffset += bufferOffset; - bufferOffset = 0; - memset(buffer.get(), 0, BUFFER_SIZE); - } -}; - -// TODO: Move BufferedSerializer from parquet to become an in-memory serializer - -// De-serializes values from disk via a 4KB buffer -class Deserializer { -public: - explicit Deserializer(std::unique_ptr reader) : reader(std::move(reader)) {} - - template - void deserializeValue(T& value) { - static_assert(std::is_trivially_destructible(), "value must be a trivial type"); - reader->read((uint8_t*)&value, sizeof(T)); - } - - template - void deserializeOptionalValue(std::unique_ptr& value) { - bool isNull; - deserializeValue(isNull); - if (!isNull) { - value = T::deserialize(*this); - } - } - - template - void deserializeUnorderedMap(std::unordered_map>& values) { - uint64_t mapSize; - deserializeValue(mapSize); - values.reserve(mapSize); - for (auto i = 0u; i < mapSize; i++) { - T1 key; - deserializeValue(key); - auto val = T2::deserialize(*this); - values.emplace(key, std::move(val)); - } - } - - template - void deserializeVector(std::vector& values) { - uint64_t vectorSize; - deserializeValue(vectorSize); - values.resize(vectorSize); - for (auto& value : values) { - deserializeValue(value); - } - } - - template - void deserializeVectorOfPtrs(std::vector>& values) { - uint64_t vectorSize; - deserializeValue(vectorSize); - values.reserve(vectorSize); - for (auto i = 0u; i < vectorSize; i++) { - values.push_back(T::deserialize(*this)); - } - } - - template - void deserializeUnorderedSet(std::unordered_set& values) { - uint64_t setSize; - deserializeValue(setSize); - for (auto i = 0u; i < setSize; i++) { - T value; - deserializeValue(value); - values.insert(value); - } - } - -private: - std::unique_ptr reader; -}; - -class BufferedFileReader : public Reader { -public: - explicit BufferedFileReader(std::unique_ptr fileInfo) - : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), - fileInfo(std::move(fileInfo)) { - readNextPage(); - } - - void read(uint8_t* data, uint64_t size) final { - if (bufferOffset + size <= BUFFER_SIZE) { - memcpy(data, &buffer[bufferOffset], size); - bufferOffset += size; - } else { - auto toCopy = BUFFER_SIZE - bufferOffset; - memcpy(data, &buffer[bufferOffset], toCopy); - bufferOffset += toCopy; - readNextPage(); - auto remaining = size - toCopy; - memcpy(data + toCopy, buffer.get(), remaining); - bufferOffset += remaining; - } - } - -private: - std::unique_ptr buffer; - uint64_t fileOffset, bufferOffset; - std::unique_ptr fileInfo; - static const uint64_t BUFFER_SIZE = 4096; - - void readNextPage() { - FileUtils::readFromFile(fileInfo.get(), buffer.get(), BUFFER_SIZE, fileOffset); - fileOffset += BUFFER_SIZE; - bufferOffset = 0; - } -}; - -template<> -void Serializer::serializeValue(const std::string& value); - -template<> -void Deserializer::deserializeValue(std::string& value); - -} // namespace common -} // namespace kuzu diff --git a/src/include/common/serializer/buffered_file.h b/src/include/common/serializer/buffered_file.h new file mode 100644 index 00000000000..e507807155f --- /dev/null +++ b/src/include/common/serializer/buffered_file.h @@ -0,0 +1,83 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "common/serializer/reader.h" +#include "common/serializer/writer.h" + +namespace kuzu { +namespace common { + +struct FileInfo; + +class BufferedFileWriter : public Writer { +public: + ~BufferedFileWriter() { flush(); } + explicit BufferedFileWriter(std::unique_ptr fileInfo) + : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), + fileInfo(std::move(fileInfo)) {} + + inline void write(const uint8_t* data, uint64_t size) final { + if (bufferOffset + size <= BUFFER_SIZE) { + memcpy(&buffer[bufferOffset], data, size); + bufferOffset += size; + } else { + auto toCopy = BUFFER_SIZE - bufferOffset; + memcpy(&buffer[bufferOffset], data, toCopy); + bufferOffset += toCopy; + flush(); + auto remaining = size - toCopy; + memcpy(buffer.get(), data + toCopy, remaining); + bufferOffset += remaining; + } + } + +protected: + std::unique_ptr buffer; + uint64_t fileOffset, bufferOffset; + std::unique_ptr fileInfo; + static const uint64_t BUFFER_SIZE = 4096; + + void flush(); +}; + +class BufferedFileReader : public Reader { +public: + explicit BufferedFileReader(std::unique_ptr fileInfo) + : buffer(std::make_unique(BUFFER_SIZE)), fileOffset(0), bufferOffset(0), + fileInfo(std::move(fileInfo)) { + readNextPage(); + } + + inline void read(uint8_t* data, uint64_t size) final { + if (bufferOffset + size <= BUFFER_SIZE) { + memcpy(data, &buffer[bufferOffset], size); + bufferOffset += size; + } else { + auto toCopy = BUFFER_SIZE - bufferOffset; + memcpy(data, &buffer[bufferOffset], toCopy); + bufferOffset += toCopy; + readNextPage(); + auto remaining = size - toCopy; + memcpy(data + toCopy, buffer.get(), remaining); + bufferOffset += remaining; + } + } + +private: + std::unique_ptr buffer; + uint64_t fileOffset, bufferOffset; + std::unique_ptr fileInfo; + static const uint64_t BUFFER_SIZE = 4096; + + void readNextPage(); +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h b/src/include/common/serializer/buffered_serializer.h similarity index 82% rename from src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h rename to src/include/common/serializer/buffered_serializer.h index 96abdc46bf6..04b984326e7 100644 --- a/src/include/processor/operator/persistent/writer/parquet/buffered_serializer.h +++ b/src/include/common/serializer/buffered_serializer.h @@ -2,8 +2,10 @@ #include +#include "common/serializer/writer.h" + namespace kuzu { -namespace processor { +namespace common { // TODO(Ziyi): Move this to constants.h once we have a unified serializer design. static constexpr uint64_t SERIALIZER_DEFAULT_SIZE = 1024; @@ -13,7 +15,7 @@ struct BinaryData { uint64_t size; }; -class BufferedSerializer { +class BufferedSerializer : public Writer { public: // Serializes to a buffer allocated by the serializer, will expand when // writing past the initial threshold. @@ -34,10 +36,10 @@ class BufferedSerializer { void write(T element) { static_assert( std::is_trivially_destructible(), "Write element must be trivially destructible"); - writeData(reinterpret_cast(&element), sizeof(T)); + write(reinterpret_cast(&element), sizeof(T)); } - void writeData(const uint8_t* buffer, uint64_t len); + void write(const uint8_t* buffer, uint64_t len) final; private: uint64_t maximumSize; @@ -46,5 +48,5 @@ class BufferedSerializer { BinaryData blob; }; -} // namespace processor +} // namespace common } // namespace kuzu diff --git a/src/include/common/serializer/deserializer.h b/src/include/common/serializer/deserializer.h new file mode 100644 index 00000000000..1683090b1a3 --- /dev/null +++ b/src/include/common/serializer/deserializer.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "common/serializer/reader.h" + +namespace kuzu { +namespace common { + +class Deserializer { +public: + explicit Deserializer(std::unique_ptr reader) : reader(std::move(reader)) {} + + template + void deserializeValue(T& value) { + static_assert(std::is_trivially_destructible(), "value must be a trivial type"); + reader->read((uint8_t*)&value, sizeof(T)); + } + + template + void deserializeOptionalValue(std::unique_ptr& value) { + bool isNull; + deserializeValue(isNull); + if (!isNull) { + value = T::deserialize(*this); + } + } + + template + void deserializeUnorderedMap(std::unordered_map>& values) { + uint64_t mapSize; + deserializeValue(mapSize); + values.reserve(mapSize); + for (auto i = 0u; i < mapSize; i++) { + T1 key; + deserializeValue(key); + auto val = T2::deserialize(*this); + values.emplace(key, std::move(val)); + } + } + + template + void deserializeVector(std::vector& values) { + uint64_t vectorSize; + deserializeValue(vectorSize); + values.resize(vectorSize); + for (auto& value : values) { + deserializeValue(value); + } + } + + template + void deserializeVectorOfPtrs(std::vector>& values) { + uint64_t vectorSize; + deserializeValue(vectorSize); + values.reserve(vectorSize); + for (auto i = 0u; i < vectorSize; i++) { + values.push_back(T::deserialize(*this)); + } + } + + template + void deserializeUnorderedSet(std::unordered_set& values) { + uint64_t setSize; + deserializeValue(setSize); + for (auto i = 0u; i < setSize; i++) { + T value; + deserializeValue(value); + values.insert(value); + } + } + +private: + std::unique_ptr reader; +}; + +template<> +void Deserializer::deserializeValue(std::string& value); + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/reader.h b/src/include/common/serializer/reader.h new file mode 100644 index 00000000000..7a63207a3ed --- /dev/null +++ b/src/include/common/serializer/reader.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace kuzu { +namespace common { + +class Reader { +public: + virtual void read(uint8_t* data, uint64_t size) = 0; + virtual ~Reader(){}; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/serializer.h b/src/include/common/serializer/serializer.h new file mode 100644 index 00000000000..d0d49e1e3e8 --- /dev/null +++ b/src/include/common/serializer/serializer.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "common/serializer/writer.h" + +namespace kuzu { +namespace common { + +class Serializer { +public: + explicit Serializer(std::shared_ptr writer) : writer(std::move(writer)) {} + + template + void serializeValue(const T& value) { + static_assert(std::is_trivially_destructible(), "value must be a trivial type"); + writer->write((uint8_t*)&value, sizeof(T)); + } + + // Alias for serializeValue + template + void write(const T& value) { + serializeValue(value); + } + + void write(const uint8_t* value, uint64_t len) { writer->write(value, len); } + + template + void serializeOptionalValue(const std::unique_ptr& value) { + serializeValue(value == nullptr); + if (value != nullptr) { + value->serialize(*this); + } + } + + template + void serializeUnorderedMap(const std::unordered_map>& values) { + uint64_t mapSize = values.size(); + serializeValue(mapSize); + for (auto& value : values) { + serializeValue(value.first); + value.second->serialize(*this); + } + } + + template + void serializeVector(const std::vector& values) { + uint64_t vectorSize = values.size(); + serializeValue(vectorSize); + for (auto& value : values) { + serializeValue(value); + } + } + + template + void serializeVectorOfPtrs(const std::vector>& values) { + uint64_t vectorSize = values.size(); + serializeValue(vectorSize); + for (auto& value : values) { + value->serialize(*this); + } + } + + template + void serializeUnorderedSet(const std::unordered_set& values) { + uint64_t setSize = values.size(); + serializeValue(setSize); + for (const auto& value : values) { + serializeValue(value); + } + } + +private: + std::shared_ptr writer; +}; + +template<> +void Serializer::serializeValue(const std::string& value); + +} // namespace common +} // namespace kuzu diff --git a/src/include/common/serializer/writer.h b/src/include/common/serializer/writer.h new file mode 100644 index 00000000000..95eb77d961c --- /dev/null +++ b/src/include/common/serializer/writer.h @@ -0,0 +1,15 @@ +#pragma once + +#include + +namespace kuzu { +namespace common { + +class Writer { +public: + virtual void write(const uint8_t* data, uint64_t size) = 0; + virtual ~Writer() = default; +}; + +} // namespace common +} // namespace kuzu diff --git a/src/include/parser/expression/parsed_literal_expression.h b/src/include/parser/expression/parsed_literal_expression.h index 082eda708ad..b5d197bf13b 100644 --- a/src/include/parser/expression/parsed_literal_expression.h +++ b/src/include/parser/expression/parsed_literal_expression.h @@ -1,6 +1,5 @@ #pragma once -#include "common/ser_deser.h" #include "common/types/value/value.h" #include "parsed_expression.h" diff --git a/src/include/parser/expression/parsed_property_expression.h b/src/include/parser/expression/parsed_property_expression.h index a5a4172f72b..60e2698f3a5 100644 --- a/src/include/parser/expression/parsed_property_expression.h +++ b/src/include/parser/expression/parsed_property_expression.h @@ -1,7 +1,7 @@ #pragma once #include "common/constants.h" -#include "common/ser_deser.h" +#include "common/serializer/serializer.h" #include "parsed_expression.h" namespace kuzu { diff --git a/src/include/parser/expression/parsed_variable_expression.h b/src/include/parser/expression/parsed_variable_expression.h index df199eedd69..0255269cb5c 100644 --- a/src/include/parser/expression/parsed_variable_expression.h +++ b/src/include/parser/expression/parsed_variable_expression.h @@ -1,6 +1,7 @@ #pragma once -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "parsed_expression.h" namespace kuzu { diff --git a/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h index fdb6a94346d..527e6d755e8 100644 --- a/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/basic_column_writer.h @@ -39,7 +39,7 @@ class BasicColumnWriter : public ColumnWriter { void finalizeWrite(ColumnWriterState& state) override; protected: - void writeLevels(BufferedSerializer& bufferedSerializer, const std::vector& levels, + void writeLevels(common::Serializer& bufferedSerializer, const std::vector& levels, uint64_t maxValue, uint64_t startOffset, uint64_t count); virtual kuzu_parquet::format::Encoding::type getEncoding(BasicColumnWriterState& state) { @@ -62,7 +62,7 @@ class BasicColumnWriter : public ColumnWriter { // Flushes the writer for a specific page. Only used for scalar types. virtual void flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* state) {} + common::Serializer& bufferedSerializer, ColumnWriterPageState* state) {} // Retrieves the row size of a vector at the specified location. Only used for scalar types. virtual uint64_t getRowSize( @@ -70,7 +70,7 @@ class BasicColumnWriter : public ColumnWriter { throw common::NotImplementedException{"BasicColumnWriter::getRowSize"}; } // Writes a (subset of a) vector to the specified serializer. Only used for scalar types. - virtual void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats, + virtual void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* stats, ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) = 0; @@ -80,7 +80,7 @@ class BasicColumnWriter : public ColumnWriter { throw common::NotImplementedException{"BasicColumnWriter::dictionarySize"}; } void writeDictionary(BasicColumnWriterState& state, - std::unique_ptr bufferedSerializer, uint64_t rowCount); + std::unique_ptr bufferedSerializer, uint64_t rowCount); virtual void flushDictionary(BasicColumnWriterState& state, ColumnWriterStatistics* stats) { throw common::NotImplementedException{"BasicColumnWriter::flushDictionary"}; } diff --git a/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h index 73942345c08..965254033fc 100644 --- a/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/boolean_column_writer.h @@ -55,12 +55,12 @@ class BooleanColumnWriter : public BasicColumnWriter { return std::make_unique(); } - void writeVector(BufferedSerializer& bufferedSerializer, + void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override; void flushPageState( - BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) override; + common::Serializer& temp_writer, ColumnWriterPageState* writerPageState) override; }; } // namespace processor diff --git a/src/include/processor/operator/persistent/writer/parquet/column_writer.h b/src/include/processor/operator/persistent/writer/parquet/column_writer.h index 2827a9b20c0..afa410ee803 100644 --- a/src/include/processor/operator/persistent/writer/parquet/column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/column_writer.h @@ -1,7 +1,7 @@ #pragma once -#include "buffered_serializer.h" #include "common/exception/not_implemented.h" +#include "common/serializer/buffered_serializer.h" #include "common/types/types.h" #include "common/vector/value_vector.h" #include "parquet/parquet_types.h" @@ -24,7 +24,8 @@ class ColumnWriterPageState { struct PageWriteInformation { kuzu_parquet::format::PageHeader pageHeader; - std::unique_ptr bufferWriter; + std::shared_ptr bufferWriter; + std::unique_ptr writer; std::unique_ptr pageState; uint64_t writePageIdx = 0; uint64_t writeCount = 0; @@ -103,7 +104,7 @@ class ColumnWriter { void handleDefineLevels(ColumnWriterState& state, ColumnWriterState* parent, common::ValueVector* vector, uint64_t count, uint16_t defineValue, uint16_t nullValue); void handleRepeatLevels(ColumnWriterState& stateToHandle, ColumnWriterState* parent); - void compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, + void compressPage(common::BufferedSerializer& bufferedSerializer, size_t& compressedSize, uint8_t*& compressedData, std::unique_ptr& compressedBuf); }; diff --git a/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h index 379a2d10dcb..69578ebae3d 100644 --- a/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h +++ b/src/include/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.h @@ -1,6 +1,6 @@ #pragma once -#include "buffered_serializer.h" +#include "common/serializer/serializer.h" namespace kuzu { namespace processor { @@ -17,9 +17,9 @@ class RleBpEncoder { void prepareValue(uint32_t value); void finishPrepare(); - void beginWrite(BufferedSerializer& writer, uint32_t first_value); - void writeValue(BufferedSerializer& writer, uint32_t value); - void finishWrite(BufferedSerializer& writer); + void beginWrite(common::Serializer& writer, uint32_t first_value); + void writeValue(common::Serializer& writer, uint32_t value); + void finishWrite(common::Serializer& writer); uint64_t getByteCount(); @@ -36,7 +36,7 @@ class RleBpEncoder { private: void finishRun(); - void writeRun(BufferedSerializer& bufferedSerializer); + void writeRun(common::Serializer& bufferedSerializer); }; } // namespace processor diff --git a/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h index f7461cfaf79..5190138595f 100644 --- a/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/standard_column_writer.h @@ -1,6 +1,7 @@ #pragma once #include "basic_column_writer.h" +#include "common/serializer/serializer.h" #include "function/cast/numeric_limits.h" #include "function/comparison/comparison_functions.h" @@ -77,7 +78,7 @@ class StandardColumnWriter : public BasicColumnWriter { } void templatedWritePlain(common::ValueVector* vector, ColumnWriterStatistics* stats, - uint64_t chunkStart, uint64_t chunkEnd, BufferedSerializer& ser) { + uint64_t chunkStart, uint64_t chunkEnd, common::Serializer& ser) { for (auto r = chunkStart; r < chunkEnd; r++) { auto pos = getVectorPos(vector, r); if (!vector->isNull(pos)) { @@ -88,7 +89,7 @@ class StandardColumnWriter : public BasicColumnWriter { } } - inline void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* stats, + inline void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* stats, ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override { templatedWritePlain(vector, stats, chunkStart, chunkEnd, bufferedSerializer); diff --git a/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h index 74c4fa42b8d..7b373daf6cd 100644 --- a/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h +++ b/src/include/processor/operator/persistent/writer/parquet/string_column_writer.h @@ -123,12 +123,12 @@ class StringColumnWriter : public BasicColumnWriter { void finalizeAnalyze(ColumnWriterState& writerState) override; - void writeVector(BufferedSerializer& bufferedSerializer, ColumnWriterStatistics* statsToWrite, + void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* statsToWrite, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) override; void flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override; + common::Serializer& bufferedSerializer, ColumnWriterPageState* writerPageState) override; void flushDictionary( BasicColumnWriterState& writerState, ColumnWriterStatistics* writerStats) override; diff --git a/src/parser/expression/parsed_case_expression.cpp b/src/parser/expression/parsed_case_expression.cpp index 0dbad07de64..32be138525e 100644 --- a/src/parser/expression/parsed_case_expression.cpp +++ b/src/parser/expression/parsed_case_expression.cpp @@ -1,6 +1,7 @@ #include "parser/expression/parsed_case_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/parser/expression/parsed_expression.cpp b/src/parser/expression/parsed_expression.cpp index eedcbd1790c..ce9f069f4dc 100644 --- a/src/parser/expression/parsed_expression.cpp +++ b/src/parser/expression/parsed_expression.cpp @@ -1,7 +1,8 @@ #include "parser/expression/parsed_expression.h" #include "common/exception/not_implemented.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "parser/expression/parsed_case_expression.h" #include "parser/expression/parsed_function_expression.h" #include "parser/expression/parsed_literal_expression.h" diff --git a/src/parser/expression/parsed_function_expression.cpp b/src/parser/expression/parsed_function_expression.cpp index 85097a3fe5d..29a2d9629df 100644 --- a/src/parser/expression/parsed_function_expression.cpp +++ b/src/parser/expression/parsed_function_expression.cpp @@ -1,6 +1,7 @@ #include "parser/expression/parsed_function_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/parser/expression/parsed_property_expression.cpp b/src/parser/expression/parsed_property_expression.cpp index 3da7b690074..42bf6f39c3e 100644 --- a/src/parser/expression/parsed_property_expression.cpp +++ b/src/parser/expression/parsed_property_expression.cpp @@ -1,6 +1,6 @@ #include "parser/expression/parsed_property_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" using namespace kuzu::common; diff --git a/src/parser/expression/parsed_variable_expression.cpp b/src/parser/expression/parsed_variable_expression.cpp index 1513766536f..d2aa5c51465 100644 --- a/src/parser/expression/parsed_variable_expression.cpp +++ b/src/parser/expression/parsed_variable_expression.cpp @@ -1,6 +1,6 @@ #include "parser/expression/parsed_variable_expression.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" using namespace kuzu::common; diff --git a/src/processor/operator/persistent/writer/parquet/CMakeLists.txt b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt index fb8740652c5..62b886c4a7e 100644 --- a/src/processor/operator/persistent/writer/parquet/CMakeLists.txt +++ b/src/processor/operator/persistent/writer/parquet/CMakeLists.txt @@ -2,7 +2,6 @@ add_library(kuzu_processor_operator_parquet_writer OBJECT basic_column_writer.cpp boolean_column_writer.cpp - buffered_serializer.cpp column_writer.cpp struct_column_writer.cpp string_column_writer.cpp diff --git a/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp index 7d1decbb91e..f7ac7c75a78 100644 --- a/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/basic_column_writer.cpp @@ -75,7 +75,8 @@ void BasicColumnWriter::beginWrite(ColumnWriterState& writerState) { hdr.data_page_header.definition_level_encoding = Encoding::RLE; hdr.data_page_header.repetition_level_encoding = Encoding::RLE; - writeInfo.bufferWriter = std::make_unique(); + writeInfo.bufferWriter = std::make_shared(); + writeInfo.writer = std::make_unique(writeInfo.bufferWriter); writeInfo.writeCount = pageInfo.emptyCount; writeInfo.maxWriteCount = pageInfo.rowCount; writeInfo.pageState = initializePageState(state); @@ -101,8 +102,8 @@ void BasicColumnWriter::write( auto writeCount = std::min(remaining, writeInfo.maxWriteCount - writeInfo.writeCount); - writeVector(*writeInfo.bufferWriter, state.statsState.get(), writeInfo.pageState.get(), - vector, offset, offset + writeCount); + writeVector(*writeInfo.writer, state.statsState.get(), writeInfo.pageState.get(), vector, + offset, offset + writeCount); writeInfo.writeCount += writeCount; if (writeInfo.writeCount == writeInfo.maxWriteCount) { @@ -151,8 +152,8 @@ void BasicColumnWriter::finalizeWrite(ColumnWriterState& writerState) { columnChunk.meta_data.total_uncompressed_size = totalUncompressedSize; } -void BasicColumnWriter::writeLevels(BufferedSerializer& bufferedSerializer, - const std::vector& levels, uint64_t maxValue, uint64_t startOffset, uint64_t count) { +void BasicColumnWriter::writeLevels(Serializer& serializer, const std::vector& levels, + uint64_t maxValue, uint64_t startOffset, uint64_t count) { if (levels.empty() || count == 0) { return; } @@ -168,12 +169,12 @@ void BasicColumnWriter::writeLevels(BufferedSerializer& bufferedSerializer, rleEncoder.finishPrepare(); // Start off by writing the byte count as a uint32_t. - bufferedSerializer.write(rleEncoder.getByteCount()); - rleEncoder.beginWrite(bufferedSerializer, levels[startOffset]); + serializer.write(rleEncoder.getByteCount()); + rleEncoder.beginWrite(serializer, levels[startOffset]); for (auto i = startOffset + 1; i < startOffset + count; i++) { - rleEncoder.writeValue(bufferedSerializer, levels[i]); + rleEncoder.writeValue(serializer, levels[i]); } - rleEncoder.finishWrite(bufferedSerializer); + rleEncoder.finishWrite(serializer); } void BasicColumnWriter::nextPage(BasicColumnWriterState& state) { @@ -190,12 +191,12 @@ void BasicColumnWriter::nextPage(BasicColumnWriterState& state) { state.currentPage++; // write the repetition levels - writeLevels(*writeInfo.bufferWriter, state.repetitionLevels, maxRepeat, pageInfo.offset, - pageInfo.rowCount); + writeLevels( + *writeInfo.writer, state.repetitionLevels, maxRepeat, pageInfo.offset, pageInfo.rowCount); // write the definition levels - writeLevels(*writeInfo.bufferWriter, state.definitionLevels, maxDefine, pageInfo.offset, - pageInfo.rowCount); + writeLevels( + *writeInfo.writer, state.definitionLevels, maxDefine, pageInfo.offset, pageInfo.rowCount); } void BasicColumnWriter::flushPage(BasicColumnWriterState& state) { @@ -209,7 +210,7 @@ void BasicColumnWriter::flushPage(BasicColumnWriterState& state) { auto& bufferedWriter = *writeInfo.bufferWriter; auto& hdr = writeInfo.pageHeader; - flushPageState(bufferedWriter, writeInfo.pageState.get()); + flushPageState(*writeInfo.writer, writeInfo.pageState.get()); // now that we have finished writing the data we know the uncompressed size if (bufferedWriter.getSize() > uint64_t(function::NumericLimits::maximum())) { @@ -251,6 +252,7 @@ void BasicColumnWriter::writeDictionary(BasicColumnWriterState& state, hdr.dictionary_page_header.num_values = rowCount; writeInfo.bufferWriter = std::move(bufferedSerializer); + writeInfo.writer = std::make_unique(writeInfo.bufferWriter); writeInfo.writeCount = 0; writeInfo.maxWriteCount = 0; diff --git a/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp index fcf383db93e..2565bc47893 100644 --- a/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/boolean_column_writer.cpp @@ -1,9 +1,11 @@ #include "processor/operator/persistent/writer/parquet/boolean_column_writer.h" +#include "common/serializer/serializer.h" + namespace kuzu { namespace processor { -void BooleanColumnWriter::writeVector(BufferedSerializer& temp_writer, +void BooleanColumnWriter::writeVector(common::Serializer& temp_writer, ColumnWriterStatistics* writerStatistics, ColumnWriterPageState* writerPageState, common::ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { auto stats = reinterpret_cast(writerStatistics); @@ -30,7 +32,7 @@ void BooleanColumnWriter::writeVector(BufferedSerializer& temp_writer, } void BooleanColumnWriter::flushPageState( - BufferedSerializer& temp_writer, ColumnWriterPageState* writerPageState) { + common::Serializer& temp_writer, ColumnWriterPageState* writerPageState) { auto state = reinterpret_cast(writerPageState); if (state->bytePos > 0) { temp_writer.write(state->byte); diff --git a/src/processor/operator/persistent/writer/parquet/column_writer.cpp b/src/processor/operator/persistent/writer/parquet/column_writer.cpp index b67ad617135..5e91779210a 100644 --- a/src/processor/operator/persistent/writer/parquet/column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/column_writer.cpp @@ -194,8 +194,8 @@ void ColumnWriter::handleDefineLevels(ColumnWriterState& state, ColumnWriterStat } } -void ColumnWriter::compressPage(BufferedSerializer& bufferedSerializer, size_t& compressedSize, - uint8_t*& compressedData, std::unique_ptr& compressedBuf) { +void ColumnWriter::compressPage(common::BufferedSerializer& bufferedSerializer, + size_t& compressedSize, uint8_t*& compressedData, std::unique_ptr& compressedBuf) { switch (writer.getCodec()) { case CompressionCodec::UNCOMPRESSED: { compressedSize = bufferedSerializer.getSize(); diff --git a/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp index d99afce2099..24866e5123e 100644 --- a/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp +++ b/src/processor/operator/persistent/writer/parquet/parquet_rle_bp_encoder.cpp @@ -7,7 +7,7 @@ namespace kuzu { namespace processor { -static void varintEncode(uint32_t val, BufferedSerializer& ser) { +static void varintEncode(uint32_t val, common::Serializer& ser) { do { uint8_t byte = val & 127; val >>= 7; @@ -64,13 +64,13 @@ uint64_t RleBpEncoder::getByteCount() { return byteCount; } -void RleBpEncoder::beginWrite(BufferedSerializer& writer, uint32_t first_value) { +void RleBpEncoder::beginWrite(common::Serializer& writer, uint32_t first_value) { // start the RLE runs lastValue = first_value; currentRunCount = 1; } -void RleBpEncoder::writeRun(BufferedSerializer& writer) { +void RleBpEncoder::writeRun(common::Serializer& writer) { // write the header of the run varintEncode(currentRunCount << 1, writer); // now write the value @@ -96,7 +96,7 @@ void RleBpEncoder::writeRun(BufferedSerializer& writer) { currentRunCount = 1; } -void RleBpEncoder::writeValue(BufferedSerializer& writer, uint32_t value) { +void RleBpEncoder::writeValue(common::Serializer& writer, uint32_t value) { if (value != lastValue) { writeRun(writer); lastValue = value; @@ -105,7 +105,7 @@ void RleBpEncoder::writeValue(BufferedSerializer& writer, uint32_t value) { } } -void RleBpEncoder::finishWrite(BufferedSerializer& writer) { +void RleBpEncoder::finishWrite(common::Serializer& writer) { writeRun(writer); } diff --git a/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp index c099cb3bd70..04a84b248a3 100644 --- a/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp +++ b/src/processor/operator/persistent/writer/parquet/string_column_writer.cpp @@ -115,7 +115,7 @@ void StringColumnWriter::finalizeAnalyze(ColumnWriterState& writerState) { } } -void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, +void StringColumnWriter::writeVector(common::Serializer& serializer, ColumnWriterStatistics* statsToWrite, ColumnWriterPageState* writerPageState, ValueVector* vector, uint64_t chunkStart, uint64_t chunkEnd) { auto pageState = reinterpret_cast(writerPageState); @@ -131,12 +131,12 @@ void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, auto value_index = pageState->dictionary.at(vector->getValue(pos)); if (!pageState->writtenValue) { // Write the bit-width as a one-byte entry. - bufferedSerializer.write(pageState->bitWidth); + serializer.write(pageState->bitWidth); // Now begin writing the actual value. - pageState->encoder.beginWrite(bufferedSerializer, value_index); + pageState->encoder.beginWrite(serializer, value_index); pageState->writtenValue = true; } else { - pageState->encoder.writeValue(bufferedSerializer, value_index); + pageState->encoder.writeValue(serializer, value_index); } } } else { @@ -147,23 +147,23 @@ void StringColumnWriter::writeVector(BufferedSerializer& bufferedSerializer, } auto& str = vector->getValue(pos); stats->update(str); - bufferedSerializer.write(str.len); - bufferedSerializer.writeData(str.getData(), str.len); + serializer.write(str.len); + serializer.write(str.getData(), str.len); } } } void StringColumnWriter::flushPageState( - BufferedSerializer& bufferedSerializer, ColumnWriterPageState* writerPageState) { + common::Serializer& serializer, ColumnWriterPageState* writerPageState) { auto pageState = reinterpret_cast(writerPageState); if (pageState->bitWidth != 0) { if (!pageState->writtenValue) { // all values are null // just write the bit width - bufferedSerializer.write(pageState->bitWidth); + serializer.write(pageState->bitWidth); return; } - pageState->encoder.finishWrite(bufferedSerializer); + pageState->encoder.finishWrite(serializer); } } @@ -181,14 +181,14 @@ void StringColumnWriter::flushDictionary( values[entry.second] = entry.first; } // First write the contents of the dictionary page to a temporary buffer. - auto bufferedSerializer = std::make_unique(); + auto bufferedSerializer = std::make_unique(); for (auto r = 0u; r < values.size(); r++) { auto& value = values[r]; // Update the statistics. stats->update(value); // Write this string value to the dictionary. bufferedSerializer->write(value.len); - bufferedSerializer->writeData(value.getData(), value.len); + bufferedSerializer->write(value.getData(), value.len); } // Flush the dictionary page and add it to the to-be-written pages. writeDictionary(state, std::move(bufferedSerializer), values.size()); diff --git a/src/storage/stats/metadata_dah_info.cpp b/src/storage/stats/metadata_dah_info.cpp index 04a62e62f41..bce07419df7 100644 --- a/src/storage/stats/metadata_dah_info.cpp +++ b/src/storage/stats/metadata_dah_info.cpp @@ -1,6 +1,7 @@ #include "storage/stats/metadata_dah_info.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/storage/stats/node_table_statistics.cpp b/src/storage/stats/node_table_statistics.cpp index 4516983eece..203e48d1406 100644 --- a/src/storage/stats/node_table_statistics.cpp +++ b/src/storage/stats/node_table_statistics.cpp @@ -1,6 +1,7 @@ #include "storage/stats/node_table_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "common/string_utils.h" #include "storage/stats/table_statistics_collection.h" diff --git a/src/storage/stats/property_statistics.cpp b/src/storage/stats/property_statistics.cpp index d60c7e29e3a..9535eedd384 100644 --- a/src/storage/stats/property_statistics.cpp +++ b/src/storage/stats/property_statistics.cpp @@ -1,6 +1,7 @@ #include "storage/stats/property_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/stats/table_statistics_collection.h" namespace kuzu { diff --git a/src/storage/stats/rel_table_statistics.cpp b/src/storage/stats/rel_table_statistics.cpp index 6bd6627bb98..0ea76c7b679 100644 --- a/src/storage/stats/rel_table_statistics.cpp +++ b/src/storage/stats/rel_table_statistics.cpp @@ -1,6 +1,7 @@ #include "storage/stats/rel_table_statistics.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" using namespace kuzu::common; diff --git a/src/storage/stats/table_statistics.cpp b/src/storage/stats/table_statistics.cpp index 009270646e2..4a12251cdbd 100644 --- a/src/storage/stats/table_statistics.cpp +++ b/src/storage/stats/table_statistics.cpp @@ -1,7 +1,8 @@ #include "storage/stats/table_statistics.h" #include "catalog/table_schema.h" -#include "common/ser_deser.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/stats/node_table_statistics.h" #include "storage/stats/rel_table_statistics.h" diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 99112037ff9..77742df2ffe 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -1,6 +1,8 @@ #include "storage/stats/table_statistics_collection.h" -#include "common/ser_deser.h" +#include "common/serializer/buffered_file.h" +#include "common/serializer/deserializer.h" +#include "common/serializer/serializer.h" #include "storage/storage_structure/disk_array.h" #include "storage/store/column_chunk.h"