diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d11c28ec2429..f4200e3aeaf69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107)) - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) +- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java index 313bc23bedc90..5ee608c96aad8 100644 --- a/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java +++ b/libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java @@ -561,4 +561,13 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper public static RestStatus fromCode(int code) { return CODE_TO_STATUS.get(code); } + + /** + * Get category class of a rest status code. + * + * @return Integer representing class category of the concrete rest status code + */ + public int getStatusFamilyCode() { + return status / 100; + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml index 1f1f42890355e..3f79227ce64e8 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml @@ -138,6 +138,35 @@ - is_false: nodes.$node_id.indices.translog - is_false: nodes.$node_id.indices.recovery +--- +"Metric - indexing doc_status": + - skip: + version: " - 2.99.99" + reason: "To be introduced in future release :: TODO: change if/when we backport to 2.x" + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: { metric: indices, index_metric: indexing } + + - is_false: nodes.$node_id.indices.docs + - is_false: nodes.$node_id.indices.store + - is_true: nodes.$node_id.indices.indexing + - is_true: nodes.$node_id.indices.indexing.doc_status + - is_false: nodes.$node_id.indices.get + - is_false: nodes.$node_id.indices.search + - is_false: nodes.$node_id.indices.merges + - is_false: nodes.$node_id.indices.refresh + - is_false: nodes.$node_id.indices.flush + - is_false: nodes.$node_id.indices.warmer + - is_false: nodes.$node_id.indices.query_cache + - is_false: nodes.$node_id.indices.fielddata + - is_false: nodes.$node_id.indices.completion + - is_false: nodes.$node_id.indices.segments + - is_false: nodes.$node_id.indices.translog + - is_false: nodes.$node_id.indices.recovery --- "Metric - recovery": diff --git a/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java new file mode 100644 index 0000000000000..8882867e08c8e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/nodestats/NodeStatsIT.java @@ -0,0 +1,101 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.nodestats; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.bulk.BulkItemResponse; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.client.Requests; +import org.opensearch.index.shard.IndexingStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope; +import org.opensearch.test.OpenSearchIntegTestCase.Scope; +import org.hamcrest.MatcherAssert; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.notNullValue; + +@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false) +public class NodeStatsIT extends OpenSearchIntegTestCase { + + private static final String INDEX = "test_index"; + private static final AtomicLong[] expectedDocStatusCounter = Stream.generate(AtomicLong::new).limit(5).toArray(AtomicLong[]::new); + + public void testNodeIndicesStatsDocStatusStats() { + int sizeOfIndexRequests = scaledRandomIntBetween(10, 20); + int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + int sizeOfNotFoundRequests = scaledRandomIntBetween(5, sizeOfIndexRequests); + + BulkRequest bulkRequest = new BulkRequest(); + + for (int i = 0; i < sizeOfIndexRequests; ++i) { + bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(Requests.INDEX_CONTENT_TYPE, "field", "value")); + } + + BulkResponse response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + expectedDocStatusCounter[itemResponse.getResponse().status().getStatusFamilyCode() - 1].incrementAndGet(); + } + + refresh(INDEX); + bulkRequest.requests().clear(); + + for (int i = 0; i < sizeOfDeleteRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i))); + } + for (int i = 0; i < sizeOfNotFoundRequests; ++i) { + bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i))); + } + + response = client().bulk(bulkRequest).actionGet(); + + MatcherAssert.assertThat(response.hasFailures(), equalTo(false)); + MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFoundRequests)); + + for (BulkItemResponse itemResponse : response.getItems()) { + expectedDocStatusCounter[itemResponse.getResponse().status().getStatusFamilyCode() - 1].incrementAndGet(); + } + + refresh(INDEX); + + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + IndexingStats.Stats stats = nodesStatsResponse.getNodes().get(0).getIndices().getIndexing().getTotal(); + + MatcherAssert.assertThat(stats.getIndexCount(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getIndexTime().duration(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getIndexCurrent(), notNullValue()); + MatcherAssert.assertThat(stats.getIndexFailedCount(), notNullValue()); + MatcherAssert.assertThat(stats.getDeleteCount(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getDeleteTime().duration(), greaterThan(0L)); + MatcherAssert.assertThat(stats.getDeleteCurrent(), notNullValue()); + MatcherAssert.assertThat(stats.getNoopUpdateCount(), notNullValue()); + MatcherAssert.assertThat(stats.isThrottled(), notNullValue()); + MatcherAssert.assertThat(stats.getThrottleTime(), notNullValue()); + + AtomicLong[] docStatusCounter = stats.getDocStatusStats().getDocStatusCounter(); + + MatcherAssert.assertThat(docStatusCounter.length, equalTo(expectedDocStatusCounter.length)); + + for (int i = 0; i < docStatusCounter.length; ++i) { + MatcherAssert.assertThat(docStatusCounter[i].longValue(), equalTo(expectedDocStatusCounter[i].longValue())); + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index d8b7facc09df2..0ec1c64f09152 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -79,6 +79,7 @@ import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.indices.IndexClosedException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; @@ -100,6 +101,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import java.util.stream.Collectors; @@ -129,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction> entry : requestsByShard.entrySet()) { final ShardId shardId = entry.getKey(); final List requests = entry.getValue(); @@ -632,8 +645,11 @@ public void onResponse(BulkShardResponse bulkShardResponse) { if (bulkItemResponse.getResponse() != null) { bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); } + + docStatusCounter[bulkItemResponse.status().getStatusFamilyCode() - 1].incrementAndGet(); responses.set(bulkItemResponse.getItemId(), bulkItemResponse); } + if (counter.decrementAndGet() == 0) { finishHim(); } @@ -644,25 +660,28 @@ public void onFailure(Exception e) { // create failures for all relevant requests for (BulkItemRequest request : requests) { final String indexName = concreteIndices.getConcreteIndex(request.index()).getName(); - DocWriteRequest docWriteRequest = request.request(); - responses.set( + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( request.id(), - new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ) + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) ); + + docStatusCounter[bulkItemResponse.status().getStatusFamilyCode() - 1].incrementAndGet(); + responses.set(request.id(), bulkItemResponse); } + if (counter.decrementAndGet() == 0) { finishHim(); } } private void finishHim() { - listener.onResponse( - new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)) - ); + BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]); + long tookMillis = buildTookInMillis(startTimeNanos); + + indicesService.addDocStatusCounter(docStatusCounter); + listener.onResponse(new BulkResponse(response, tookMillis)); } }, releasable::close)); } diff --git a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java index 95735f71a38e7..838e06df772c4 100644 --- a/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java +++ b/server/src/main/java/org/opensearch/action/update/TransportUpdateAction.java @@ -62,6 +62,7 @@ import org.opensearch.core.common.io.stream.NotSerializableExceptionWrapper; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.index.shard.ShardId; +import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.MediaType; import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; @@ -154,16 +155,30 @@ public static void resolveAndValidateRouting(Metadata metadata, String concreteI @Override protected void doExecute(Task task, final UpdateRequest request, final ActionListener listener) { if (request.isRequireAlias() && (clusterService.state().getMetadata().hasAlias(request.index()) == false)) { - throw new IndexNotFoundException( + IndexNotFoundException e = new IndexNotFoundException( "[" + DocWriteRequest.REQUIRE_ALIAS + "] request flag is [true] and [" + request.index() + "] is not an alias", request.index() ); + + indicesService.incDocStatusCounter(e); + throw e; + } + + boolean shouldAutoCreate; + + try { + shouldAutoCreate = autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state()); + } catch (Exception e) { + indicesService.incDocStatusCounter(e); + throw e; } + // if we don't have a master, we don't have metadata, that's fine, let it find a cluster-manager using create index API - if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) { + if (shouldAutoCreate) { client.admin() .indices() .create( + new CreateIndexRequest().index(request.index()).cause("auto(update api)").clusterManagerNodeTimeout(request.timeout()), new ActionListener() { @Override @@ -182,6 +197,7 @@ public void onFailure(Exception e) { listener.onFailure(inner); } } else { + indicesService.incDocStatusCounter(e); listener.onFailure(e); } } @@ -193,7 +209,10 @@ public void onFailure(Exception e) { } private void innerExecute(final Task task, final UpdateRequest request, final ActionListener listener) { - super.doExecute(task, request, listener); + super.doExecute(task, request, ActionListener.wrap(listener::onResponse, e -> { + indicesService.incDocStatusCounter(e); + listener.onFailure(e); + })); } @Override @@ -330,6 +349,8 @@ protected void shardOperation(final UpdateRequest request, final ActionListener< shard.noopUpdate(); } } + + indicesService.incDocStatusCounter(RestStatus.OK.getStatusFamilyCode()); listener.onResponse(update); break; default: diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java index 8953ef38da51b..1d22f6f36968f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java @@ -44,6 +44,9 @@ import java.io.IOException; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; /** * Tracks indexing statistics @@ -59,6 +62,120 @@ public class IndexingStats implements Writeable, ToXContentFragment { */ public static class Stats implements Writeable, ToXContentFragment { + /** + * Tracks item level rest category class codes during indexing + * + * @opensearch.internal + */ + public static class DocStatusStats implements Writeable, ToXContentFragment { + + final AtomicLong[] docStatusCounter; + + public DocStatusStats() { + this(Stream.generate(AtomicLong::new).limit(5).toArray(AtomicLong[]::new)); + } + + // public: visible for testing + public DocStatusStats(final AtomicLong[] docStatusCounter) { + this.docStatusCounter = docStatusCounter; + } + + public DocStatusStats(StreamInput in) throws IOException { + docStatusCounter = new AtomicLong[5]; + + for (int i = 0; i < docStatusCounter.length; ++i) { + docStatusCounter[i] = new AtomicLong(in.readLong()); + } + } + + public void inc(final int familyCode) { + docStatusCounter[familyCode - 1].incrementAndGet(); + } + + /** + * Accumulate stats from the passed Object + * + * @param stats Instance storing {@link DocStatusStats} + */ + public void add(final DocStatusStats stats) { + if (null == stats) { + return; + } + + for (int i = 0; i < docStatusCounter.length; ++i) { + docStatusCounter[i].addAndGet(stats.docStatusCounter[i].longValue()); + } + } + + public void add(final AtomicLong[] docStatusCounter) { + for (int i = 0; i < this.docStatusCounter.length; ++i) { + this.docStatusCounter[i].addAndGet(docStatusCounter[i].longValue()); + } + } + + public AtomicLong[] getDocStatusCounter() { + return docStatusCounter; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.DOC_STATUS); + + for (int i = 0; i < docStatusCounter.length; ++i) { + long value = docStatusCounter[i].longValue(); + String key = i + 1 + "xx"; + + if (value > 0) { + builder.field(key, value); + } + } + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + for (AtomicLong i : docStatusCounter) { + out.writeLong(i.longValue()); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (null == o || getClass() != o.getClass()) { + return false; + } + + AtomicLong[] other = ((DocStatusStats) o).getDocStatusCounter(); + + if (docStatusCounter.length != other.length) { + return false; + } + + for (int i = 0; i < docStatusCounter.length; ++i) { + if (docStatusCounter[i].longValue() != other[i].longValue()) { + return false; + } + } + + return true; + } + + @Override + public int hashCode() { + int hash = 1; + + for (AtomicLong i : docStatusCounter) { + hash = 31 * hash + (int) i.longValue(); + } + + return hash; + } + } + private long indexCount; private long indexTimeInMillis; private long indexCurrent; @@ -69,8 +186,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long noopUpdateCount; private long throttleTimeInMillis; private boolean isThrottled; + private final DocStatusStats docStatusStats; - Stats() {} + Stats() { + docStatusStats = new DocStatusStats(); + } public Stats(StreamInput in) throws IOException { indexCount = in.readVLong(); @@ -83,6 +203,12 @@ public Stats(StreamInput in) throws IOException { noopUpdateCount = in.readVLong(); isThrottled = in.readBoolean(); throttleTimeInMillis = in.readLong(); + + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + docStatusStats = in.readOptionalWriteable(DocStatusStats::new); + } else { + docStatusStats = null; + } } public Stats( @@ -96,6 +222,34 @@ public Stats( long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis + ) { + this( + indexCount, + indexTimeInMillis, + indexCurrent, + indexFailedCount, + deleteCount, + deleteTimeInMillis, + deleteCurrent, + noopUpdateCount, + isThrottled, + throttleTimeInMillis, + new DocStatusStats() + ); + } + + protected Stats( + long indexCount, + long indexTimeInMillis, + long indexCurrent, + long indexFailedCount, + long deleteCount, + long deleteTimeInMillis, + long deleteCurrent, + long noopUpdateCount, + boolean isThrottled, + long throttleTimeInMillis, + DocStatusStats docStatusStats ) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; @@ -107,6 +261,7 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; + this.docStatusStats = docStatusStats; } public void add(Stats stats) { @@ -121,8 +276,10 @@ public void add(Stats stats) { noopUpdateCount += stats.noopUpdateCount; throttleTimeInMillis += stats.throttleTimeInMillis; - if (isThrottled != stats.isThrottled) { - isThrottled = true; // When combining if one is throttled set result to throttled. + isThrottled |= stats.isThrottled; // When combining if one is throttled set result to throttled. + + if (getDocStatusStats() != null) { + getDocStatusStats().add(stats.getDocStatusStats()); } } @@ -193,6 +350,10 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + public DocStatusStats getDocStatusStats() { + return docStatusStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); @@ -206,6 +367,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isThrottled); out.writeLong(throttleTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(docStatusStats); + } } @Override @@ -223,8 +387,53 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.IS_THROTTLED, isThrottled); builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); + + if (getDocStatusStats() != null) { + getDocStatusStats().toXContent(builder, params); + } + return builder; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (null == o || getClass() != o.getClass()) { + return false; + } + + Stats other = (Stats) o; + return indexCount == other.indexCount + && indexTimeInMillis == other.indexTimeInMillis + && indexCurrent == other.indexCurrent + && indexFailedCount == other.indexFailedCount + && deleteCount == other.deleteCount + && deleteTimeInMillis == other.deleteTimeInMillis + && deleteCurrent == other.deleteCurrent + && noopUpdateCount == other.noopUpdateCount + && isThrottled == other.isThrottled + && throttleTimeInMillis == other.throttleTimeInMillis + && Objects.equals(getDocStatusStats(), other.getDocStatusStats()); + } + + @Override + public int hashCode() { + return Objects.hash( + indexCount, + indexTimeInMillis, + indexCurrent, + indexFailedCount, + deleteCount, + deleteTimeInMillis, + deleteCurrent, + noopUpdateCount, + isThrottled, + throttleTimeInMillis, + getDocStatusStats() + ); + } } private final Stats totalStats; @@ -279,7 +488,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par * * @opensearch.internal */ - static final class Fields { + private static final class Fields { static final String INDEXING = "indexing"; static final String INDEX_TOTAL = "index_total"; static final String INDEX_TIME = "index_time"; @@ -294,6 +503,7 @@ static final class Fields { static final String IS_THROTTLED = "is_throttled"; static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis"; static final String THROTTLED_TIME = "throttle_time"; + static final String DOC_STATUS = "doc_status"; } @Override @@ -303,4 +513,20 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(false); } } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (null == o || getClass() != o.getClass()) { + return false; + } + return Objects.equals(getTotal(), ((IndexingStats) o).getTotal()); + } + + @Override + public int hashCode() { + return Objects.hash(getTotal()); + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 297beff981722..59f2fb617045c 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -40,6 +40,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.RamUsageEstimator; +import org.opensearch.ExceptionsHelper; import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; import org.opensearch.action.admin.indices.stats.CommonStats; @@ -180,6 +181,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -1052,6 +1054,18 @@ public IndicesQueryCache getIndicesQueryCache() { return indicesQueryCache; } + public void addDocStatusCounter(final AtomicLong[] docStatusCounter) { + oldShardsStats.indexingStats.getTotal().getDocStatusStats().add(docStatusCounter); + } + + public void incDocStatusCounter(final int familyCode) { + oldShardsStats.indexingStats.getTotal().getDocStatusStats().inc(familyCode); + } + + public void incDocStatusCounter(final Exception e) { + incDocStatusCounter(ExceptionsHelper.status(e).getStatusFamilyCode()); + } + /** * Statistics for old shards * diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 9ef0f85893fc8..0f67eff26cbde 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase( Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ) { @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 5410d6a88a5b9..515f6eae28a34 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -171,6 +171,7 @@ class TestTransportBulkAction extends TransportBulkAction { SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index 1229584fa99ac..866d153e54361 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -60,6 +60,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexingPressureService; import org.opensearch.index.VersionType; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.indices.SystemIndices; import org.opensearch.test.OpenSearchTestCase; @@ -87,6 +88,7 @@ import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; public class TransportBulkActionTests extends OpenSearchTestCase { @@ -114,6 +116,7 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), + mock(IndicesService.class), new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index b9dca5f2573f3..f51d9777b2d03 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -278,6 +278,7 @@ static class TestTransportBulkAction extends TransportBulkAction { indexNameExpressionResolver, autoCreateIndex, new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()), relativeTimeProvider ); diff --git a/server/src/test/java/org/opensearch/core/RestStatusTests.java b/server/src/test/java/org/opensearch/core/RestStatusTests.java new file mode 100644 index 0000000000000..f8dba99aa8b60 --- /dev/null +++ b/server/src/test/java/org/opensearch/core/RestStatusTests.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.action.ShardOperationFailedException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.PriorityQueue; + +public class RestStatusTests extends OpenSearchTestCase { + + public void testStatusReturns200ForNoFailures() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = randomIntBetween(1, totalShards); + + assertEquals(RestStatus.OK, RestStatus.status(successfulShards, totalShards)); + } + + public void testStatusReturns503ForUnavailableShards() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = 0; + + assertEquals(RestStatus.SERVICE_UNAVAILABLE, RestStatus.status(successfulShards, totalShards)); + } + + public void testStatusReturnsFailureStatusWhenFailuresExist() { + int totalShards = randomIntBetween(1, 100); + int successfulShards = 0; + + TestException[] failures = new TestException[totalShards]; + PriorityQueue heapOfFailures = new PriorityQueue<>((x, y) -> y.status().compareTo(x.status())); + + for (int i = 0; i < totalShards; ++i) { + /* + * Status here doesn't need to convey failure and is not as per rest + * contract. We're not testing the contract, but if status() returns + * the greatest rest code from the failures selection + */ + RestStatus status = randomFrom(RestStatus.values()); + TestException failure = new TestException(status); + + failures[i] = failure; + heapOfFailures.add(failure); + } + + assertEquals(heapOfFailures.peek().status(), RestStatus.status(successfulShards, totalShards, failures)); + } + + public void testSerialization() throws IOException { + final RestStatus status = randomFrom(RestStatus.values()); + + try (BytesStreamOutput out = new BytesStreamOutput()) { + RestStatus.writeTo(out, status); + + try (StreamInput in = out.bytes().streamInput()) { + RestStatus deserializedStatus = RestStatus.readFrom(in); + + assertEquals(status, deserializedStatus); + } + } + } + + private static class TestException extends ShardOperationFailedException { + TestException(final RestStatus status) { + super("super-idx", randomInt(), "gone-fishing", status, new Throwable("cake")); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new IOException("not implemented"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + throw new IOException("not implemented"); + } + } + +} diff --git a/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java b/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java new file mode 100644 index 0000000000000..4fa08f126d51e --- /dev/null +++ b/server/src/test/java/org/opensearch/index/shard/IndexingStatsTests.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.shard; + +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.test.AbstractWireSerializingTestCase; + +import java.util.concurrent.atomic.AtomicLong; + +public class IndexingStatsTests extends AbstractWireSerializingTestCase { + + @Override + protected IndexingStats createTestInstance() { + final AtomicLong[] docStatusCounter = new AtomicLong[5]; + for (int i = 0; i < docStatusCounter.length; ++i) { + docStatusCounter[i] = new AtomicLong(randomNonNegativeLong()); + } + + IndexingStats.Stats.DocStatusStats docStatusStats = new IndexingStats.Stats.DocStatusStats(docStatusCounter); + IndexingStats.Stats stats = new IndexingStats.Stats( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomBoolean(), + randomNonNegativeLong(), + docStatusStats + ); + + return new IndexingStats(stats); + } + + @Override + protected Writeable.Reader instanceReader() { + return IndexingStats::new; + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 4f7697660096e..7e322fe52595b 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -2201,6 +2201,7 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), + mock(IndicesService.class), new SystemIndices(emptyMap()) ) );