From 1999e98cdb912af80871072073b72446d6b161e4 Mon Sep 17 00:00:00 2001 From: Rohit Ashiwal Date: Tue, 4 Jul 2023 14:51:07 +0530 Subject: [PATCH] Indexing: add Doc status counter Currently, Opensearch returns a 200 OK response code for a Bulk API call, even though there can be partial/complete failures within the request E2E. Tracking these failures requires client to parse the response on their side and make sense of them. But, a general idea around trend in growth of different rest status codes at item level can provide insights on how indexing engine is performing. Signed-off-by: Rohit Ashiwal --- CHANGELOG.md | 1 + .../org/opensearch/core/rest/RestStatus.java | 11 ++ .../test/nodes.stats/11_indices_metrics.yml | 29 ++++ .../org/opensearch/nodestats/NodeStatsIT.java | 107 ++++++++++++++ .../admin/indices/stats/CommonStats.java | 37 +++++ .../action/bulk/TransportBulkAction.java | 22 ++- .../common/settings/ClusterSettings.java | 1 + .../opensearch/index/shard/IndexingStats.java | 139 +++++++++++++++++- .../opensearch/indices/IndicesService.java | 49 ++++++ .../cluster/node/stats/NodeStatsTests.java | 65 +++++++- ...ActionIndicesThatCannotBeCreatedTests.java | 1 + .../bulk/TransportBulkActionIngestTests.java | 1 + .../action/bulk/TransportBulkActionTests.java | 1 + .../bulk/TransportBulkActionTookTests.java | 1 + .../org/opensearch/rest/RestStatusTests.java | 24 +++ .../snapshots/SnapshotResiliencyTests.java | 1 + 16 files changed, 477 insertions(+), 13 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java create mode 100644 server/src/test/java/org/opensearch/rest/RestStatusTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index d3eda3f852a24..ad754aa4f97bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,6 +74,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - Add server version as REST response header [#6583](https://github.com/opensearch-project/OpenSearch/issues/6583) - Start replication checkpointTimers on primary before segments upload to remote store. ([#8221]()https://github.com/opensearch-project/OpenSearch/pull/8221) +- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java index ae4f4c65b28d2..06f6fe46ffbf8 100644 --- a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java +++ b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java @@ -431,6 +431,13 @@ public enum RestStatus { * next-hop server. */ EXPECTATION_FAILED(417), + /** + * Any attempt to brew coffee with a teapot should result in the error code "418 I'm a teapot". The resulting + * entity body MAY be short and stout. + * + * @see I'm a teapot! + */ + I_AM_A_TEAPOT(418), /** * The request was directed at a server that is not able to produce a response. This can be sent by a server * that is not configured to produce responses for the combination of scheme and authority that are included @@ -559,4 +566,8 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper public static RestStatus fromCode(int code) { return CODE_TO_STATUS.get(code); } + + public static Boolean isValidRestCode(int code) { + return fromCode(code) != null; + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml index 1f1f42890355e..3f79227ce64e8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml @@ -138,6 +138,35 @@ - is_false: nodes.$node_id.indices.translog - is_false: nodes.$node_id.indices.recovery +--- +"Metric - indexing doc_status": + - skip: + version: " - 2.99.99" + reason: "To be introduced in future release :: TODO: change if/when we backport to 2.x" + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: { metric: indices, index_metric: indexing } + + - is_false: nodes.$node_id.indices.docs + - is_false: nodes.$node_id.indices.store + - is_true: nodes.$node_id.indices.indexing + - is_true: nodes.$node_id.indices.indexing.doc_status + - is_false: nodes.$node_id.indices.get + - is_false: nodes.$node_id.indices.search + - is_false: nodes.$node_id.indices.merges + - is_false: nodes.$node_id.indices.refresh + - is_false: nodes.$node_id.indices.flush + - is_false: nodes.$node_id.indices.warmer + - is_false: nodes.$node_id.indices.query_cache + - is_false: nodes.$node_id.indices.fielddata + - is_false: nodes.$node_id.indices.completion + - is_false: nodes.$node_id.indices.segments + - is_false: nodes.$node_id.indices.translog + - is_false: nodes.$node_id.indices.recovery --- "Metric - recovery": diff --git a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java new file mode 100644 index 0000000000000..1e58dbaed7f65 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.nodestats; + +import org.hamcrest.MatcherAssert; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Requests; +import org.opensearch.index.shard.IndexingStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.equalTo; + +@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +public class NodeStatsIT extends OpenSearchIntegTestCase { + + private static final String INDEX = "test_index"; + private static final Map expectedDocStatusCounter; + + static { + expectedDocStatusCounter = new HashMap<>(); + expectedDocStatusCounter.put(200, new AtomicLong(0)); + expectedDocStatusCounter.put(201, new AtomicLong(0)); + expectedDocStatusCounter.put(404, new AtomicLong(0)); + } + + public void testNodeIndicesStatsBulk() { + int sizeOfIndexRequests = scaledRandomIntBetween(10, 20); + int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + int sizeOfNotFountRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + + BulkRequest bulkRequest = new BulkRequest(); + + for (int i = 0; i < sizeOfIndexRequests; ++i) { + bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")); + } + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet(); + } + + refresh(INDEX); + bulkRequest.requests().clear(); + + for (int i = 0; i < sizeOfDeleteRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i))); + } + for (int i = 0; i < sizeOfNotFountRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i))); + } + + response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFountRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + expectedDocStatusCounter.get(itemResponse.getResponse().status().getStatus()).incrementAndGet(); + } + + refresh(INDEX); + + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + IndexingStats.Stats stats = nodesStatsResponse.getNodes().get(0).getIndices().getIndexing().getTotal(); + + MatcherAssert.assertThat(stats.getIndexCount(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getIndexTime().duration(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getIndexCurrent(), notNullValue()); + MatcherAssert.assertThat(stats.getIndexFailedCount(), notNullValue()); + MatcherAssert.assertThat(stats.getDeleteCount(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getDeleteTime().duration(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getDeleteCurrent(), notNullValue()); + MatcherAssert.assertThat(stats.getNoopUpdateCount(), notNullValue()); + MatcherAssert.assertThat(stats.isThrottled(), notNullValue()); + MatcherAssert.assertThat(stats.getThrottleTime(), notNullValue()); + + Map docStatusCounter = stats.getDocStatusStats().getDocStatusCounter(); + + for (Integer key : docStatusCounter.keySet()) { + MatcherAssert.assertThat(docStatusCounter.get(key).longValue(), equalTo(expectedDocStatusCounter.get(key).longValue())); + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index 5a3a34e9a2ebe..7942594bf8652 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -264,6 +264,43 @@ public CommonStats(StreamInput in) throws IOException { recoveryStats = in.readOptionalWriteable(RecoveryStats::new); } + // public: visible for testing + public CommonStats( + DocsStats docs, + StoreStats store, + IndexingStats indexing, + GetStats get, + SearchStats search, + MergeStats merge, + RefreshStats refresh, + FlushStats flush, + WarmerStats warmer, + QueryCacheStats queryCache, + FieldDataStats fieldData, + CompletionStats completion, + SegmentsStats segments, + TranslogStats translog, + RequestCacheStats requestCache, + RecoveryStats recoveryStats + ) { + this.docs = docs; + this.store = store; + this.indexing = indexing; + this.get = get; + this.search = search; + this.merge = merge; + this.refresh = refresh; + this.flush = flush; + this.warmer = warmer; + this.queryCache = queryCache; + this.fieldData = fieldData; + this.completion = completion; + this.segments = segments; + this.translog = translog; + this.requestCache = requestCache; + this.recoveryStats = recoveryStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(docs); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index f6ca9022a5bff..d55d03101bd94 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -79,6 +79,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.IndexClosedException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; @@ -129,6 +130,7 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest = request.request(); - responses.set( + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( request.id(), - new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ) + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) ); + + indicesService.incrementDocStatusCounter(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); } if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index d70ea16cf5fdd..8d2cda829c768 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -261,6 +261,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING, IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING, IndicesService.CLUSTER_REPLICATION_TYPE_SETTING, + IndicesService.INDEXING_DOC_STATUS_KEYS_SETTING, MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, Metadata.SETTING_READ_ONLY_SETTING, diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java index f45417a20036e..ea1766f3ec958 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java @@ -44,6 +44,8 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; /** * Tracks indexing statistics @@ -59,6 +61,78 @@ public class IndexingStats implements Writeable, ToXContentFragment { */ public static class Stats implements Writeable, ToXContentFragment { + /** + * Tracks item level rest status codes during indexing + * + * @opensearch.internal + */ + public static class DocStatusStats implements Writeable, ToXContentFragment { + + private final Map docStatusCounter; + + public DocStatusStats() { + this(new ConcurrentHashMap<>()); + } + + // public: visible for testing + public DocStatusStats(Map docStatusCounter) { + this.docStatusCounter = docStatusCounter; + } + + public DocStatusStats(StreamInput in) throws IOException { + int size = in.readInt(); + docStatusCounter = new ConcurrentHashMap<>(); + + for (int i = 0; i < size; ++i) { + docStatusCounter.put(in.readInt(), new AtomicLong(in.readLong())); + } + } + + public void add(DocStatusStats stats) { + if (null == stats || null == stats.docStatusCounter) { + return; + } + + for (Map.Entry entry : stats.docStatusCounter.entrySet()) { + int k = entry.getKey(); + AtomicLong v = entry.getValue(); + + docStatusCounter.computeIfAbsent(k, x -> new AtomicLong(0)); + docStatusCounter.get(k).addAndGet(v.longValue()); + } + } + + public void inc(int status) { + docStatusCounter.computeIfAbsent(status, x -> new AtomicLong(0)); + docStatusCounter.get(status).incrementAndGet(); + } + + public Map getDocStatusCounter() { + return docStatusCounter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.DOC_STATUS); + + for (Map.Entry entry : docStatusCounter.entrySet()) { + builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue()); + } + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(docStatusCounter.size()); + + for (Map.Entry entry : docStatusCounter.entrySet()) { + out.writeInt(entry.getKey()); + out.writeLong(entry.getValue().longValue()); + } + } + } + private long indexCount; private long indexTimeInMillis; private long indexCurrent; @@ -70,7 +144,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long throttleTimeInMillis; private boolean isThrottled; - Stats() {} + private final DocStatusStats docStatusStats; + + Stats() { + docStatusStats = new DocStatusStats(); + } public Stats(StreamInput in) throws IOException { indexCount = in.readVLong(); @@ -83,6 +161,12 @@ public Stats(StreamInput in) throws IOException { noopUpdateCount = in.readVLong(); isThrottled = in.readBoolean(); throttleTimeInMillis = in.readLong(); + + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + docStatusStats = in.readOptionalWriteable(DocStatusStats::new); + } else { + docStatusStats = null; + } } public Stats( @@ -96,6 +180,35 @@ public Stats( long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis + ) { + this( + indexCount, + indexTimeInMillis, + indexCurrent, + indexFailedCount, + deleteCount, + deleteTimeInMillis, + deleteCurrent, + noopUpdateCount, + isThrottled, + throttleTimeInMillis, + new DocStatusStats() + ); + } + + // public: visible for testing + public Stats( + long indexCount, + long indexTimeInMillis, + long indexCurrent, + long indexFailedCount, + long deleteCount, + long deleteTimeInMillis, + long deleteCurrent, + long noopUpdateCount, + boolean isThrottled, + long throttleTimeInMillis, + DocStatusStats docStatusStats ) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; @@ -107,6 +220,7 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; + this.docStatusStats = docStatusStats; } public void add(Stats stats) { @@ -121,8 +235,10 @@ public void add(Stats stats) { noopUpdateCount += stats.noopUpdateCount; throttleTimeInMillis += stats.throttleTimeInMillis; - if (isThrottled != stats.isThrottled) { - isThrottled = true; // When combining if one is throttled set result to throttled. + isThrottled |= stats.isThrottled; // When combining if one is throttled set result to throttled. + + if (getDocStatusStats() != null) { + getDocStatusStats().add(stats.getDocStatusStats()); } } @@ -193,6 +309,10 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + public DocStatusStats getDocStatusStats() { + return docStatusStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); @@ -206,6 +326,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isThrottled); out.writeLong(throttleTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(docStatusStats); + } } @Override @@ -223,6 +346,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.IS_THROTTLED, isThrottled); builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); + + if (getDocStatusStats() != null) { + getDocStatusStats().toXContent(builder, params); + } + return builder; } } @@ -235,7 +363,7 @@ public IndexingStats() { public IndexingStats(StreamInput in) throws IOException { totalStats = new Stats(in); - if (in.getVersion().before(Version.V_2_0_0)) { + if (in.getVersion().before(Version.V_3_0_0)) { if (in.readBoolean()) { Map typeStats = in.readMap(StreamInput::readString, Stats::new); assert typeStats.size() == 1; @@ -294,12 +422,13 @@ static final class Fields { static final String IS_THROTTLED = "is_throttled"; static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis"; static final String THROTTLED_TIME = "throttle_time"; + static final String DOC_STATUS = "doc_status"; } @Override public void writeTo(StreamOutput out) throws IOException { totalStats.writeTo(out); - if (out.getVersion().before(Version.V_2_0_0)) { + if (out.getVersion().before(Version.V_3_0_0)) { out.writeBoolean(false); } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 0ffad8ce65b7a..2dea84d4b8ac9 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -150,6 +150,7 @@ import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.core.rest.RestStatus; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.AliasFilter; @@ -283,6 +284,46 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + private static final List INDEXING_DOC_STATUS_DEFAULT_KEYS = List.of( + "200", + "201", + "400", + "401", + "403", + "404", + "405", + "409", + "412", + "429", + "500", + "502", + "503", + "504" + ); + + private static String validateDocStatusKey(String key) { + int result; + + try { + result = Integer.parseInt(key); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal value for rest status code: " + key); + } + + if (RestStatus.isValidRestCode(result)) { + return key; + } else { + throw new IllegalArgumentException("Illegal value for rest status code: " + key); + } + } + + public static final Setting> INDEXING_DOC_STATUS_KEYS_SETTING = Setting.listSetting( + "cluster.doc_status_keys", + INDEXING_DOC_STATUS_DEFAULT_KEYS, + IndicesService::validateDocStatusKey, + Property.NodeScope + ); + /** * The node's settings. */ @@ -1012,6 +1053,14 @@ public IndicesQueryCache getIndicesQueryCache() { return indicesQueryCache; } + public void incrementDocStatusCounter(final RestStatus status) { + int code = status.getStatus(); + + if (INDEXING_DOC_STATUS_KEYS_SETTING.get(clusterService.getSettings()).contains(String.valueOf(code))) { + oldShardsStats.indexingStats.getTotal().getDocStatusStats().inc(code); + } + } + /** * Statistics for old shards * diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index cbf7032b50ca5..c367c6936182b 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; @@ -42,6 +43,8 @@ import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.shard.IndexingStats; +import org.opensearch.indices.NodeIndicesStats; import org.opensearch.indices.breaker.AllCircuitBreakerStats; import org.opensearch.indices.breaker.CircuitBreakerStats; import org.opensearch.ingest.IngestStats; @@ -65,6 +68,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -83,6 +87,31 @@ public void testSerialization() throws IOException { NodeStats deserializedNodeStats = new NodeStats(in); assertEquals(nodeStats.getNode(), deserializedNodeStats.getNode()); assertEquals(nodeStats.getTimestamp(), deserializedNodeStats.getTimestamp()); + if (nodeStats.getIndices() == null) { + assertNull(deserializedNodeStats.getIndices()); + } else { + IndexingStats.Stats indexingStats = nodeStats.getIndices().getIndexing().getTotal(); + IndexingStats.Stats deserializedIndexingStats = deserializedNodeStats.getIndices().getIndexing().getTotal(); + + assertEquals(indexingStats.getIndexCount(), deserializedIndexingStats.getIndexCount()); + assertEquals(indexingStats.getIndexTime(), deserializedIndexingStats.getIndexTime()); + assertEquals(indexingStats.getIndexCurrent(), deserializedIndexingStats.getIndexCurrent()); + assertEquals(indexingStats.getIndexFailedCount(), deserializedIndexingStats.getIndexFailedCount()); + assertEquals(indexingStats.getDeleteCount(), deserializedIndexingStats.getDeleteCount()); + assertEquals(indexingStats.getDeleteCurrent(), deserializedIndexingStats.getDeleteCurrent()); + assertEquals(indexingStats.getDeleteTime(), deserializedIndexingStats.getDeleteTime()); + assertEquals(indexingStats.getNoopUpdateCount(), deserializedIndexingStats.getNoopUpdateCount()); + assertEquals(indexingStats.isThrottled(), deserializedIndexingStats.isThrottled()); + assertEquals(indexingStats.getThrottleTime(), deserializedIndexingStats.getThrottleTime()); + + Map indexingDocStatusCounter = indexingStats.getDocStatusStats().getDocStatusCounter(); + Map deserializedDocStatusCounter = deserializedIndexingStats.getDocStatusStats() + .getDocStatusCounter(); + + for (Integer key : indexingDocStatusCounter.keySet()) { + assertEquals(indexingDocStatusCounter.get(key).longValue(), deserializedDocStatusCounter.get(key).longValue()); + } + } if (nodeStats.getOs() == null) { assertNull(deserializedNodeStats.getOs()); } else { @@ -722,7 +751,7 @@ public static NodeStats createNodeStats() { return new NodeStats( node, randomNonNegativeLong(), - null, + createNodeIndicesStats(), osStats, processStats, jvmStats, @@ -747,6 +776,40 @@ public static NodeStats createNodeStats() { ); } + private static NodeIndicesStats createNodeIndicesStats() { + if (rarely()) { + return null; + } + + Map docStatusCounter = new HashMap<>(); + int size = randomInt(5); + + for (int i = 0; i < size; ++i) { + docStatusCounter.put(randomInt(), new AtomicLong(randomLong())); + } + + IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats(docStatusCounter); + IndexingStats.Stats stats = new IndexingStats.Stats( + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + Math.abs(randomInt()), + randomBoolean(), + Math.abs(randomInt()), + docStatusStats + ); + IndexingStats indexingStats = new IndexingStats(stats); + + return new NodeIndicesStats( + new CommonStats(null, null, indexingStats, null, null, null, null, null, null, null, null, null, null, null, null, null), + Collections.emptyMap() + ); + } + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 0846a5f8dec5c..79237216ed16a 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase( Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ) { @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 6a514b47e55a4..75928a179ad72 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -170,6 +170,7 @@ class TestTransportBulkAction extends TransportBulkAction { SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index d53b860e6524a..580bc46765809 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 2361b69e9b82c..854b0478932bf 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -278,6 +278,7 @@ static class TestTransportBulkAction extends TransportBulkAction { indexNameExpressionResolver, autoCreateIndex, new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()), relativeTimeProvider ); diff --git a/server/src/test/java/org/opensearch/rest/RestStatusTests.java b/server/src/test/java/org/opensearch/rest/RestStatusTests.java new file mode 100644 index 0000000000000..56aa386ecbaaf --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/RestStatusTests.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.core.rest.RestStatus; +import org.opensearch.test.OpenSearchTestCase; + +public class RestStatusTests extends OpenSearchTestCase { + + public void testValidRestCode() { + assertTrue(RestStatus.isValidRestCode(418)); + } + + public void testInvalidRestCode() { + assertFalse(RestStatus.isValidRestCode(1999)); + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index a121a190096b4..39dbf99b4e796 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1977,6 +1977,7 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), + mock(IndicesService.class), new SystemIndices(emptyMap()) ) );