Merge 6ccc51c into fcf45a6
krlmlr committed Feb 24, 2024
2 parents fcf45a6 + 6ccc51c commit d065ea0
Showing 871 changed files with 52,860 additions and 35,487 deletions.
2 changes: 1 addition & 1 deletion src/Makevars.win
@@ -15,4 +15,4 @@ include Makevars.duckdb
 CXX_STD = CXX17
 PKG_CPPFLAGS = -Iinclude -I../inst/include -DDUCKDB_DISABLE_PRINT -DDUCKDB_R_BUILD -Iduckdb/src/include -Iduckdb/third_party/fmt/include -Iduckdb/third_party/fsst -Iduckdb/third_party/re2 -Iduckdb/third_party/miniz -Iduckdb/third_party/utf8proc/include -Iduckdb/third_party/utf8proc -Iduckdb/third_party/hyperloglog -Iduckdb/third_party/skiplist -Iduckdb/third_party/fastpforlib -Iduckdb/third_party/tdigest -Iduckdb/third_party/libpg_query/include -Iduckdb/third_party/libpg_query -Iduckdb/third_party/concurrentqueue -Iduckdb/third_party/pcg -Iduckdb/third_party/httplib -Iduckdb/third_party/fast_float -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -Iduckdb/third_party/mbedtls/library -Iduckdb/third_party/jaro_winkler -Iduckdb/third_party/jaro_winkler/details -Iduckdb/extension/parquet/include -Iduckdb/third_party/parquet -Iduckdb/third_party/thrift -Iduckdb/third_party/snappy -Iduckdb/third_party/zstd/include -Iduckdb/third_party/mbedtls -Iduckdb/third_party/mbedtls/include -I../inst/include -Iduckdb -DDUCKDB_EXTENSION_PARQUET_LINKED -DDUCKDB_BUILD_LIBRARY -DDUCKDB_PLATFORM_RTOOLS=1
 OBJECTS=database.o connection.o statement.o register.o relational.o scan.o transform.o utils.o reltoaltrep.o types.o cpp11.o $(SOURCES)
-PKG_LIBS=-lws2_32
+PKG_LIBS=-lws2_32 -lrstrtmgr
7 changes: 1 addition & 6 deletions src/duckdb/extension/parquet/column_reader.cpp
@@ -20,7 +20,6 @@
 #ifndef DUCKDB_AMALGAMATION
 #include "duckdb/common/types/bit.hpp"
 #include "duckdb/common/types/blob.hpp"
-#include "duckdb/common/types/chunk_collection.hpp"
 #endif
 
 namespace duckdb {
@@ -181,11 +180,7 @@ idx_t ColumnReader::GroupRowsAvailable() {
 }
 
 unique_ptr<BaseStatistics> ColumnReader::Stats(idx_t row_group_idx_p, const vector<ColumnChunk> &columns) {
-	if (Type().id() == LogicalTypeId::LIST || Type().id() == LogicalTypeId::STRUCT ||
-	    Type().id() == LogicalTypeId::MAP || Type().id() == LogicalTypeId::ARRAY) {
-		return nullptr;
-	}
-	return ParquetStatisticsUtils::TransformColumnStatistics(Schema(), Type(), columns[file_idx]);
+	return ParquetStatisticsUtils::TransformColumnStatistics(*this, columns);
 }
 
 void ColumnReader::Plain(shared_ptr<ByteBuffer> plain_data, uint8_t *defines, idx_t num_values, // NOLINT
21 changes: 20 additions & 1 deletion src/duckdb/extension/parquet/column_writer.cpp
@@ -13,9 +13,9 @@
 #include "duckdb/common/serializer/memory_stream.hpp"
 #include "duckdb/common/serializer/write_stream.hpp"
 #include "duckdb/common/string_map_set.hpp"
-#include "duckdb/common/types/chunk_collection.hpp"
 #include "duckdb/common/types/date.hpp"
 #include "duckdb/common/types/hugeint.hpp"
+#include "duckdb/common/types/uhugeint.hpp"
 #include "duckdb/common/types/string_heap.hpp"
 #include "duckdb/common/types/time.hpp"
 #include "duckdb/common/types/timestamp.hpp"
@@ -824,6 +824,22 @@ struct ParquetHugeintOperator {
 	}
 };
 
+struct ParquetUhugeintOperator {
+	template <class SRC, class TGT>
+	static TGT Operation(SRC input) {
+		return Uhugeint::Cast<double>(input);
+	}
+
+	template <class SRC, class TGT>
+	static unique_ptr<ColumnWriterStatistics> InitializeStats() {
+		return make_uniq<ColumnWriterStatistics>();
+	}
+
+	template <class SRC, class TGT>
+	static void HandleStats(ColumnWriterStatistics *stats, SRC source_value, TGT target_value) {
+	}
+};
+
 template <class SRC, class TGT, class OP = ParquetCastOperator>
 static void TemplatedWritePlain(Vector &col, ColumnWriterStatistics *stats, idx_t chunk_start, idx_t chunk_end,
                                 ValidityMask &mask, WriteStream &ser) {
@@ -1997,6 +2013,9 @@ unique_ptr<ColumnWriter> ColumnWriter::CreateWriterRecursive(vector<duckdb_parqu
 	case LogicalTypeId::HUGEINT:
 		return make_uniq<StandardColumnWriter<hugeint_t, double, ParquetHugeintOperator>>(
 		    writer, schema_idx, std::move(schema_path), max_repeat, max_define, can_have_nulls);
+	case LogicalTypeId::UHUGEINT:
+		return make_uniq<StandardColumnWriter<uhugeint_t, double, ParquetUhugeintOperator>>(
+		    writer, schema_idx, std::move(schema_path), max_repeat, max_define, can_have_nulls);
 	case LogicalTypeId::TIMESTAMP_NS:
 		return make_uniq<StandardColumnWriter<int64_t, int64_t, ParquetTimestampNSOperator>>(
 		    writer, schema_idx, std::move(schema_path), max_repeat, max_define, can_have_nulls);
1 change: 0 additions & 1 deletion src/duckdb/extension/parquet/include/column_reader.hpp
@@ -19,7 +19,6 @@
 #ifndef DUCKDB_AMALGAMATION
 
 #include "duckdb/common/operator/cast_operators.hpp"
-#include "duckdb/common/types/chunk_collection.hpp"
 #include "duckdb/common/types/string_type.hpp"
 #include "duckdb/common/types/vector.hpp"
 #include "duckdb/common/types/vector_cache.hpp"
4 changes: 2 additions & 2 deletions src/duckdb/extension/parquet/include/decode_utils.hpp
@@ -7,8 +7,8 @@ class ParquetDecodeUtils {
 
 public:
 	template <class T>
-	static T ZigzagToInt(const T n) {
-		return (n >> 1) ^ -(n & 1);
+	static T ZigzagToInt(const uint64_t n) {
+		return T(n >> 1) ^ -T(n & 1);
 	}
 
 	static const uint64_t BITPACK_MASKS[];
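The signature change above takes the raw varint as uint64_t and only converts to the signed target type after shifting and masking, which avoids right-shifting a value that is already negative (implementation-defined behavior in C++). A minimal self-contained sketch of the same decode rule with a few spot checks; the function name and test values here are illustrative, not from the diff:

#include <cassert>
#include <cstdint>

// Zigzag decode: even codes map to non-negative values, odd codes to negative.
// 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2, ...
static int64_t ZigzagToInt64(uint64_t n) {
	// Shift in the unsigned domain first, then move to the signed target,
	// mirroring the patched ZigzagToInt above.
	return int64_t(n >> 1) ^ -int64_t(n & 1);
}

int main() {
	assert(ZigzagToInt64(0) == 0);
	assert(ZigzagToInt64(1) == -1);
	assert(ZigzagToInt64(2) == 1);
	assert(ZigzagToInt64(3) == -2);
	return 0;
}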
7 changes: 4 additions & 3 deletions src/duckdb/extension/parquet/include/parquet_dbp_decoder.hpp
@@ -10,7 +10,7 @@ class DbpDecoder {
 		block_value_count = ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_);
 		miniblocks_per_block = ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_);
 		total_value_count = ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_);
-		start_value = ParquetDecodeUtils::ZigzagToInt(ParquetDecodeUtils::VarintDecode<int64_t>(buffer_));
+		start_value = ParquetDecodeUtils::ZigzagToInt<int64_t>(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_));
 
 		// some derivatives
 		D_ASSERT(miniblocks_per_block > 0);
@@ -61,7 +61,8 @@
 		if (bitpack_pos > 0) { // have to eat the leftovers if any
 			buffer_.inc(1);
 		}
-		min_delta = ParquetDecodeUtils::ZigzagToInt(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_));
+		min_delta =
+		    ParquetDecodeUtils::ZigzagToInt<int64_t>(ParquetDecodeUtils::VarintDecode<uint64_t>(buffer_));
 		for (idx_t miniblock_idx = 0; miniblock_idx < miniblocks_per_block; miniblock_idx++) {
 			miniblock_bit_widths[miniblock_idx] = buffer_.read<uint8_t>();
 			// TODO what happens if width is 0?
@@ -80,7 +81,7 @@
 			ParquetDecodeUtils::BitUnpack<T>(buffer_, bitpack_pos, &values[value_offset], read_now,
 			                                 miniblock_bit_widths[miniblock_offset]);
 			for (idx_t i = value_offset; i < value_offset + read_now; i++) {
-				values[i] = ((i == 0) ? start_value : values[i - 1]) + min_delta + values[i];
+				values[i] = T(uint64_t((i == 0) ? start_value : values[i - 1]) + min_delta + uint64_t(values[i]));
			}
 			value_offset += read_now;
 			values_left_in_miniblock -= read_now;
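Both DELTA_BINARY_PACKED changes push the arithmetic into uint64_t, where overflow wraps around instead of being signed-overflow undefined behavior: the header decode now feeds ZigzagToInt<int64_t> an unsigned varint, and the prefix sum adds the deltas in the unsigned domain. A self-contained sketch of that prefix-sum step under the same assumption (names are illustrative):

#include <cstdint>
#include <vector>

// Rebuild values from DELTA_BINARY_PACKED components: each value is the
// previous value plus min_delta plus its bit-packed (non-negative) delta.
std::vector<int64_t> ReconstructBlock(int64_t start_value, int64_t min_delta,
                                      const std::vector<uint64_t> &packed_deltas) {
	std::vector<int64_t> values;
	values.reserve(packed_deltas.size() + 1);
	uint64_t prev = uint64_t(start_value);
	values.push_back(start_value);
	for (auto delta : packed_deltas) {
		// Unsigned addition wraps around, matching the casts in the diff above.
		prev = prev + uint64_t(min_delta) + delta;
		values.push_back(int64_t(prev));
	}
	return values;
}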
14 changes: 11 additions & 3 deletions src/duckdb/extension/parquet/include/parquet_decimal_utils.hpp
@@ -17,18 +17,26 @@ class ParquetDecimalUtils {
 public:
 	template <class PHYSICAL_TYPE>
 	static PHYSICAL_TYPE ReadDecimalValue(const_data_ptr_t pointer, idx_t size,
-	                                      const duckdb_parquet::format::SchemaElement &schema_ele) {
-		D_ASSERT(size <= sizeof(PHYSICAL_TYPE));
+	                                      const duckdb_parquet::format::SchemaElement &) {
 		PHYSICAL_TYPE res = 0;
 
 		auto res_ptr = (uint8_t *)&res;
 		bool positive = (*pointer & 0x80) == 0;
 
 		// numbers are stored as two's complement so some muckery is required
-		for (idx_t i = 0; i < size; i++) {
+		for (idx_t i = 0; i < MinValue<idx_t>(size, sizeof(PHYSICAL_TYPE)); i++) {
 			auto byte = *(pointer + (size - i - 1));
 			res_ptr[i] = positive ? byte : byte ^ 0xFF;
 		}
+		// Verify that there are only 0s here
+		if (size > sizeof(PHYSICAL_TYPE)) {
+			for (idx_t i = sizeof(PHYSICAL_TYPE); i < size; i++) {
+				auto byte = *(pointer + (size - i - 1));
+				if (byte != 0) {
+					throw InvalidInputException("Invalid decimal encoding in Parquet file");
+				}
+			}
+		}
 		if (!positive) {
 			res += 1;
 			return -res;
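This change replaces a debug-only assertion with a runtime check, so the reader now accepts a Parquet DECIMAL whose fixed byte width exceeds the target physical type as long as the surplus high-order bytes carry no information, and rejects the file otherwise. For intuition, a self-contained sketch of the same two's-complement decode specialized to int64_t (function name and test values are illustrative):

#include <cassert>
#include <cstdint>

// Decode a big-endian two's-complement byte array (the layout Parquet uses
// for DECIMAL) into int64_t. Assumes size <= 8 and a little-endian host.
static int64_t DecodeDecimalBE(const uint8_t *pointer, size_t size) {
	int64_t res = 0;
	auto res_ptr = reinterpret_cast<uint8_t *>(&res);
	bool positive = (*pointer & 0x80) == 0;
	for (size_t i = 0; i < size; i++) {
		uint8_t byte = pointer[size - i - 1];
		// For negatives, flipping every byte yields ~v == -v - 1 ...
		res_ptr[i] = positive ? byte : byte ^ 0xFF;
	}
	if (!positive) {
		res += 1;    // ... so adding one gives -v ...
		return -res; // ... and negating recovers v.
	}
	return res;
}

int main() {
	const uint8_t neg300[2] = {0xFE, 0xD4}; // -300 as 2-byte big-endian
	assert(DecodeDecimalBE(neg300, 2) == -300);
	const uint8_t pos300[2] = {0x01, 0x2C}; // +300
	assert(DecodeDecimalBE(pos300, 2) == 300);
	return 0;
}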
5 changes: 3 additions & 2 deletions src/duckdb/extension/parquet/include/parquet_statistics.hpp
@@ -12,11 +12,12 @@ using duckdb_parquet::format::ColumnChunk;
 using duckdb_parquet::format::SchemaElement;
 
 struct LogicalType;
+class ColumnReader;
 
 struct ParquetStatisticsUtils {
 
-	static unique_ptr<BaseStatistics> TransformColumnStatistics(const SchemaElement &s_ele, const LogicalType &type,
-	                                                            const ColumnChunk &column_chunk);
+	static unique_ptr<BaseStatistics> TransformColumnStatistics(const ColumnReader &reader,
+	                                                            const vector<ColumnChunk> &columns);
 
 	static Value ConvertValue(const LogicalType &type, const duckdb_parquet::format::SchemaElement &schema_ele,
 	                          const std::string &stats);
4 changes: 4 additions & 0 deletions src/duckdb/extension/parquet/include/parquet_writer.hpp
@@ -87,6 +87,10 @@ class ParquetWriter {
 	BufferedFileWriter &GetWriter() {
 		return *writer;
 	}
+	idx_t FileSize() {
+		lock_guard<mutex> glock(lock);
+		return writer->total_written;
+	}
 
 	static CopyTypeSupport TypeIsSupported(const LogicalType &type);
 
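FileSize() takes the writer's lock, presumably because row-group flushes append to the file concurrently and the accessor must observe a consistent total_written. A minimal sketch of that locked-counter pattern; SketchWriter is illustrative, not DuckDB's class:

#include <cstdint>
#include <mutex>

class SketchWriter {
public:
	void Append(uint64_t len) {
		std::lock_guard<std::mutex> guard(lock);
		total_written += len; // a real writer also copies bytes to the file here
	}
	uint64_t FileSize() {
		// Same lock as Append, so a reader never sees a half-updated total.
		std::lock_guard<std::mutex> guard(lock);
		return total_written;
	}

private:
	std::mutex lock;
	uint64_t total_written = 0;
};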
33 changes: 23 additions & 10 deletions src/duckdb/extension/parquet/parquet_extension.cpp
@@ -25,7 +25,6 @@
 #include "duckdb/common/multi_file_reader.hpp"
 #include "duckdb/common/serializer/deserializer.hpp"
 #include "duckdb/common/serializer/serializer.hpp"
-#include "duckdb/common/types/chunk_collection.hpp"
 #include "duckdb/function/copy_function.hpp"
 #include "duckdb/function/pragma_function.hpp"
 #include "duckdb/function/table_function.hpp"
@@ -491,10 +490,9 @@ class ParquetScanFunction {
 		if (bind_data.initial_file_cardinality == 0) {
 			return (100.0 * (bind_data.cur_file + 1)) / bind_data.files.size();
 		}
-		auto percentage = (bind_data.chunk_count * STANDARD_VECTOR_SIZE * 100.0 / bind_data.initial_file_cardinality) /
-		                  bind_data.files.size();
-		percentage += 100.0 * bind_data.cur_file / bind_data.files.size();
-		return percentage;
+		auto percentage = std::min(
+		    100.0, (bind_data.chunk_count * STANDARD_VECTOR_SIZE * 100.0 / bind_data.initial_file_cardinality));
+		return (percentage + 100.0 * bind_data.cur_file) / bind_data.files.size();
 	}
 
 	static unique_ptr<LocalTableFunctionState>
@@ -630,7 +628,7 @@ class ParquetScanFunction {
 
 	static idx_t ParquetScanMaxThreads(ClientContext &context, const FunctionData *bind_data) {
 		auto &data = bind_data->Cast<ParquetReadBindData>();
-		return data.initial_file_row_groups * data.files.size();
+		return std::max(data.initial_file_row_groups, idx_t(1)) * data.files.size();
 	}
 
 	// This function looks for the next available row group. If not available, it will open files from bind_data.files
@@ -910,12 +908,12 @@ static void GetFieldIDs(const Value &field_ids_value, ChildFieldIDs &field_ids,
 	}
 }
 
-unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo &info, const vector<string> &names,
-                                          const vector<LogicalType> &sql_types) {
+unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, CopyFunctionBindInput &input,
+                                          const vector<string> &names, const vector<LogicalType> &sql_types) {
 	D_ASSERT(names.size() == sql_types.size());
 	bool row_group_size_bytes_set = false;
 	auto bind_data = make_uniq<ParquetWriteBindData>();
-	for (auto &option : info.options) {
+	for (auto &option : input.info.options) {
 		const auto loption = StringUtil::Lower(option.first);
 		if (option.second.size() != 1) {
 			// All parquet write options require exactly one argument
@@ -986,7 +984,13 @@ unique_ptr<FunctionData> ParquetWriteBind(ClientContext &context, const CopyInfo
 			throw NotImplementedException("Unrecognized option for PARQUET: %s", option.first.c_str());
 		}
 	}
-	if (!row_group_size_bytes_set) {
+	if (row_group_size_bytes_set) {
+		if (DBConfig::GetConfig(context).options.preserve_insertion_order) {
+			throw BinderException("ROW_GROUP_SIZE_BYTES does not work while preserving insertion order. Use \"SET "
+			                      "preserve_insertion_order=false;\" to disable preserving insertion order.");
+		}
+	} else {
+		// We always set a max row group size bytes so we don't use too much memory
 		bind_data->row_group_size_bytes = bind_data->row_group_size * ParquetWriteBindData::BYTES_PER_ROW;
 	}
 
@@ -1179,6 +1183,14 @@ idx_t ParquetWriteDesiredBatchSize(ClientContext &context, FunctionData &bind_da
 	return bind_data.row_group_size;
 }
 
+//===--------------------------------------------------------------------===//
+// Current File Size
+//===--------------------------------------------------------------------===//
+idx_t ParquetWriteFileSize(GlobalFunctionData &gstate) {
+	auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
+	return global_state.writer->FileSize();
+}
+
 //===--------------------------------------------------------------------===//
 // Scan Replacement
 //===--------------------------------------------------------------------===//
@@ -1240,6 +1252,7 @@ void ParquetExtension::Load(DuckDB &db) {
 	function.prepare_batch = ParquetWritePrepareBatch;
 	function.flush_batch = ParquetWriteFlushBatch;
 	function.desired_batch_size = ParquetWriteDesiredBatchSize;
+	function.file_size_bytes = ParquetWriteFileSize;
 	function.serialize = ParquetCopySerialize;
 	function.deserialize = ParquetCopyDeserialize;
 	function.supports_type = ParquetWriter::TypeIsSupported;
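The GetProgress fix in this file is easy to verify by hand: per-file progress is now clamped at 100% before completed files are folded in. With 3 files, 2 complete, and the current file estimated at 120% scanned (its initial cardinality was underestimated), the old formula reports (120 + 200) / 3, about 106.7%, while the new one caps it at (100 + 200) / 3 = 100%. A sketch of the corrected math with illustrative names:

#include <algorithm>

// Per-file progress is clamped to 100 before averaging across all files.
double ScanProgress(double chunks_scanned, double vector_size, double initial_cardinality,
                    double files_done, double file_count) {
	double current = std::min(100.0, chunks_scanned * vector_size * 100.0 / initial_cardinality);
	return (current + 100.0 * files_done) / file_count;
}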
6 changes: 6 additions & 0 deletions src/duckdb/extension/parquet/parquet_reader.cpp
@@ -25,6 +25,7 @@
 #include "duckdb/planner/filter/conjunction_filter.hpp"
 #include "duckdb/planner/filter/constant_filter.hpp"
 #include "duckdb/planner/filter/null_filter.hpp"
+#include "duckdb/planner/filter/struct_filter.hpp"
 #include "duckdb/planner/table_filter.hpp"
 #include "duckdb/storage/object_cache.hpp"
 #endif
@@ -874,6 +875,11 @@ static void ApplyFilter(Vector &v, TableFilter &filter, parquet_filter_t &filter
 	case TableFilterType::IS_NULL:
 		FilterIsNull(v, filter_mask, count);
 		break;
+	case TableFilterType::STRUCT_EXTRACT: {
+		auto &struct_filter = filter.Cast<StructFilter>();
+		auto &child = StructVector::GetEntries(v)[struct_filter.child_idx];
+		ApplyFilter(*child, *struct_filter.child_filter, filter_mask, count);
+	} break;
 	default:
 		D_ASSERT(0);
 		break;
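The new STRUCT_EXTRACT case recurses: a filter aimed at one struct field is applied to that field's child vector, narrowing the same row mask shared by the whole column. A self-contained sketch of the idea, with struct columns modeled as one plain column per field (all names illustrative):

#include <cstddef>
#include <functional>
#include <vector>

// A struct column modeled as one child column per field (simplified).
struct StructColumn {
	std::vector<std::vector<int>> children;
};

// AND a predicate on one child field into the shared row mask, mirroring
// how ApplyFilter recurses into StructVector::GetEntries(v)[child_idx].
void ApplyStructFilter(const StructColumn &col, size_t child_idx,
                       const std::function<bool(int)> &pred, std::vector<bool> &mask) {
	const auto &child = col.children[child_idx];
	for (size_t row = 0; row < child.size(); row++) {
		mask[row] = mask[row] && pred(child[row]);
	}
}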