feat: Update core to duckdb v0.10.0 #90

Merged · 5 commits · Feb 27, 2024
Changes from 3 commits
rconfigure.py: 1 addition & 1 deletion

@@ -130,7 +130,7 @@ def open_utf8(fpath, flags):

include_list += " -DDUCKDB_PLATFORM_RTOOLS=1"
text = text.replace('{{ INCLUDES }}', include_list)
-text = text.replace('{{ LINK_FLAGS }}', "-lws2_32")
+text = text.replace('{{ LINK_FLAGS }}', "-lws2_32 -lrstrtmgr")

# now write it to the output Makevars
with open_utf8(os.path.join('src', 'Makevars.win'), 'w+') as f:
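Context for the new link flag: -lrstrtmgr pulls in the Windows Restart Manager library. My understanding is that DuckDB v0.10.0 uses this API to report which processes hold a conflicting lock on a database file; treat that rationale as an inference, not something stated in this diff. A minimal sketch of the API itself (the path and buffer size are illustrative):

```cpp
// Windows-only sketch: list the processes that currently hold a file open.
// Build with e.g. g++ ... -lrstrtmgr (the flag added in rconfigure.py above).
#include <windows.h>
#include <RestartManager.h>
#include <cstdio>

int main() {
    DWORD session = 0;
    WCHAR session_key[CCH_RM_SESSION_KEY + 1] = {0};
    if (RmStartSession(&session, 0, session_key) != ERROR_SUCCESS) {
        return 1;
    }
    LPCWSTR files[] = {L"C:\\data\\my.duckdb"}; // illustrative path
    RmRegisterResources(session, 1, files, 0, nullptr, 0, nullptr);

    UINT needed = 0, count = 8; // 8 is an arbitrary sketch-sized buffer
    RM_PROCESS_INFO info[8];
    DWORD reasons = RmRebootReasonNone;
    if (RmGetList(session, &needed, &count, info, &reasons) == ERROR_SUCCESS) {
        for (UINT i = 0; i < count; i++) {
            wprintf(L"locked by PID %lu (%s)\n", info[i].Process.dwProcessId, info[i].strAppName);
        }
    }
    RmEndSession(session);
    return 0;
}
```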
src/duckdb/src/common/enum_util.cpp: 5 additions & 0 deletions

@@ -4138,6 +4138,8 @@ const char* EnumUtil::ToChars<PendingExecutionResult>(PendingExecutionResult val
return "RESULT_NOT_READY";
case PendingExecutionResult::EXECUTION_ERROR:
return "EXECUTION_ERROR";
+case PendingExecutionResult::BLOCKED:
+return "BLOCKED";
case PendingExecutionResult::NO_TASKS_AVAILABLE:
return "NO_TASKS_AVAILABLE";
default:

@@ -4156,6 +4158,9 @@ PendingExecutionResult EnumUtil::FromString<PendingExecutionResult>(const char *
if (StringUtil::Equals(value, "EXECUTION_ERROR")) {
return PendingExecutionResult::EXECUTION_ERROR;
}
+if (StringUtil::Equals(value, "BLOCKED")) {
+return PendingExecutionResult::BLOCKED;
+}
if (StringUtil::Equals(value, "NO_TASKS_AVAILABLE")) {
return PendingExecutionResult::NO_TASKS_AVAILABLE;
}
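For orientation, the EnumUtil pair above is DuckDB's generated string round-trip for enums; the new BLOCKED variant simply joins it. A self-contained sketch of the same pattern with a local enum (not the generated DuckDB templates):

```cpp
#include <cstring>
#include <stdexcept>

enum class PendingExecutionResult { RESULT_READY, RESULT_NOT_READY, EXECUTION_ERROR, BLOCKED, NO_TASKS_AVAILABLE };

// Enum -> string: exhaustive switch, one case per variant.
const char *ToChars(PendingExecutionResult value) {
    switch (value) {
    case PendingExecutionResult::RESULT_READY: return "RESULT_READY";
    case PendingExecutionResult::RESULT_NOT_READY: return "RESULT_NOT_READY";
    case PendingExecutionResult::EXECUTION_ERROR: return "EXECUTION_ERROR";
    case PendingExecutionResult::BLOCKED: return "BLOCKED";
    case PendingExecutionResult::NO_TASKS_AVAILABLE: return "NO_TASKS_AVAILABLE";
    }
    throw std::invalid_argument("unrecognized enum value");
}

// String -> enum: compare against each variant's name in turn.
PendingExecutionResult FromString(const char *value) {
    for (auto v : {PendingExecutionResult::RESULT_READY, PendingExecutionResult::RESULT_NOT_READY,
                   PendingExecutionResult::EXECUTION_ERROR, PendingExecutionResult::BLOCKED,
                   PendingExecutionResult::NO_TASKS_AVAILABLE}) {
        if (std::strcmp(value, ToChars(v)) == 0) {
            return v;
        }
    }
    throw std::invalid_argument("unrecognized enum string");
}
```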
src/duckdb/src/common/types/vector.cpp: 15 additions & 0 deletions

@@ -118,6 +118,21 @@ void Vector::ReferenceAndSetType(const Vector &other) {

void Vector::Reinterpret(const Vector &other) {
vector_type = other.vector_type;
+#ifdef DEBUG
+auto &this_type = GetType();
+auto &other_type = other.GetType();
+
+auto type_is_same = other_type == this_type;
+bool this_is_nested = this_type.IsNested();
+bool other_is_nested = other_type.IsNested();
+
+bool not_nested = this_is_nested == false && other_is_nested == false;
+bool type_size_equal = GetTypeIdSize(this_type.InternalType()) == GetTypeIdSize(other_type.InternalType());
+//! Either the types are completely identical, or they are not nested and their physical type size is the same
+//! The reason nested types are not allowed is because copying the auxiliary buffer does not happen recursively
+//! e.g DOUBLE[] to BIGINT[], the type of the LIST would say BIGINT but the child Vector says DOUBLE
+D_ASSERT((not_nested && type_size_equal) || type_is_same);
+#endif
AssignSharedPointer(buffer, other.buffer);
AssignSharedPointer(auxiliary, other.auxiliary);
data = other.data;
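The assertion pins down the contract of Reinterpret: buffers are shared as raw bytes, so either the types are identical or both are flat with equal physical width. A small standalone illustration of why the flat case is safe (plain C++, not DuckDB's Vector):

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <vector>

int main() {
    static_assert(sizeof(double) == sizeof(int64_t), "same physical width");

    // A flat DOUBLE buffer can be relabeled as BIGINT without copying,
    // because every row occupies the same 8 bytes either way.
    std::vector<double> doubles = {1.0, 2.0, 3.0};
    int64_t bits;
    std::memcpy(&bits, doubles.data(), sizeof(bits)); // read row 0 as int64
    std::printf("bit pattern of 1.0 as BIGINT: %lld\n", (long long)bits);

    // For a nested type such as DOUBLE[] this is not enough: the outer list
    // buffer holds offsets/lengths, while the element payload lives in a
    // child buffer that a non-recursive Reinterpret would still label DOUBLE.
    return 0;
}
```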
src/duckdb/src/core_functions/scalar/map/map_entries.cpp: 2 additions & 2 deletions

@@ -10,9 +10,9 @@ namespace duckdb {

// Reverse of map_from_entries
static void MapEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) {
-idx_t count = args.size();
+auto count = args.size();

-result.Reinterpret(args.data[0]);
+MapUtil::ReinterpretMap(result, args.data[0], count);

if (args.AllConstant()) {
result.SetVectorType(VectorType::CONSTANT_VECTOR);
src/duckdb/src/core_functions/scalar/map/map_from_entries.cpp

@@ -10,8 +10,7 @@ namespace duckdb {
static void MapFromEntriesFunction(DataChunk &args, ExpressionState &state, Vector &result) {
auto count = args.size();

-result.Reinterpret(args.data[0]);
-
+MapUtil::ReinterpretMap(result, args.data[0], count);
MapVector::MapConversionVerify(result, count);
result.Verify(count);
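Both map functions switch from Vector::Reinterpret to MapUtil::ReinterpretMap because a MAP is a nested type, which the new debug assertion above rejects. The helper's implementation is not part of this diff; the sketch below only illustrates the level-by-level buffer sharing it plausibly performs (all names are hypothetical):

```cpp
#include <memory>

// Toy stand-in for a vector with an auxiliary child buffer.
struct ToyVector {
    const char *type;                 // logical type tag, e.g. "MAP(VARCHAR, INT)"
    std::shared_ptr<void> buffer;     // top-level payload (offsets/lengths for lists)
    std::shared_ptr<ToyVector> child; // nested entries, if any
};

// Reinterpret 'other' as 'result': share buffers level by level instead of
// only at the top, so each child's type tag and payload stay consistent.
void ReinterpretRecursive(ToyVector &result, const ToyVector &other) {
    result.buffer = other.buffer; // share, don't copy
    if (other.child) {
        if (!result.child) {
            result.child = std::make_shared<ToyVector>();
        }
        result.child->type = other.child->type; // keep the child type in sync
        ReinterpretRecursive(*result.child, *other.child);
    }
}
```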
src/duckdb/src/core_functions/scalar/string/parse_path.cpp: 1 addition & 1 deletion

@@ -175,7 +175,7 @@ static void TrimPathFunction(DataChunk &args, ExpressionState &state, Vector &re
// set default values
Vector &path = args.data[0];
Vector separator(string_t("default"));
-Vector trim_extension(false);
+Vector trim_extension(Value::BOOLEAN(false));
ReadOptionalArgs(args, separator, trim_extension, FRONT_TRIM);

TernaryExecutor::Execute<string_t, string_t, bool, string_t>(
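The parse_path fix avoids constructing a Vector from a bare `false`. Assuming the usual motivation, namely that Value has no implicit bool constructor so the literal promotes to an integer overload and yields the wrong logical type, here is a toy model of the pitfall (hypothetical Value/Vec classes, not duckdb's real ones):

```cpp
#include <cstdio>

// No Value(bool) constructor exists, only a BOOLEAN factory, so a bare
// 'false' undergoes integral promotion and builds an INTEGER value.
struct Value {
    const char *type;
    Value(int /*val*/) : type("INTEGER") {}
    static Value BOOLEAN(bool /*val*/) { return Value("BOOLEAN"); }
private:
    explicit Value(const char *t) : type(t) {}
};

struct Vec {
    explicit Vec(const Value &v) { std::printf("constant vector of type %s\n", v.type); }
};

int main() {
    Vec wrong(false);                 // prints INTEGER: bool promoted to int
    Vec right(Value::BOOLEAN(false)); // prints BOOLEAN: explicit factory
}
```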
src/duckdb/src/execution/operator/aggregate/physical_hash_aggregate.cpp

@@ -14,6 +14,7 @@
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/planner/expression/bound_constant_expression.hpp"
#include "duckdb/planner/expression/bound_reference_expression.hpp"
+#include "duckdb/common/optional_idx.hpp"

namespace duckdb {

@@ -545,14 +546,22 @@ class HashAggregateDistinctFinalizeTask : public ExecutorTask {
TaskExecutionResult ExecuteTask(TaskExecutionMode mode) override;

private:
-void AggregateDistinctGrouping(const idx_t grouping_idx);
+TaskExecutionResult AggregateDistinctGrouping(const idx_t grouping_idx);

private:
Pipeline &pipeline;
shared_ptr<Event> event;

const PhysicalHashAggregate &op;
HashAggregateGlobalSinkState &gstate;

+unique_ptr<LocalSinkState> local_sink_state;
+idx_t grouping_idx = 0;
+unique_ptr<LocalSourceState> radix_table_lstate;
+bool blocked = false;
+idx_t aggregation_idx = 0;
+idx_t payload_idx = 0;
+idx_t next_payload_idx = 0;
};

void HashAggregateDistinctFinalizeEvent::Schedule() {
@@ -604,14 +613,22 @@ void HashAggregateDistinctFinalizeEvent::FinishEvent() {
}

TaskExecutionResult HashAggregateDistinctFinalizeTask::ExecuteTask(TaskExecutionMode mode) {
-for (idx_t grouping_idx = 0; grouping_idx < op.groupings.size(); grouping_idx++) {
-AggregateDistinctGrouping(grouping_idx);
+for (; grouping_idx < op.groupings.size(); grouping_idx++) {
+auto res = AggregateDistinctGrouping(grouping_idx);
+if (res == TaskExecutionResult::TASK_BLOCKED) {
+return res;
+}
+D_ASSERT(res == TaskExecutionResult::TASK_FINISHED);
+aggregation_idx = 0;
+payload_idx = 0;
+next_payload_idx = 0;
+local_sink_state = nullptr;
}
event->FinishTask();
return TaskExecutionResult::TASK_FINISHED;
}
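This rewrite is the heart of the change: the finalize task can now return TASK_BLOCKED and be re-executed later, so every loop counter it needs lives on the task object instead of the stack. A stripped-down sketch of the resumable-task pattern (toy types, not duckdb's scheduler):

```cpp
#include <cstddef>
#include <utility>
#include <vector>

enum class TaskExecutionResult { TASK_FINISHED, TASK_BLOCKED };

struct WorkUnit {
    bool ready = true; // stand-in for "the data this step needs is available"
};

class ResumableTask {
public:
    explicit ResumableTask(std::vector<WorkUnit> units_p) : units(std::move(units_p)) {}

    // Called repeatedly by a scheduler; resumes exactly where it left off.
    TaskExecutionResult ExecuteTask() {
        for (; index < units.size(); index++) { // 'index' persists across calls
            if (!units[index].ready) {
                return TaskExecutionResult::TASK_BLOCKED; // rescheduled later
            }
            // ... process units[index] ...
        }
        return TaskExecutionResult::TASK_FINISHED;
    }

private:
    std::vector<WorkUnit> units;
    std::size_t index = 0; // loop state lives on the object, not the stack
};
```

In the diff above, the per-grouping cursors (aggregation_idx, payload_idx, next_payload_idx, local_sink_state) are reset after each grouping completes for the same reason: they only describe progress within the current grouping.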

-void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) {
+TaskExecutionResult HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t grouping_idx) {
D_ASSERT(op.distinct_collection_info);
auto &info = *op.distinct_collection_info;

@@ -628,9 +645,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
ExecutionContext execution_context(executor.context, thread_context, &pipeline);

// Sink state to sink into global HTs
-InterruptState interrupt_state;
+InterruptState interrupt_state(shared_from_this());
auto &global_sink_state = *grouping_state.table_state;
-auto local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context);
+if (!local_sink_state) {
+local_sink_state = grouping_data.table_data.GetLocalSinkState(execution_context);
+}
OperatorSinkInput sink_input {global_sink_state, *local_sink_state, interrupt_state};

// Create a chunk that mimics the 'input' chunk in Sink, for storing the group vectors
@@ -639,24 +658,24 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
group_chunk.Initialize(executor.context, op.input_group_types);
}

-auto &groups = op.grouped_aggregate_data.groups;
-const idx_t group_by_size = groups.size();
+const idx_t group_by_size = op.grouped_aggregate_data.groups.size();

DataChunk aggregate_input_chunk;
if (!gstate.payload_types.empty()) {
aggregate_input_chunk.Initialize(executor.context, gstate.payload_types);
}

-auto &finalize_event = event->Cast<HashAggregateDistinctFinalizeEvent>();
+const auto &finalize_event = event->Cast<HashAggregateDistinctFinalizeEvent>();

-idx_t payload_idx;
-idx_t next_payload_idx = 0;
-for (idx_t agg_idx = 0; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) {
+auto &agg_idx = aggregation_idx;
+for (; agg_idx < op.grouped_aggregate_data.aggregates.size(); agg_idx++) {
auto &aggregate = aggregates[agg_idx]->Cast<BoundAggregateExpression>();

-// Forward the payload idx
-payload_idx = next_payload_idx;
-next_payload_idx = payload_idx + aggregate.children.size();
+if (!blocked) {
+// Forward the payload idx
+payload_idx = next_payload_idx;
+next_payload_idx = payload_idx + aggregate.children.size();
+}

// If aggregate is not distinct, skip it
if (!distinct_data.IsDistinct(agg_idx)) {
@@ -668,8 +687,11 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
auto &radix_table = distinct_data.radix_tables[table_idx];

auto &sink = *distinct_state.radix_states[table_idx];
-auto local_source = radix_table->GetLocalSourceState(execution_context);
+if (!blocked) {
+radix_table_lstate = radix_table->GetLocalSourceState(execution_context);
+}
+auto &local_source = *radix_table_lstate;
-OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], *local_source,
+OperatorSourceInput source_input {*finalize_event.global_source_states[grouping_idx][agg_idx], local_source,
interrupt_state};

// Create a duplicate of the output_chunk, because of multi-threading we cant alter the original
@@ -687,8 +709,8 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
D_ASSERT(output_chunk.size() == 0);
break;
} else if (res == SourceResultType::BLOCKED) {
-throw InternalException(
-"Unexpected interrupt from radix table GetData in HashAggregateDistinctFinalizeTask");
+blocked = true;
+return TaskExecutionResult::TASK_BLOCKED;
}

auto &grouped_aggregate_data = *distinct_data.grouped_aggregate_data[table_idx];
@@ -708,8 +730,10 @@ void HashAggregateDistinctFinalizeTask::AggregateDistinctGrouping(const idx_t gr
// Sink it into the main ht
grouping_data.table_data.Sink(execution_context, group_chunk, sink_input, aggregate_input_chunk, {agg_idx});
}
+blocked = false;
}
grouping_data.table_data.Combine(execution_context, global_sink_state, *local_sink_state);
+return TaskExecutionResult::TASK_FINISHED;
}

SinkFinalizeType PhysicalHashAggregate::FinalizeDistinct(Pipeline &pipeline, Event &event, ClientContext &context,
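Passing shared_from_this() into the InterruptState is what lets whatever blocked the task wake that exact task back up, rather than treating an interrupt as an internal error the way the old throw did. A rough sketch of such wiring (toy scheduler; the real InterruptState internals are not shown in this diff):

```cpp
#include <memory>
#include <queue>

struct Task : std::enable_shared_from_this<Task> {
    virtual ~Task() = default;
    virtual void Execute() = 0;
};

// Queue standing in for the scheduler's ready list.
std::queue<std::shared_ptr<Task>> ready_queue;

// Toy InterruptState: remembers which task to reschedule on wake-up.
class InterruptState {
public:
    explicit InterruptState(std::weak_ptr<Task> task_p) : task(std::move(task_p)) {}

    // Called by whatever the task was waiting on (I/O, another pipeline, ...).
    void Callback() const {
        if (auto t = task.lock()) {
            ready_queue.push(t); // re-enqueue the blocked task
        }
    }

private:
    std::weak_ptr<Task> task;
};
```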
@@ -809,6 +833,7 @@ class HashAggregateLocalSourceState : public LocalSourceState {
}
}

+optional_idx radix_idx;
vector<unique_ptr<LocalSourceState>> radix_states;
};

@@ -823,32 +848,37 @@ SourceResultType PhysicalHashAggregate::GetData(ExecutionContext &context, DataC
auto &gstate = input.global_state.Cast<HashAggregateGlobalSourceState>();
auto &lstate = input.local_state.Cast<HashAggregateLocalSourceState>();
while (true) {
-idx_t radix_idx = gstate.state_index;
+if (!lstate.radix_idx.IsValid()) {
+lstate.radix_idx = gstate.state_index.load();
+}
+const auto radix_idx = lstate.radix_idx.GetIndex();
if (radix_idx >= groupings.size()) {
break;
}

auto &grouping = groupings[radix_idx];
auto &radix_table = grouping.table_data;
auto &grouping_gstate = sink_gstate.grouping_states[radix_idx];

-InterruptState interrupt_state;
OperatorSourceInput source_input {*gstate.radix_states[radix_idx], *lstate.radix_states[radix_idx],
-interrupt_state};
+input.interrupt_state};
auto res = radix_table.GetData(context, chunk, *grouping_gstate.table_state, source_input);
+if (res == SourceResultType::BLOCKED) {
+return res;
+}
if (chunk.size() != 0) {
return SourceResultType::HAVE_MORE_OUTPUT;
-} else if (res == SourceResultType::BLOCKED) {
-throw InternalException("Unexpectedly Blocked from radix_table");
}

// move to the next table
lock_guard<mutex> l(gstate.lock);
-radix_idx++;
-if (radix_idx > gstate.state_index) {
+lstate.radix_idx = lstate.radix_idx.GetIndex() + 1;
+if (lstate.radix_idx.GetIndex() > gstate.state_index) {
// we have not yet worked on the table
// move the global index forwards
-gstate.state_index = radix_idx;
+gstate.state_index = lstate.radix_idx.GetIndex();
}
+lstate.radix_idx = gstate.state_index.load();
}

return chunk.size() == 0 ? SourceResultType::FINISHED : SourceResultType::HAVE_MORE_OUTPUT;
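On the source side, the same idea appears with optional_idx: the partition currently being scanned is pinned in local source state, so a BLOCKED result can be retried against the same partition once the task is woken. A compact sketch of that pattern (toy types, assumptions noted in comments):

```cpp
#include <atomic>
#include <cstddef>
#include <optional>

enum class SourceResultType { HAVE_MORE_OUTPUT, FINISHED, BLOCKED };

struct Partition {
    bool blocked = false; // stand-in for "data not materialized yet"
};

struct GlobalState {
    std::atomic<std::size_t> state_index {0}; // shared scan cursor
};

struct LocalState {
    std::optional<std::size_t> current; // plays the role of optional_idx radix_idx
};

// Pin the current partition in *local* state so that a BLOCKED return can be
// retried against the same partition instead of skipping it.
SourceResultType GetData(GlobalState &g, LocalState &l, const Partition *parts, std::size_t n) {
    if (!l.current) {
        l.current = g.state_index.load();
    }
    while (*l.current < n) {
        if (parts[*l.current].blocked) {
            return SourceResultType::BLOCKED; // l.current is kept for the retry
        }
        // ... emit rows from parts[*l.current] ...
        l.current = *l.current + 1; // partition exhausted, move to the next
    }
    return SourceResultType::FINISHED;
}
```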