From f93ddc56f9e070c6c2e9a61e93dbddd566f23adc Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Thu, 28 Jul 2022 03:12:41 +0530 Subject: [PATCH] Added Node Stats api changes for the Point in time Signed-off-by: Ajay Kumar Movva --- .../index/search/stats/SearchStats.java | 51 +++++++++++++++++++ .../index/search/stats/ShardSearchStats.java | 17 +++++++ .../index/search/stats/SearchStatsTests.java | 9 ++-- .../search/CreatePitMultiNodeTests.java | 18 +++++++ .../search/CreatePitSingleNodeTests.java | 39 ++++++++++++++ .../search/DeletePitMultiNodeTests.java | 24 +++++++++ .../opensearch/search/SearchServiceTests.java | 21 ++++++-- 7 files changed, 173 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index fe23000902608..5c19d4b4da4b9 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -77,6 +77,10 @@ public static class Stats implements Writeable, ToXContentFragment { private long suggestTimeInMillis; private long suggestCurrent; + private long pitCount; + private long pitTimeInMillis; + private long pitCurrent; + private Stats() { // for internal use, initializes all counts to 0 } @@ -91,6 +95,9 @@ public Stats( long scrollCount, long scrollTimeInMillis, long scrollCurrent, + long pitCount, + long pitTimeInMillis, + long pitCurrent, long suggestCount, long suggestTimeInMillis, long suggestCurrent @@ -110,6 +117,10 @@ public Stats( this.suggestCount = suggestCount; this.suggestTimeInMillis = suggestTimeInMillis; this.suggestCurrent = suggestCurrent; + + this.pitCount = pitCount; + this.pitTimeInMillis = pitTimeInMillis; + this.pitCurrent = pitCurrent; } private Stats(StreamInput in) throws IOException { @@ -128,6 +139,10 @@ private Stats(StreamInput in) throws IOException { suggestCount = in.readVLong(); suggestTimeInMillis = in.readVLong(); suggestCurrent = in.readVLong(); + + pitCount = in.readVLong(); + pitTimeInMillis = in.readVLong(); + pitCurrent = in.readVLong(); } public void add(Stats stats) { @@ -146,6 +161,10 @@ public void add(Stats stats) { suggestCount += stats.suggestCount; suggestTimeInMillis += stats.suggestTimeInMillis; suggestCurrent += stats.suggestCurrent; + + pitCount += stats.pitCount; + pitTimeInMillis += stats.pitTimeInMillis; + pitCurrent += stats.pitCurrent; } public void addForClosingShard(Stats stats) { @@ -162,6 +181,10 @@ public void addForClosingShard(Stats stats) { suggestCount += stats.suggestCount; suggestTimeInMillis += stats.suggestTimeInMillis; + + pitCount += stats.pitCount; + pitTimeInMillis += stats.pitTimeInMillis; + pitCurrent += stats.pitCurrent; } public long getQueryCount() { @@ -212,6 +235,22 @@ public long getScrollCurrent() { return scrollCurrent; } + public long getPitCount() { + return pitCount; + } + + public TimeValue getPitTime() { + return new TimeValue(pitTimeInMillis); + } + + public long getPitTimeInMillis() { + return pitTimeInMillis; + } + + public long getPitCurrent() { + return pitCurrent; + } + public long getSuggestCount() { return suggestCount; } @@ -249,6 +288,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(suggestCount); out.writeVLong(suggestTimeInMillis); out.writeVLong(suggestCurrent); + + out.writeVLong(pitCount); + out.writeVLong(pitTimeInMillis); + out.writeVLong(pitCurrent); } @Override @@ -265,6 +308,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime()); builder.field(Fields.SCROLL_CURRENT, scrollCurrent); + builder.field(Fields.PIT_TOTAL, pitCount); + builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime()); + builder.field(Fields.PIT_CURRENT, pitCurrent); + builder.field(Fields.SUGGEST_TOTAL, suggestCount); builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime()); builder.field(Fields.SUGGEST_CURRENT, suggestCurrent); @@ -385,6 +432,10 @@ static final class Fields { static final String SCROLL_TIME = "scroll_time"; static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis"; static final String SCROLL_CURRENT = "scroll_current"; + static final String PIT_TOTAL = "pit_total"; + static final String PIT_TIME = "pit_time"; + static final String PIT_TIME_IN_MILLIS = "pit_time_in_millis"; + static final String PIT_CURRENT = "pit_current"; static final String SUGGEST_TOTAL = "suggest_total"; static final String SUGGEST_TIME = "suggest_time"; static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; diff --git a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java index 3ef3571c75e59..6d0eb3a5949ca 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/ShardSearchStats.java @@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) { totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); } + @Override + public void onNewPitContext(ReaderContext readerContext) { + totalStats.pitCurrent.inc(); + } + + @Override + public void onFreePitContext(ReaderContext readerContext) { + totalStats.pitCurrent.dec(); + assert totalStats.pitCurrent.count() >= 0; + totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); + } + /** * Holder of statistics values * @@ -203,10 +215,12 @@ static final class StatsHolder { * for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average). */ final MeanMetric scrollMetric = new MeanMetric(); + final MeanMetric pitMetric = new MeanMetric(); final MeanMetric suggestMetric = new MeanMetric(); final CounterMetric queryCurrent = new CounterMetric(); final CounterMetric fetchCurrent = new CounterMetric(); final CounterMetric scrollCurrent = new CounterMetric(); + final CounterMetric pitCurrent = new CounterMetric(); final CounterMetric suggestCurrent = new CounterMetric(); SearchStats.Stats stats() { @@ -220,6 +234,9 @@ SearchStats.Stats stats() { scrollMetric.count(), TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()), scrollCurrent.count(), + pitMetric.count(), + TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()), + pitCurrent.count(), suggestMetric.count(), TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()), suggestCurrent.count() diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 4682d35411b78..7d2d8e38d066e 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception { // let's create two dummy search stats with groups Map groupStats1 = new HashMap<>(); Map groupStats2 = new HashMap<>(); - groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); - SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); - SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); + groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1)); + SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1); + SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2); // adding these two search stats and checking group stats are correct searchStats1.add(searchStats2); @@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) { assertEquals(equalTo, stats.getScrollCount()); assertEquals(equalTo, stats.getScrollTimeInMillis()); assertEquals(equalTo, stats.getScrollCurrent()); + assertEquals(equalTo, stats.getPitCount()); + assertEquals(equalTo, stats.getPitTimeInMillis()); + assertEquals(equalTo, stats.getPitCurrent()); assertEquals(equalTo, stats.getSuggestCount()); assertEquals(equalTo, stats.getSuggestTimeInMillis()); assertEquals(equalTo, stats.getSuggestCurrent()); diff --git a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java index 27d8f27add898..eddd69e535bdb 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitMultiNodeTests.java @@ -27,6 +27,8 @@ import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import java.util.ArrayList; import java.util.HashSet; @@ -70,6 +72,7 @@ public void testPit() throws Exception { .get(); assertEquals(2, searchResponse.getSuccessfulShards()); assertEquals(2, searchResponse.getTotalShards()); + validatePitStats("index", 2, 2); } public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception { @@ -82,6 +85,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { ExecutionException ex = expectThrows(ExecutionException.class, execute::get); assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]")); assertTrue(ex.getMessage().contains("Partial shards failure")); + validatePitStats("index", 0, 0); return super.onNodeStopped(nodeName); } }); @@ -103,6 +107,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { .get(); assertEquals(1, searchResponse.getSuccessfulShards()); assertEquals(1, searchResponse.getTotalShards()); + validatePitStats("index", 1, 1); return super.onNodeStopped(nodeName); } }); @@ -124,6 +129,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { assertEquals(1, searchResponse.getFailedShards()); assertEquals(0, searchResponse.getSkippedShards()); assertEquals(2, searchResponse.getTotalShards()); + validatePitStats("index", 1, 1); return super.onNodeStopped(nodeName); } }); @@ -312,4 +318,16 @@ public void onFailure(Exception e) {} ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); } } + + public void validatePitStats(String index, long expectedPitCurrent, long expectedOpenContexts) throws ExecutionException, + InterruptedException { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.indices("index"); + indicesStatsRequest.all(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get(); + long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent(); + long openContexts = indicesStatsResponse.getIndex(index).getTotal().search.getOpenContexts(); + assertEquals(expectedPitCurrent, pitCurrent); + assertEquals(expectedOpenContexts, openContexts); + } } diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index 5c3c43af9cb66..7962a30e1ed79 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -23,6 +23,9 @@ import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.sort.SortOrder; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.index.IndexService; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -72,7 +75,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); } public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException { @@ -88,7 +95,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, CreatePitResponse response = execute.get(); assertEquals(4, response.getSuccessfulShards()); assertEquals(4, service.getActiveContexts()); + + validatePitStats("index", 1, 0, 0); + validatePitStats("index1", 1, 0, 0); service.doClose(); + validatePitStats("index", 0, 1, 0); + validatePitStats("index1", 0, 1, 0); } public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException { @@ -109,7 +121,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); } public void testCreatePITWithNonExistentIndex() { @@ -192,6 +208,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx CreatePitResponse pitResponse = execute.get(); SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); + client().admin().indices().prepareClose("index").get(); SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> { SearchResponse searchResponse = client().prepareSearch() @@ -239,7 +258,10 @@ public void testMaxOpenPitContexts() throws Exception { + "This limit can be set by changing the [search.max_open_pit_context] setting." ) ); + final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); + validatePitStats("index", maxPitContexts, 0, 0); service.doClose(); + validatePitStats("index", 0, maxPitContexts, 0); } public void testOpenPitContextsConcurrently() throws Exception { @@ -285,7 +307,9 @@ public void testOpenPitContextsConcurrently() throws Exception { thread.join(); } assertThat(service.getActiveContexts(), equalTo(maxPitContexts)); + validatePitStats("index", maxPitContexts, 0, 0); service.doClose(); + validatePitStats("index", 0, maxPitContexts, 0); } /** @@ -453,9 +477,11 @@ public void testPitAfterUpdateIndex() throws Exception { .getTotalHits().value, Matchers.equalTo(0L) ); + validatePitStats("test", 1, 0, 0); } finally { service.doClose(); assertEquals(0, service.getActiveContexts()); + validatePitStats("test", 0, 1, 0); } } @@ -495,7 +521,20 @@ public void testConcurrentSearches() throws Exception { SearchService service = getInstanceFromNode(SearchService.class); assertEquals(2, service.getActiveContexts()); + validatePitStats("index", 1, 0, 0); + validatePitStats("index", 1, 0, 1); service.doClose(); assertEquals(0, service.getActiveContexts()); + validatePitStats("index", 0, 1, 0); + validatePitStats("index", 0, 1, 1); + } + + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, + InterruptedException { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index)); + IndexShard indexShard = indexService.getShard(shardId); + assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent()); + assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount()); } } diff --git a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java index 89607b9201cd9..68afb97064941 100644 --- a/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java +++ b/server/src/test/java/org/opensearch/search/DeletePitMultiNodeTests.java @@ -11,6 +11,8 @@ import org.junit.After; import org.junit.Before; import org.opensearch.action.ActionFuture; +import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.action.search.CreatePitAction; @@ -76,6 +78,7 @@ public void testDeletePit() throws Exception { execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 10, 0); DeletePitRequest deletePITRequest = new DeletePitRequest(pitIds); ActionFuture deleteExecute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); DeletePitResponse deletePITResponse = deleteExecute.get(); @@ -84,6 +87,7 @@ public void testDeletePit() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 10); /** * Checking deleting the same PIT id again results in succeeded */ @@ -102,6 +106,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { CreatePitResponse pitResponse = execute.get(); List pitIds = new ArrayList<>(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 5, 0); /** * Delete Pit #1 @@ -113,9 +118,11 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 5); execute = client().execute(CreatePitAction.INSTANCE, request); pitResponse = execute.get(); pitIds.add(pitResponse.getId()); + validatePitStats("index", 5, 5); /** * Delete PIT with both Ids #1 (which is deleted) and #2 (which is present) */ @@ -126,6 +133,7 @@ public void testDeletePitWithValidAndDeletedIds() throws Exception { assertTrue(pitIds.contains(deletePitInfo.getPitId())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 10); } public void testDeletePitWithValidAndInvalidIds() throws Exception { @@ -148,6 +156,8 @@ public void testDeleteAllPits() throws Exception { client().prepareIndex("index1").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get(); ensureGreen(); createPitOnIndex("index1"); + validatePitStats("index", 5, 0); + validatePitStats("index1", 5, 0); DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); /** @@ -160,6 +170,8 @@ public void testDeleteAllPits() throws Exception { assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); assertTrue(deletePitInfo.isSuccessful()); } + validatePitStats("index", 0, 5); + validatePitStats("index1", 0, 5); client().admin().indices().prepareDelete("index1").get(); } @@ -330,4 +342,16 @@ public void onFailure(Exception e) {} } } + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount) throws ExecutionException, + InterruptedException { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.indices(index); + indicesStatsRequest.all(); + IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get(); + long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent(); + long pitCount = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCount(); + assertEquals(expectedPitCurrent, pitCurrent); + assertEquals(expectedPitCount, pitCount); + } + } diff --git a/server/src/test/java/org/opensearch/search/SearchServiceTests.java b/server/src/test/java/org/opensearch/search/SearchServiceTests.java index ecc28470b0eb2..2aef1803aa792 100644 --- a/server/src/test/java/org/opensearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/opensearch/search/SearchServiceTests.java @@ -1428,7 +1428,7 @@ private ReaderContext createReaderContext(IndexService indexService, IndexShard ); } - public void testDeletePitReaderContext() { + public void testDeletePitReaderContext() throws ExecutionException, InterruptedException { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); PlainActionFuture future = new PlainActionFuture<>(); @@ -1442,6 +1442,7 @@ public void testDeletePitReaderContext() { contextIds.add(pitSearchContextIdForNode); assertThat(searchService.getActiveContexts(), equalTo(1)); + validatePitStats("index", 1, 0, 0); DeletePitResponse deletePitResponse = searchService.freeReaderContextsIfFound(contextIds); assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // assert true for reader context not found @@ -1449,9 +1450,10 @@ public void testDeletePitReaderContext() { assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); // adding this assert to showcase behavior difference assertFalse(searchService.freeReaderContext(future.actionGet())); + validatePitStats("index", 0, 1, 0); } - public void testDeleteAllPitReaderContexts() { + public void testDeleteAllPitReaderContexts() throws ExecutionException, InterruptedException { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); PlainActionFuture future = new PlainActionFuture<>(); @@ -1460,8 +1462,10 @@ public void testDeleteAllPitReaderContexts() { searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future); future.actionGet(); assertThat(searchService.getActiveContexts(), equalTo(2)); + validatePitStats("index", 2, 0, 0); searchService.freeAllPitContexts(); assertThat(searchService.getActiveContexts(), equalTo(0)); + validatePitStats("index", 0, 2, 0); } public void testPitContextMaxKeepAlive() { @@ -1486,7 +1490,7 @@ public void testPitContextMaxKeepAlive() { assertThat(searchService.getActiveContexts(), equalTo(0)); } - public void testUpdatePitId() { + public void testUpdatePitId() throws ExecutionException, InterruptedException { createIndex("index"); SearchService searchService = getInstanceFromNode(SearchService.class); PlainActionFuture future = new PlainActionFuture<>(); @@ -1506,7 +1510,9 @@ public void testUpdatePitId() { assertTrue(updateResponse.getKeepAlive() == updateRequest.getKeepAlive()); assertTrue(updateResponse.getPitId().equalsIgnoreCase("pitId")); assertThat(searchService.getActiveContexts(), equalTo(1)); + validatePitStats("index", 1, 0, 0); assertTrue(searchService.freeReaderContext(future.actionGet())); + validatePitStats("index", 0, 1, 0); } public void testUpdatePitIdMaxKeepAlive() { @@ -1559,4 +1565,13 @@ public void testUpdatePitIdWithInvalidReaderId() { assertEquals("No search context found for id [" + id.getId() + "]", ex.getMessage()); assertThat(searchService.getActiveContexts(), equalTo(0)); } + + public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException, + InterruptedException { + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index)); + IndexShard indexShard = indexService.getShard(shardId); + assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent()); + assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount()); + } }