[Concurrent Segment Search] SignificantTerms agg should not gather bg_count for each slice #8703

Closed
jed326 opened this issue Jul 14, 2023 · 2 comments · Fixed by #8735


jed326 commented Jul 14, 2023

Subtask as part of #8509

To Reproduce

./gradlew ':server:internalClusterTest' --tests "org.opensearch.search.aggregations.bucket.SignificantTermsSignificanceScoreIT.testXContentResponse" -Dtests.seed=6D7B107FC7821E62 -Dtests.locale=mk -Dtests.timezone=America/Atka -Druntime.java=17


This test is failing due to an unexpected result in the query response:
Expected:

{
    "class": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "0",
            "doc_count": 4,
            "sig_terms": {
                "doc_count": 4,
                "bg_count": 7,
                "buckets": [{
                    "key": 0,
                    "doc_count": 4,
                    "score": 0.39999999999999997,
                    "bg_count": 5
                }]
            }
        }, {
            "key": "1",
            "doc_count": 3,
            "sig_terms": {
                "doc_count": 3,
                "bg_count": 7,
                "buckets": [{
                    "key": 1,
                    "doc_count": 3,
                    "score": 0.75,
                    "bg_count": 4
                }]
            }
        }]
    }
}

Actual:

{
    "class": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "0",
            "doc_count": 4,
            "sig_terms": {
                "doc_count": 4,
                "bg_count": 28,
                "buckets": [{
                    "key": 0,
                    "doc_count": 4,
                    "score": 0.39999999999999997,
                    "bg_count": 20
                }]
            }
        }, {
            "key": "1",
            "doc_count": 3,
            "sig_terms": {
                "doc_count": 3,
                "bg_count": 21,
                "buckets": [{
                    "key": 1,
                    "doc_count": 3,
                    "score": 0.75,
                    "bg_count": 12
                }]
            }
        }]
    }
}

Specifically, the bg_count is wrong in the concurrent search case. In a significant_terms aggregation, bg_count is the number of docs with the term in the background set, while doc_count is the number of docs with the term in the foreground set.

So at the top level, the background set is all docs in the index with the “class” field, and since no background filter is set the bg_count should simply be the number of docs in the index (7 for the test index shown below). Code ref:

supersetNumDocs = backgroundFilter == null ? searcher.getIndexReader().maxDoc() : searcher.count(this.backgroundFilter);

Background Set == All documents in the index with “class” field
At the inner level, the bg_count therefore refers to all of the documents in the background set with the given key in the “text” field. See how the index is created here:
public static void index01Docs(String type, String settings, OpenSearchIntegTestCase testCase) throws ExecutionException,
    InterruptedException {
    String textMappings = "type=" + type;
    if (type.equals("text")) {
        textMappings += ",fielddata=true";
    }
    assertAcked(
        testCase.prepareCreate(INDEX_NAME)
            .setSettings(settings, XContentType.JSON)
            .setMapping("text", textMappings, CLASS_FIELD, "type=keyword")
    );
    String[] gb = { "0", "1" };
    List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("1").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("2").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("3").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("4").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("5").setSource(TEXT_FIELD, gb, CLASS_FIELD, "1"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("6").setSource(TEXT_FIELD, gb, CLASS_FIELD, "0"));
    indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("7").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0"));
    testCase.indexRandom(true, false, indexRequestBuilderList);
}

So for key “0” this is 5 (docs 3, 4, 5, 6, and 7), and for key “1” this is 4 (docs 1, 2, 5, and 6). This value is retrieved by querying the index from the search thread. Code ref:


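For intuition, here is a hypothetical Lucene-level sketch of what that per-term background count amounts to; the helper method and the standalone IndexSearcher usage are illustrative only, not the actual OpenSearch code path:

import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TermQuery;

// Illustrative sketch only: the inner-level bg_count for a key is conceptually a
// per-term count over the background set, which here is the whole index. With the
// 7-doc test index above, key "0" yields 5 and key "1" yields 4.
static long backgroundCountForTerm(IndexSearcher searcher, String key) throws java.io.IOException {
    return searcher.count(new TermQuery(new Term("text", key)));
}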
Just to crystallize the meanings of the background and foreground sets, let’s break down the example in the Elasticsearch docs here: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-significantterms-aggregation.html#_single_set_analysis

Here the outer level bg_count refers to the number of documents with the field crime_type in the background set, which is the entire index. The query is for force == British Transport Police, so doc_count represents the number of documents where the force field matches British Transport Police, also known as our foreground set.
Background Set == Number of crimes in the index
Foreground Set == Number of crimes where force == British Transport Police

So now looking at the inner level, bg_count is going to be the number of documents where crime_type == Bicycle theft in the Background set, while doc_count is going to be the number of documents matching the same condition in the Foreground set.

References:


During the buildAggregations phase, the bg_count is gathered per collector. In the non-concurrent search case, that happens only once per shard, and the coordinator-level reduce can simply sum up the bg_count across all of the shards to get the total. However, in the concurrent search case we have a collector per segment slice, and these get summed up at the shard-level reduce, so we end up with a bg_count of SEGMENT_SLICE_COUNT * BG_COUNT.

A small caveat here is that this only happens for slices that actually have docs for the bucket, so in our test example, if a slice does not have any documents with “text” field “0” then that collector will not contribute to the inflated bg_count.
Here is an example to illustrate this problem.
In a test run, if the slice-level buckets look like this:

0 = {StringTerms@13132} "{"class":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"1","doc_count":2,"sig_terms":{"doc_count":2,"bg_count":7,"buckets":[]}}]}}"
1 = {StringTerms@13133} "{"class":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"0","doc_count":1,"sig_terms":{"doc_count":1,"bg_count":7,"buckets":[]}},{"key":"1","doc_count":1,"sig_terms":{"doc_count":1,"bg_count":7,"buckets":[]}}]}}"
2 = {StringTerms@13134} "{"class":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"0","doc_count":2,"sig_terms":{"doc_count":2,"bg_count":7,"buckets":[]}}]}}"
3 = {StringTerms@13135} "{"class":{"doc_count_error_upper_bound":0,"sum_other_doc_count":0,"buckets":[{"key":"0","doc_count":1,"sig_terms":{"doc_count":1,"bg_count":7,"buckets":[]}}]}}"

Then there are 5 buckets total: 2 for key “1” and 3 for key “0”. So if you look at the results below:

Expected:

{
    "class": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "0",
            "doc_count": 4,
            "sig_terms": {
                "doc_count": 4,
                "bg_count": 7,
                "buckets": [{
                    "key": 0,
                    "doc_count": 4,
                    "score": 0.39999999999999997,
                    "bg_count": 5
                }]
            }
        }, {
            "key": "1",
            "doc_count": 3,
            "sig_terms": {
                "doc_count": 3,
                "bg_count": 7,
                "buckets": [{
                    "key": 1,
                    "doc_count": 3,
                    "score": 0.75,
                    "bg_count": 4
                }]
            }
        }]
    }
}

Actual:

{
    "class": {
        "doc_count_error_upper_bound": 0,
        "sum_other_doc_count": 0,
        "buckets": [{
            "key": "0",
            "doc_count": 4,
            "sig_terms": {
                "doc_count": 4,
                "bg_count": 21,
                "buckets": [{
                    "key": 0,
                    "doc_count": 4,
                    "score": 0.39999999999999997,
                    "bg_count": 15
                }]
            }
        }, {
            "key": "1",
            "doc_count": 3,
            "sig_terms": {
                "doc_count": 3,
                "bg_count": 14,
                "buckets": [{
                    "key": 1,
                    "doc_count": 3,
                    "score": 0.75,
                    "bg_count": 8
                }]
            }
        }]
    }
}

We see that key “1” has 2x the expected count (2 slices × 7 = 14 at the outer level and 2 × 4 = 8 at the inner level) while key “0” has 3x (3 × 7 = 21 and 3 × 5 = 15).

Single set case for reference:

    "aggregations": {
        "sig_terms": {
            "doc_count": 4,
            "bg_count": 21,
            "buckets": [{
                "key": 0,
                "doc_count": 4,
                "score": 0.39999999999999997,
                "bg_count": 15
            }]
        }
    }

Solutions

Without a deeper understanding of the scoring mechanism, it would be best for a solution to fix the counts (and therefore the scores) at the slice level rather than at the coordinator level.

I have added a few possible solutions below; solution 3 looks the most promising to me, and I will be working on a PR for it.

1. Divide by the bucket count

Due to the caveat above, in sparse data cases a bucket that is not collected in a segment slice won’t contribute to the bg_count. That means we can’t just blindly divide these numbers by the slice count; we would need some way to determine, per bucket, how many collectors actually contributed in order to divide the bg_count correctly, as sketched below.
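To make the idea concrete, here is a minimal hypothetical sketch; correctedBgCount and slicesThatCollectedBucket are made-up names, not existing OpenSearch code:

// Hypothetical sketch of solution 1: the summed value can only be divided back down
// if we know how many slices actually collected this particular bucket. This assumes
// every contributing slice reported the same shard-wide background count.
static long correctedBgCount(long summedBgCount, int slicesThatCollectedBucket) {
    return summedBgCount / slicesThatCollectedBucket;
}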

2. Separate out the shard-level vs slice-level reduce logic

Slice level reduce:

// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
    InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partial())
);

Add a flag to reduce context and then don’t sum up supersetDf here:

@Override
protected B reduceBucket(List<B> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    long subsetDf = 0;
    long supersetDf = 0;
    List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
    for (B bucket : buckets) {
        subsetDf += bucket.subsetDf;
        supersetDf += bucket.supersetDf;
        aggregationsList.add(bucket.aggregations);
    }
    InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
    return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0));
}

Nor supersetSize here:
for (InternalAggregation aggregation : aggregations) {
    @SuppressWarnings("unchecked")
    InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
    globalSubsetSize += terms.getSubsetSize();
    globalSupersetSize += terms.getSupersetSize();
}
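To illustrate, here is a rough sketch of how such a flag could gate the loop above. isSliceLevelReduce() is a hypothetical method on ReduceContext, and reduceContext stands in for whatever ReduceContext variable is in scope:

for (InternalAggregation aggregation : aggregations) {
    @SuppressWarnings("unchecked")
    InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
    // The foreground (subset) sizes differ per slice, so they are always summed.
    globalSubsetSize += terms.getSubsetSize();
    if (reduceContext.isSliceLevelReduce()) { // hypothetical flag
        // Every slice reports the same shard-wide superset size, so don't sum it.
        globalSupersetSize = terms.getSupersetSize();
    } else {
        globalSupersetSize += terms.getSupersetSize();
    }
}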

3. Create a singleton SignificanceLookup that will only return backgroundFrequencies and supersetSize once per bucket

This seems like the best solution to me because the summed bg_count will already be correct going into the aggs-level reduce, so we won't need to modify that. Moreover, we're currently doing up to 2 * NUM_SLICES * NUM_BUCKETS extra queries to get the bg_count for every single bucket at the slice level, so this singleton solution would eliminate all but up to 2 of those extra queries.

Currently I am thinking we can use searchContext.isConcurrentSegmentSearchEnabled() here:

SignificanceLookup lookup = new SignificanceLookup(
    queryShardContext,
    config.fieldContext().fieldType(),
    config.format(),
    backgroundFilter
);

and then further use the queryShardContext to determine when to reset the singleton.

Note that we will most likely also need to do this in SignificantTextAggregatorFactory here:

SignificanceLookup lookup = new SignificanceLookup(queryShardContext, fieldType, DocValueFormat.RAW, backgroundFilter);

However, I'm seeing codec assertion failures in that execution path, as mentioned in #8509 (comment). Nevertheless, the same solution should be applicable in both cases.

I still need to hash out some of the finer details here, such as how to reset the singleton class per bucket; however, conceptually this solution is both the most intuitive and the least intrusive.
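To make the singleton idea concrete, here is a minimal, self-contained sketch of the "return the background count only once per term" behavior solution 3 describes. OncePerTermBackgroundCounts and backgroundCountQuery are hypothetical names; the real change would live in or around SignificanceLookup:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.ToLongFunction;

// Hypothetical sketch, not OpenSearch API: one instance shared by all slice-level
// collectors on a shard. The first caller for a given term pays for the background
// query and gets the real count; later callers get 0, so summing the per-slice values
// at the shard-level reduce yields the correct bg_count without extra queries.
final class OncePerTermBackgroundCounts<T> {
    private final Set<T> alreadyReturned = ConcurrentHashMap.newKeySet();
    private final ToLongFunction<T> backgroundCountQuery; // stand-in for the real index query

    OncePerTermBackgroundCounts(ToLongFunction<T> backgroundCountQuery) {
        this.backgroundCountQuery = backgroundCountQuery;
    }

    long backgroundCount(T term) {
        // Set.add is atomic on a concurrent set, so exactly one slice collector
        // observes "true" per term even under concurrent calls.
        return alreadyReturned.add(term) ? backgroundCountQuery.applyAsLong(term) : 0L;
    }
}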


jed326 commented Jul 14, 2023

Tagging @reta @sohami to see if you have any additional thoughts while I work on a PR for solution 3


jed326 commented Jul 17, 2023

After taking a look at this, I think solution 2 is actually much more straightforward. ReduceContext already has an isFinalReduce() method that is true for the coordinator-level reduce. We can use that method in the places mentioned above and leave the slice-level query optimization for later. Will open a PR with this implementation shortly.
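For illustration, a rough sketch of how isFinalReduce() could gate the summation in the reduceBucket method shown above (illustrative only; the actual change in the PR may differ):

@Override
protected B reduceBucket(List<B> buckets, ReduceContext context) {
    assert buckets.size() > 0;
    long subsetDf = 0;
    long supersetDf = 0;
    List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
    for (B bucket : buckets) {
        subsetDf += bucket.subsetDf;
        if (context.isFinalReduce()) {
            // Coordinator-level reduce: each shard contributes its own background count.
            supersetDf += bucket.supersetDf;
        } else {
            // Slice-level reduce: every slice reports the same shard-wide background
            // count, so summing would multiply it by the slice count.
            supersetDf = bucket.supersetDf;
        }
        aggregationsList.add(bucket.aggregations);
    }
    InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
    return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0));
}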
