Add support for earliest aggregatorMergeStrategy (#14598)
* Add EARLIEST aggregator merge strategy.

- More unit tests.
- Include the aggregators analysis type by default in tests.

* Docs.

* Some comments and a test

* Collapse into individual code blocks.
abhishekrb19 authored Jul 18, 2023
1 parent 913416c commit f4d0ea7
Showing 5 changed files with 459 additions and 58 deletions.
12 changes: 7 additions & 5 deletions docs/querying/segmentmetadataquery.md
@@ -62,7 +62,7 @@ There are several main parts to a segment metadata query:
|merge|Merge all individual segment metadata results into a single result|no|
|context|See [Context](../querying/query-context.md)|no|
|analysisTypes|A list of Strings specifying what column properties (e.g. cardinality, size) should be calculated and returned in the result. Defaults to ["cardinality", "interval", "minmax"], but can be overridden with using the [segment metadata query config](../configuration/index.md#segmentmetadata-query-config). See section [analysisTypes](#analysistypes) for more details.|no|
-|aggregatorMergeStrategy| The strategy Druid uses to merge aggregators across segments. If true and if the `aggregators` analysis type is enabled, `aggregatorMergeStrategy` defaults to `strict`. Possible values include `strict`, `lenient`, and `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.|no|
+|aggregatorMergeStrategy| The strategy Druid uses to merge aggregators across segments. If true and if the `aggregators` analysis type is enabled, `aggregatorMergeStrategy` defaults to `strict`. Possible values include `strict`, `lenient`, `earliest`, and `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.|no|
|lenientAggregatorMerge|Deprecated. Use `aggregatorMergeStrategy` property instead. If true, and if the `aggregators` analysis type is enabled, Druid merges aggregators leniently.|no|

The format of the result is:
@@ -186,7 +186,7 @@ Currently, there is no API for retrieving this information.
* `aggregators` in the result will contain the list of aggregators usable for querying metric columns. This may be
null if the aggregators are unknown or unmergeable (if merging is enabled).

-* Merging can be `strict`, `lenient`, or `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.
+* Merging can be `strict`, `lenient`, `earliest`, or `latest`. See [`aggregatorMergeStrategy`](#aggregatormergestrategy) for details.

* The form of the result is a map of column name to aggregator.

@@ -201,10 +201,12 @@ Conflicts between aggregator metadata across segments can occur if some segments
two segments use incompatible aggregators for the same column, such as `longSum` changed to `doubleSum`.
Druid supports the following aggregator merge strategies:

-- `strict`: If there are any segments with unknown aggregators or any conflicts of any kind, the merged aggregators
+- `strict`: If there are any segments with unknown aggregators or any conflicts of any kind, the merged aggregators
list is `null`.
-- `lenient`: Druid ignores segments with unknown aggregators. Conflicts between aggregators set the aggregator for that particular column to null.
-- the aggregator for that particular column.
+- `lenient`: Druid ignores segments with unknown aggregators. Conflicts between aggregators set the aggregator for
+that particular column to null.
+- `earliest`: In the event of conflicts between segments, Druid selects the aggregator from the earliest segment
+for that particular column.
- `latest`: In the event of conflicts between segments, Druid selects the aggregator from the most recent segment
for that particular column.
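
The `strict` and `lenient` rules described in this hunk can be illustrated with a small, self-contained Java sketch. This is not Druid's code: the class `StrictLenientSketch` and its methods are hypothetical, aggregators are stood in for by plain type-name strings, a `null` map represents a segment with unknown aggregators, and a "conflict" is simplified to two different strings for the same column.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; real aggregators are AggregatorFactory objects, not strings.
class StrictLenientSketch {

    // strict: an unknown (null) segment or any conflicting column makes the whole result null.
    static Map<String, String> mergeStrict(Map<String, String> a, Map<String, String> b) {
        if (a == null || b == null) {
            return null;
        }
        Map<String, String> merged = new HashMap<>(a);
        for (Map.Entry<String, String> e : b.entrySet()) {
            String existing = merged.putIfAbsent(e.getKey(), e.getValue());
            if (existing != null && !existing.equals(e.getValue())) {
                return null; // conflict on this column
            }
        }
        return merged;
    }

    // lenient: unknown segments are skipped; a conflicting column maps to null.
    static Map<String, String> mergeLenient(Map<String, String> a, Map<String, String> b) {
        Map<String, String> merged = new HashMap<>();
        absorbLenient(merged, a);
        absorbLenient(merged, b);
        return merged;
    }

    private static void absorbLenient(Map<String, String> merged, Map<String, String> segment) {
        if (segment == null) {
            return; // segment with unknown aggregators is ignored
        }
        for (Map.Entry<String, String> e : segment.entrySet()) {
            if (!merged.containsKey(e.getKey())) {
                merged.put(e.getKey(), e.getValue());
            } else if (!Objects.equals(merged.get(e.getKey()), e.getValue())) {
                merged.put(e.getKey(), null); // conflict: only this column becomes null
            }
        }
    }
}
```

Under these assumptions, merging a segment that reports `longSum` for a column with one that reports `doubleSum` yields `null` for the whole list under `mergeStrict`, while `mergeLenient` keeps the other columns and sets only the conflicting one to `null`.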

@@ -277,6 +277,7 @@ public static SegmentAnalysis mergeAnalyses(

SegmentId mergedSegmentId = null;

+// Union datasources can have multiple datasources. So we iterate through all the datasources to parse the segment id.
for (String dataSource : dataSources) {
final SegmentId id1 = SegmentId.tryParse(dataSource, arg1.getId());
final SegmentId id2 = SegmentId.tryParse(dataSource, arg2.getId());
@@ -364,15 +365,23 @@ public static SegmentAnalysis mergeAnalyses(
aggregators.put(aggregator.getName(), aggregator);
}
}
+} else if (AggregatorMergeStrategy.EARLIEST == aggregatorMergeStrategy) {
+// The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
+// followed by arg2. So for earliest strategy, the iteration order should be arg2 and arg1.
+for (SegmentAnalysis analysis : ImmutableList.of(arg2, arg1)) {
+if (analysis.getAggregators() != null) {
+for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
+aggregators.putIfAbsent(entry.getKey(), entry.getValue());
+}
+}
+}
} else if (AggregatorMergeStrategy.LATEST == aggregatorMergeStrategy) {
// The segment analyses are already ordered above, where arg1 is the analysis pertaining to the latest interval
-// followed by arg2.
+// followed by arg2. So for latest strategy, the iteration order should be arg1 and arg2.
for (SegmentAnalysis analysis : ImmutableList.of(arg1, arg2)) {
if (analysis.getAggregators() != null) {
for (Map.Entry<String, AggregatorFactory> entry : analysis.getAggregators().entrySet()) {
-final String aggregatorName = entry.getKey();
-final AggregatorFactory aggregator = entry.getValue();
-aggregators.putIfAbsent(aggregatorName, aggregator);
+aggregators.putIfAbsent(entry.getKey(), entry.getValue());
}
}
}
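
Both branches above rely on the same idea: `putIfAbsent` keeps whichever value is inserted first, so the iteration order alone decides which segment wins a conflict. A minimal standalone sketch of that idea follows. The class `EarliestLatestSketch` and the method `mergeFirstWins` are hypothetical names, and plain strings stand in for `AggregatorFactory`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the first-writer-wins merge used by EARLIEST and LATEST.
class EarliestLatestSketch {

    // Merges two per-segment aggregator maps; on a name clash the entry from `first` is kept.
    static Map<String, String> mergeFirstWins(Map<String, String> first, Map<String, String> second) {
        Map<String, String> merged = new HashMap<>();
        absorb(merged, first);
        absorb(merged, second);
        return merged;
    }

    private static void absorb(Map<String, String> merged, Map<String, String> segment) {
        if (segment == null) {
            return; // mirrors the null check on getAggregators() in the diff
        }
        for (Map.Entry<String, String> e : segment.entrySet()) {
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
    }
}
```

With `arg1` as the analysis for the latest interval and `arg2` as the earlier one, `mergeFirstWins(arg1, arg2)` corresponds to the `LATEST` ordering and `mergeFirstWins(arg2, arg1)` to the `EARLIEST` ordering. Columns that appear in only one segment are carried over in either case.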
@@ -28,6 +28,7 @@ public enum AggregatorMergeStrategy
{
STRICT,
LENIENT,
+EARLIEST,
LATEST;

@JsonValue