Skip to content

Commit

Permalink
Added Node Stats api changes for the Point in time
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <movvaam@amazon.com>
  • Loading branch information
Ajay Kumar Movva committed Jul 27, 2022
1 parent 00db112 commit f93ddc5
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public static class Stats implements Writeable, ToXContentFragment {
private long suggestTimeInMillis;
private long suggestCurrent;

private long pitCount;
private long pitTimeInMillis;
private long pitCurrent;

private Stats() {
// for internal use, initializes all counts to 0
}
Expand All @@ -91,6 +95,9 @@ public Stats(
long scrollCount,
long scrollTimeInMillis,
long scrollCurrent,
long pitCount,
long pitTimeInMillis,
long pitCurrent,
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent
Expand All @@ -110,6 +117,10 @@ public Stats(
this.suggestCount = suggestCount;
this.suggestTimeInMillis = suggestTimeInMillis;
this.suggestCurrent = suggestCurrent;

this.pitCount = pitCount;
this.pitTimeInMillis = pitTimeInMillis;
this.pitCurrent = pitCurrent;
}

private Stats(StreamInput in) throws IOException {
Expand All @@ -128,6 +139,10 @@ private Stats(StreamInput in) throws IOException {
suggestCount = in.readVLong();
suggestTimeInMillis = in.readVLong();
suggestCurrent = in.readVLong();

pitCount = in.readVLong();
pitTimeInMillis = in.readVLong();
pitCurrent = in.readVLong();
}

public void add(Stats stats) {
Expand All @@ -146,6 +161,10 @@ public void add(Stats stats) {
suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;
suggestCurrent += stats.suggestCurrent;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public void addForClosingShard(Stats stats) {
Expand All @@ -162,6 +181,10 @@ public void addForClosingShard(Stats stats) {

suggestCount += stats.suggestCount;
suggestTimeInMillis += stats.suggestTimeInMillis;

pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
}

public long getQueryCount() {
Expand Down Expand Up @@ -212,6 +235,22 @@ public long getScrollCurrent() {
return scrollCurrent;
}

public long getPitCount() {
return pitCount;
}

public TimeValue getPitTime() {
return new TimeValue(pitTimeInMillis);
}

public long getPitTimeInMillis() {
return pitTimeInMillis;
}

public long getPitCurrent() {
return pitCurrent;
}

public long getSuggestCount() {
return suggestCount;
}
Expand Down Expand Up @@ -249,6 +288,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(suggestCount);
out.writeVLong(suggestTimeInMillis);
out.writeVLong(suggestCurrent);

out.writeVLong(pitCount);
out.writeVLong(pitTimeInMillis);
out.writeVLong(pitCurrent);
}

@Override
Expand All @@ -265,6 +308,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.SCROLL_TIME_IN_MILLIS, Fields.SCROLL_TIME, getScrollTime());
builder.field(Fields.SCROLL_CURRENT, scrollCurrent);

builder.field(Fields.PIT_TOTAL, pitCount);
builder.humanReadableField(Fields.PIT_TIME_IN_MILLIS, Fields.PIT_TIME, getPitTime());
builder.field(Fields.PIT_CURRENT, pitCurrent);

builder.field(Fields.SUGGEST_TOTAL, suggestCount);
builder.humanReadableField(Fields.SUGGEST_TIME_IN_MILLIS, Fields.SUGGEST_TIME, getSuggestTime());
builder.field(Fields.SUGGEST_CURRENT, suggestCurrent);
Expand Down Expand Up @@ -385,6 +432,10 @@ static final class Fields {
static final String SCROLL_TIME = "scroll_time";
static final String SCROLL_TIME_IN_MILLIS = "scroll_time_in_millis";
static final String SCROLL_CURRENT = "scroll_current";
static final String PIT_TOTAL = "pit_total";
static final String PIT_TIME = "pit_time";
static final String PIT_TIME_IN_MILLIS = "pit_time_in_millis";
static final String PIT_CURRENT = "pit_current";
static final String SUGGEST_TOTAL = "suggest_total";
static final String SUGGEST_TIME = "suggest_time";
static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,18 @@ public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

@Override
public void onNewPitContext(ReaderContext readerContext) {
totalStats.pitCurrent.inc();
}

@Override
public void onFreePitContext(ReaderContext readerContext) {
totalStats.pitCurrent.dec();
assert totalStats.pitCurrent.count() >= 0;
totalStats.pitMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

/**
* Holder of statistics values
*
Expand All @@ -203,10 +215,12 @@ static final class StatsHolder {
* for one-thousand times as long (i.e., scrolls that execute for almost twelve days on average).
*/
final MeanMetric scrollMetric = new MeanMetric();
final MeanMetric pitMetric = new MeanMetric();
final MeanMetric suggestMetric = new MeanMetric();
final CounterMetric queryCurrent = new CounterMetric();
final CounterMetric fetchCurrent = new CounterMetric();
final CounterMetric scrollCurrent = new CounterMetric();
final CounterMetric pitCurrent = new CounterMetric();
final CounterMetric suggestCurrent = new CounterMetric();

SearchStats.Stats stats() {
Expand All @@ -220,6 +234,9 @@ SearchStats.Stats stats() {
scrollMetric.count(),
TimeUnit.MICROSECONDS.toMillis(scrollMetric.sum()),
scrollCurrent.count(),
pitMetric.count(),
TimeUnit.MICROSECONDS.toMillis(pitMetric.sum()),
pitCurrent.count(),
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down Expand Up @@ -75,6 +75,9 @@ private static void assertStats(Stats stats, long equalTo) {
assertEquals(equalTo, stats.getScrollCount());
assertEquals(equalTo, stats.getScrollTimeInMillis());
assertEquals(equalTo, stats.getScrollCurrent());
assertEquals(equalTo, stats.getPitCount());
assertEquals(equalTo, stats.getPitTimeInMillis());
assertEquals(equalTo, stats.getPitCurrent());
assertEquals(equalTo, stats.getSuggestCount());
assertEquals(equalTo, stats.getSuggestTimeInMillis());
assertEquals(equalTo, stats.getSuggestCurrent());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;

import java.util.ArrayList;
import java.util.HashSet;
Expand Down Expand Up @@ -70,6 +72,7 @@ public void testPit() throws Exception {
.get();
assertEquals(2, searchResponse.getSuccessfulShards());
assertEquals(2, searchResponse.getTotalShards());
validatePitStats("index", 2, 2);
}

public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception {
Expand All @@ -82,6 +85,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
ExecutionException ex = expectThrows(ExecutionException.class, execute::get);
assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]"));
assertTrue(ex.getMessage().contains("Partial shards failure"));
validatePitStats("index", 0, 0);
return super.onNodeStopped(nodeName);
}
});
Expand All @@ -103,6 +107,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
.get();
assertEquals(1, searchResponse.getSuccessfulShards());
assertEquals(1, searchResponse.getTotalShards());
validatePitStats("index", 1, 1);
return super.onNodeStopped(nodeName);
}
});
Expand All @@ -124,6 +129,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
assertEquals(1, searchResponse.getFailedShards());
assertEquals(0, searchResponse.getSkippedShards());
assertEquals(2, searchResponse.getTotalShards());
validatePitStats("index", 1, 1);
return super.onNodeStopped(nodeName);
}
});
Expand Down Expand Up @@ -312,4 +318,16 @@ public void onFailure(Exception e) {}
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
}
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedOpenContexts) throws ExecutionException,
InterruptedException {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.indices("index");
indicesStatsRequest.all();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().stats(indicesStatsRequest).get();
long pitCurrent = indicesStatsResponse.getIndex(index).getTotal().search.getTotal().getPitCurrent();
long openContexts = indicesStatsResponse.getIndex(index).getTotal().search.getOpenContexts();
assertEquals(expectedPitCurrent, pitCurrent);
assertEquals(expectedOpenContexts, openContexts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.index.IndexService;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -72,7 +75,11 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -88,7 +95,12 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,
CreatePitResponse response = execute.get();
assertEquals(4, response.getSuccessfulShards());
assertEquals(4, service.getActiveContexts());

validatePitStats("index", 1, 0, 0);
validatePitStats("index1", 1, 0, 0);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index1", 0, 1, 0);
}

public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, InterruptedException {
Expand All @@ -109,7 +121,11 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void testCreatePITWithNonExistentIndex() {
Expand Down Expand Up @@ -192,6 +208,9 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
CreatePitResponse pitResponse = execute.get();
SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);

client().admin().indices().prepareClose("index").get();
SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> {
SearchResponse searchResponse = client().prepareSearch()
Expand Down Expand Up @@ -239,7 +258,10 @@ public void testMaxOpenPitContexts() throws Exception {
+ "This limit can be set by changing the [search.max_open_pit_context] setting."
)
);
final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY);
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

public void testOpenPitContextsConcurrently() throws Exception {
Expand Down Expand Up @@ -285,7 +307,9 @@ public void testOpenPitContextsConcurrently() throws Exception {
thread.join();
}
assertThat(service.getActiveContexts(), equalTo(maxPitContexts));
validatePitStats("index", maxPitContexts, 0, 0);
service.doClose();
validatePitStats("index", 0, maxPitContexts, 0);
}

/**
Expand Down Expand Up @@ -453,9 +477,11 @@ public void testPitAfterUpdateIndex() throws Exception {
.getTotalHits().value,
Matchers.equalTo(0L)
);
validatePitStats("test", 1, 0, 0);
} finally {
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("test", 0, 1, 0);
}
}

Expand Down Expand Up @@ -495,7 +521,20 @@ public void testConcurrentSearches() throws Exception {

SearchService service = getInstanceFromNode(SearchService.class);
assertEquals(2, service.getActiveContexts());
validatePitStats("index", 1, 0, 0);
validatePitStats("index", 1, 0, 1);
service.doClose();
assertEquals(0, service.getActiveContexts());
validatePitStats("index", 0, 1, 0);
validatePitStats("index", 0, 1, 1);
}

public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException,
InterruptedException {
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexServiceSafe(resolveIndex(index));
IndexShard indexShard = indexService.getShard(shardId);
assertEquals(expectedPitCurrent, indexShard.searchStats().getTotal().getPitCurrent());
assertEquals(expectedPitCount, indexShard.searchStats().getTotal().getPitCount());
}
}
Loading

0 comments on commit f93ddc5

Please sign in to comment.