Skip to content

Commit

Permalink
Merge pull request #2329 from kuzudb/parquet-copy
Browse files Browse the repository at this point in the history
Parquet copy improvement
  • Loading branch information
acquamarin committed Nov 2, 2023
2 parents 2f30023 + 3560235 commit a4ed60f
Show file tree
Hide file tree
Showing 79 changed files with 34,845 additions and 82 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ include_directories(third_party/re2/include)
include_directories(third_party/serd/include)
include_directories(third_party/miniparquet/src)
include_directories(third_party/fast_float/include)
include_directories(third_party/zstd/include)
include_directories(third_party/miniz)

add_subdirectory(third_party)
add_subdirectory(src)
Expand Down
Binary file added dataset/reader/parquet/compression/gzip.parquet
Binary file not shown.
Binary file added dataset/reader/parquet/compression/zstd.parquet
Binary file not shown.
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ add_subdirectory(transaction)

add_library(kuzu STATIC ${ALL_OBJECT_FILES})
target_link_libraries(kuzu
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet)
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet zstd miniz)
target_include_directories(kuzu
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)
add_library(kuzu_shared SHARED ${ALL_OBJECT_FILES})
if(NOT WIN32)
set_target_properties(kuzu_shared PROPERTIES OUTPUT_NAME kuzu)
endif()
target_link_libraries(kuzu_shared
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet)
PUBLIC antlr4_cypher antlr4_runtime fast_float utf8proc re2 serd Threads::Threads fastpfor miniparquet zstd miniz)
target_include_directories(kuzu_shared
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include> $<INSTALL_INTERFACE:${CMAKE_INSTALL_INCLUDEDIR}>)

Expand Down
1 change: 1 addition & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ struct ParquetConstants {
// The size of encoding the string length.
static constexpr uint64_t STRING_LENGTH_SIZE = sizeof(uint32_t);
static constexpr uint64_t MAX_STRING_STATISTICS_SIZE = 10000;
static constexpr uint64_t PARQUET_INTERVAL_SIZE = 12;
};

struct CopyToCSVConstants {
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/types/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,9 @@ class LogicalType {
friend struct FixedListType;

public:
KUZU_API LogicalType() : typeID{LogicalTypeID::ANY}, extraTypeInfo{nullptr} {};
KUZU_API LogicalType() : typeID{LogicalTypeID::ANY}, extraTypeInfo{nullptr} {
setPhysicalType();
};
explicit KUZU_API LogicalType(LogicalTypeID typeID);
KUZU_API LogicalType(LogicalTypeID typeID, std::unique_ptr<ExtraTypeInfo> extraTypeInfo);
// For deserialize only.
Expand Down
2 changes: 1 addition & 1 deletion src/include/function/cast/functions/cast_functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ inline void CastToDouble::operation(common::int128_t& input, double_t& result) {
if (!common::Int128_t::tryCast(input, result)) { // LCOV_EXCL_START
throw common::OverflowException{common::stringFormat(
"Value {} is not within DOUBLE range", common::TypeUtils::toString(input).c_str())};
}; // LCOV_EXCL_STOP
} // LCOV_EXCL_STOP
}

struct CastToFloat {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#include "common/types/interval_t.h"
#include "templated_column_reader.h"

namespace kuzu {
namespace processor {

struct IntervalValueConversion {
static inline common::interval_t dictRead(
ByteBuffer& dict, uint32_t& offset, ColumnReader& /*reader*/) {
return (reinterpret_cast<common::interval_t*>(dict.ptr))[offset];
}

static common::interval_t readParquetInterval(const char* input);

static common::interval_t plainRead(ByteBuffer& plainData, ColumnReader& reader);

static inline void plainSkip(ByteBuffer& plain_data, ColumnReader& /*reader*/) {
plain_data.inc(common::ParquetConstants::PARQUET_INTERVAL_SIZE);
}
};

class IntervalColumnReader
: public TemplatedColumnReader<common::interval_t, IntervalValueConversion> {

public:
IntervalColumnReader(ParquetReader& reader, std::unique_ptr<common::LogicalType> type,
const kuzu_parquet::format::SchemaElement& schema, uint64_t fileIdx, uint64_t maxDefine,
uint64_t maxRepeat)
: TemplatedColumnReader<common::interval_t, IntervalValueConversion>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat){};

protected:
void dictionary(
std::shared_ptr<ResizeableBuffer> dictionary_data, uint64_t num_entries) override;
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include "basic_column_writer.h"
#include "common/types/interval_t.h"

namespace kuzu {
namespace processor {

class IntervalColumnWriter : public BasicColumnWriter {

public:
IntervalColumnWriter(ParquetWriter& writer, uint64_t schemaIdx,
std::vector<std::string> schemaPath, uint64_t maxRepeat, uint64_t maxDefine,
bool canHaveNulls)
: BasicColumnWriter(
writer, schemaIdx, std::move(schemaPath), maxRepeat, maxDefine, canHaveNulls) {}

public:
static void writeParquetInterval(common::interval_t input, uint8_t* result);

void writeVector(common::Serializer& bufferedSerializer, ColumnWriterStatistics* state,
ColumnWriterPageState* pageState, common::ValueVector* vector, uint64_t chunkStart,
uint64_t chunkEnd) override;

inline uint64_t getRowSize(common::ValueVector* /*vector*/, uint64_t /*index*/,
BasicColumnWriterState& /*state*/) override {
return common::ParquetConstants::PARQUET_INTERVAL_SIZE;
}
};

} // namespace processor
} // namespace kuzu
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_library(kuzu_processor_operator_parquet_reader
boolean_column_reader.cpp
column_reader.cpp
parquet_reader.cpp
interval_column_reader.cpp
struct_column_reader.cpp
string_column_reader.cpp
list_column_reader.cpp)
Expand Down
59 changes: 50 additions & 9 deletions src/processor/operator/persistent/reader/parquet/column_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

#include "common/exception/not_implemented.h"
#include "common/types/date_t.h"
#include "miniz_wrapper.hpp"
#include "processor/operator/persistent/reader/parquet/boolean_column_reader.h"
#include "processor/operator/persistent/reader/parquet/callback_column_reader.h"
#include "processor/operator/persistent/reader/parquet/interval_column_reader.h"
#include "processor/operator/persistent/reader/parquet/string_column_reader.h"
#include "processor/operator/persistent/reader/parquet/templated_column_reader.h"
#include "snappy/snappy.h"
#include "zstd.h"

namespace kuzu {
namespace processor {
Expand Down Expand Up @@ -195,6 +198,10 @@ std::unique_ptr<ColumnReader> ColumnReader::createReader(ParquetReader& reader,
case common::LogicalTypeID::BOOL:
return std::make_unique<BooleanColumnReader>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::INT8:
return std::make_unique<
TemplatedColumnReader<int8_t, TemplatedParquetValueConversion<int32_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::INT16:
return std::make_unique<
TemplatedColumnReader<int16_t, TemplatedParquetValueConversion<int32_t>>>(
Expand All @@ -207,6 +214,22 @@ std::unique_ptr<ColumnReader> ColumnReader::createReader(ParquetReader& reader,
return std::make_unique<
TemplatedColumnReader<int64_t, TemplatedParquetValueConversion<int64_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::UINT8:
return std::make_unique<
TemplatedColumnReader<uint8_t, TemplatedParquetValueConversion<uint32_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::UINT16:
return std::make_unique<
TemplatedColumnReader<uint16_t, TemplatedParquetValueConversion<uint32_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::UINT32:
return std::make_unique<
TemplatedColumnReader<uint32_t, TemplatedParquetValueConversion<uint32_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::UINT64:
return std::make_unique<
TemplatedColumnReader<uint64_t, TemplatedParquetValueConversion<uint64_t>>>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::FLOAT:
return std::make_unique<
TemplatedColumnReader<float, TemplatedParquetValueConversion<float>>>(
Expand All @@ -221,6 +244,9 @@ std::unique_ptr<ColumnReader> ColumnReader::createReader(ParquetReader& reader,
case common::LogicalTypeID::STRING:
return std::make_unique<StringColumnReader>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
case common::LogicalTypeID::INTERVAL:
return std::make_unique<IntervalColumnReader>(
reader, std::move(type), schema, fileIdx, maxDefine, maxRepeat);
default:
throw common::NotImplementedException{"ColumnReader::createReader"};
}
Expand Down Expand Up @@ -270,36 +296,51 @@ void ColumnReader::decompressInternal(kuzu_parquet::format::CompressionCodec::ty
case CompressionCodec::UNCOMPRESSED:
throw common::CopyException("Parquet data unexpectedly uncompressed");
case CompressionCodec::GZIP: {
throw common::NotImplementedException("ColumnReader::decompressInternal");
MiniZStream s;
s.Decompress(
reinterpret_cast<const char*>(src), srcSize, reinterpret_cast<char*>(dst), dstSize);
break;
}
case CompressionCodec::SNAPPY: {
{
size_t uncompressed_size = 0;
size_t uncompressedSize = 0;
auto res = kuzu_snappy::GetUncompressedLength(
reinterpret_cast<const char*>(src), srcSize, &uncompressed_size);
reinterpret_cast<const char*>(src), srcSize, &uncompressedSize);
// LCOV_EXCL_START
if (!res) {
throw std::runtime_error("Snappy decompression failure");
throw common::RuntimeException{"Failed to decompress parquet file."};
}
if (uncompressed_size != (size_t)dstSize) {
throw std::runtime_error(
"Snappy decompression failure: Uncompressed data size mismatch");
if (uncompressedSize != (size_t)dstSize) {
throw common::RuntimeException{
"Snappy decompression failure: Uncompressed data size mismatch"};
}
// LCOV_EXCL_STOP
}
auto res = kuzu_snappy::RawUncompress(
reinterpret_cast<const char*>(src), srcSize, reinterpret_cast<char*>(dst));
// LCOV_EXCL_START
if (!res) {
throw std::runtime_error("Snappy decompression failure");
throw common::RuntimeException{"Snappy decompression failure"};
}
// LCOV_EXCL_STOP
break;
}
case CompressionCodec::ZSTD: {
throw common::NotImplementedException("ColumnReader::decompressInternal");
auto res = duckdb_zstd::ZSTD_decompress(dst, dstSize, src, srcSize);
// LCOV_EXCL_START
if (duckdb_zstd::ZSTD_isError(res) || res != (size_t)dstSize) {
throw common::RuntimeException{"ZSTD Decompression failure"};
}
// LCOV_EXCL_STOP
break;
}
default: {
// LCOV_EXCL_START
std::stringstream codec_name;
codec_name << codec;
throw common::CopyException("Unsupported compression codec \"" + codec_name.str() +
"\". Supported options are uncompressed, gzip, snappy or zstd");
// LCOV_EXCL_STOP
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "processor/operator/persistent/reader/parquet/interval_column_reader.h"

namespace kuzu {
namespace processor {

common::interval_t IntervalValueConversion::readParquetInterval(const char* input) {
common::interval_t result;
auto inputData = reinterpret_cast<const uint32_t*>(input);
result.months = inputData[0];
result.days = inputData[1];
result.micros = int64_t(inputData[2]) * 1000;
return result;
}

common::interval_t IntervalValueConversion::plainRead(
ByteBuffer& plainData, ColumnReader& /*reader*/) {
auto intervalLen = common::ParquetConstants::PARQUET_INTERVAL_SIZE;
plainData.available(intervalLen);
auto res = readParquetInterval(reinterpret_cast<const char*>(plainData.ptr));
plainData.inc(intervalLen);
return res;
}

void IntervalColumnReader::dictionary(
std::shared_ptr<ResizeableBuffer> dictionary_data, uint64_t num_entries) {
allocateDict(num_entries * sizeof(common::interval_t));
auto dict_ptr = reinterpret_cast<common::interval_t*>(this->dict->ptr);
for (auto i = 0u; i < num_entries; i++) {
dict_ptr[i] = IntervalValueConversion::plainRead(*dictionary_data, *this);
}
}

} // namespace processor
} // namespace kuzu
Loading

0 comments on commit a4ed60f

Please sign in to comment.