From 7566b29e8aa317e70446d99356f774f3c538107e Mon Sep 17 00:00:00 2001 From: Jay Deng Date: Mon, 17 Jul 2023 17:50:14 -0700 Subject: [PATCH] Change InternalSignificantTerms to only sum shard level counts in final reduce Signed-off-by: Jay Deng --- CHANGELOG.md | 1 + .../opensearch/search/DefaultSearchContext.java | 4 +++- .../search/aggregations/InternalAggregation.java | 11 +++++++++++ .../bucket/terms/InternalSignificantTerms.java | 14 ++++++++++++-- 4 files changed, 27 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9c77e371ce69..c0970015dd9a2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) - Change http code on create index API with bad input raising NotXContentException from 500 to 400 ([#4773](https://github.com/opensearch-project/OpenSearch/pull/4773)) - Improve summary error message for invalid setting updates ([#4792](https://github.com/opensearch-project/OpenSearch/pull/4792)) +- Change InternalSignificantTerms to sum shard-level superset counts only in final reduce ([#8735](https://github.com/opensearch-project/OpenSearch/pull/8735)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index ee29d6bfe2b62..3494817c07aab 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -918,7 +918,9 @@ public ReaderContext readerContext() { @Override public InternalAggregation.ReduceContext partial() { - return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction(); + InternalAggregation.ReduceContext rc = requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction(); + rc.setSliceLevel(true); + return rc; } @Override diff --git a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java index b7577fb647be5..0f7ef89d10bfe 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java +++ b/server/src/main/java/org/opensearch/search/aggregations/InternalAggregation.java @@ -89,6 +89,8 @@ public static class ReduceContext { private final ScriptService scriptService; private final IntConsumer multiBucketConsumer; private final PipelineTree pipelineTreeRoot; + + private boolean isSliceLevel; /** * Supplies the pipelines when the result of the reduce is serialized * to node versions that need pipeline aggregators to be serialized @@ -138,6 +140,7 @@ private ReduceContext( this.multiBucketConsumer = multiBucketConsumer; this.pipelineTreeRoot = pipelineTreeRoot; this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization; + this.isSliceLevel = false; } /** @@ -149,6 +152,14 @@ public boolean isFinalReduce() { return pipelineTreeRoot != null; } + public void setSliceLevel(boolean b) { + this.isSliceLevel = b; + } + + public boolean getSliceLevel() { + return this.isSliceLevel; + } + public BigArrays bigArrays() { return bigArrays; } diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index 6104d2193f6cd..c666003e6a64a 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -232,7 +232,12 @@ public InternalAggregation reduce(List aggregations, Reduce @SuppressWarnings("unchecked") InternalSignificantTerms terms = (InternalSignificantTerms) aggregation; globalSubsetSize += terms.getSubsetSize(); - globalSupersetSize += terms.getSupersetSize(); + // supersetSize is a shard level count, so we don't sum it for segment slices + if (!reduceContext.getSliceLevel()) { + globalSupersetSize += terms.getSupersetSize(); + } else { + globalSupersetSize = terms.getSupersetSize(); + } } Map> buckets = new HashMap<>(); for (InternalAggregation aggregation : aggregations) { @@ -291,7 +296,12 @@ protected B reduceBucket(List buckets, ReduceContext context) { List aggregationsList = new ArrayList<>(buckets.size()); for (B bucket : buckets) { subsetDf += bucket.subsetDf; - supersetDf += bucket.supersetDf; + // supersetDf is a shard level count, so we don't sum it for segment slices + if (!context.getSliceLevel()) { + supersetDf += bucket.supersetDf; + } else { + supersetDf = bucket.supersetDf; + } aggregationsList.add(bucket.aggregations); } InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);