diff --git a/libs/core/src/main/java/org/opensearch/rest/RestStatus.java b/libs/core/src/main/java/org/opensearch/rest/RestStatus.java index 8c718a5c8091c..affcc56b9d594 100644 --- a/libs/core/src/main/java/org/opensearch/rest/RestStatus.java +++ b/libs/core/src/main/java/org/opensearch/rest/RestStatus.java @@ -431,6 +431,13 @@ public enum RestStatus { * next-hop server. */ EXPECTATION_FAILED(417), + /** + * Any attempt to brew coffee with a teapot should result in the error code "418 I'm a teapot". The resulting + * entity body MAY be short and stout. + *

+ * @see I'm a teapot! + */ + I_AM_A_TEAPOT(418), /** * The request was directed at a server that is not able to produce a response. This can be sent by a server * that is not configured to produce responses for the combination of scheme and authority that are included @@ -559,4 +566,8 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper public static RestStatus fromCode(int code) { return CODE_TO_STATUS.get(code); } + + public static Boolean isValidRestCode(int code) { + return null != fromCode(code); + } } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml index 1f1f42890355e..b0ce51902a7cc 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/11_indices_metrics.yml @@ -138,6 +138,34 @@ - is_false: nodes.$node_id.indices.translog - is_false: nodes.$node_id.indices.recovery +--- +"Metric - indexing doc_status": + - skip: + features: [arbitrary_key] + - do: + nodes.info: {} + - set: + nodes._arbitrary_key_: node_id + + - do: + nodes.stats: { metric: indices, index_metric: indexing } + + - is_false: nodes.$node_id.indices.docs + - is_false: nodes.$node_id.indices.store + - is_true: nodes.$node_id.indices.indexing + - is_true: nodes.$node_id.indices.indexing.doc_status + - is_false: nodes.$node_id.indices.get + - is_false: nodes.$node_id.indices.search + - is_false: nodes.$node_id.indices.merges + - is_false: nodes.$node_id.indices.refresh + - is_false: nodes.$node_id.indices.flush + - is_false: nodes.$node_id.indices.warmer + - is_false: nodes.$node_id.indices.query_cache + - is_false: nodes.$node_id.indices.fielddata + - is_false: nodes.$node_id.indices.completion + - is_false: nodes.$node_id.indices.segments + - is_false: nodes.$node_id.indices.translog + - is_false: nodes.$node_id.indices.recovery --- "Metric - recovery": diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 18a39afc48079..f7a26ae12e9c9 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -80,6 +80,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndexClosedException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; @@ -130,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest = request.request(); - responses.set( + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( request.id(), - new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ) + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) ); + + indicesService.incrementDocStatusCounter(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); } if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3271fe466f297..7be99490319c9 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -262,6 +262,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING, IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING, IndicesService.CLUSTER_REPLICATION_TYPE_SETTING, + IndicesService.INDEXING_DOC_STATUS_KEYS_SETTING, MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING, MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING, Metadata.SETTING_READ_ONLY_SETTING, diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java index d40878674ac13..0636587ad2659 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java @@ -44,6 +44,8 @@ import java.io.IOException; import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; /** * Tracks indexing statistics @@ -59,6 +61,69 @@ public class IndexingStats implements Writeable, ToXContentFragment { */ public static class Stats implements Writeable, ToXContentFragment { + /** + * Tracks item level rest status codes during indexing + * + * @opensearch.internal + */ + public static class DocStatusStats implements Writeable, ToXContentFragment { + + private final Map docStatusCounter; + + public DocStatusStats() { + this.docStatusCounter = new TreeMap<>(); + } + + public DocStatusStats(StreamInput in) throws IOException { + int size = in.readInt(); + docStatusCounter = new TreeMap<>(); + + for (int i = 0; i < size; ++i) { + docStatusCounter.put(in.readInt(), new AtomicLong(in.readLong())); + } + } + + public void add(DocStatusStats stats) { + for (Map.Entry entry : stats.docStatusCounter.entrySet()) { + synchronized (this) { + int k = entry.getKey(); + AtomicLong v = entry.getValue(); + + docStatusCounter.putIfAbsent(k, new AtomicLong(0)); + docStatusCounter.get(k).addAndGet(v.longValue()); + } + } + } + + public void inc(int status) { + synchronized (this) { + docStatusCounter.computeIfAbsent(status, s -> new AtomicLong(0)); + docStatusCounter.get(status).incrementAndGet(); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.DOC_STATUS); + + for (Map.Entry entry : docStatusCounter.entrySet()) { + builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue()); + } + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(docStatusCounter.size()); + + for (Map.Entry entry : docStatusCounter.entrySet()) { + out.writeInt(entry.getKey()); + out.writeLong(entry.getValue().longValue()); + } + } + } + private long indexCount; private long indexTimeInMillis; private long indexCurrent; @@ -70,7 +135,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long throttleTimeInMillis; private boolean isThrottled; - Stats() {} + private final DocStatusStats docStatusStats; + + Stats() { + docStatusStats = new DocStatusStats(); + } public Stats(StreamInput in) throws IOException { indexCount = in.readVLong(); @@ -83,6 +152,12 @@ public Stats(StreamInput in) throws IOException { noopUpdateCount = in.readVLong(); isThrottled = in.readBoolean(); throttleTimeInMillis = in.readLong(); + + if (in.getVersion().onOrAfter(Version.V_2_10_0)) { + docStatusStats = in.readOptionalWriteable(DocStatusStats::new); + } else { + docStatusStats = null; + } } public Stats( @@ -107,6 +182,7 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; + this.docStatusStats = new DocStatusStats(); } public void add(Stats stats) { @@ -124,6 +200,7 @@ public void add(Stats stats) { if (isThrottled != stats.isThrottled) { isThrottled = true; // When combining if one is throttled set result to throttled. } + docStatusStats.add(stats.getDocStatusStats()); } /** @@ -193,6 +270,10 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + public DocStatusStats getDocStatusStats() { + return docStatusStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); @@ -206,6 +287,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeBoolean(isThrottled); out.writeLong(throttleTimeInMillis); + if (out.getVersion().onOrAfter(Version.V_2_10_0)) { + out.writeOptionalWriteable(docStatusStats); + } } @Override @@ -223,6 +307,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.IS_THROTTLED, isThrottled); builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); + + if (getDocStatusStats() != null) { + getDocStatusStats().toXContent(builder, params); + } + return builder; } } @@ -294,6 +383,7 @@ static final class Fields { static final String IS_THROTTLED = "is_throttled"; static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis"; static final String THROTTLED_TIME = "throttle_time"; + static final String DOC_STATUS = "doc_status"; } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 140f891d845ec..40dde1a54b042 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -152,6 +152,7 @@ import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestStatus; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.AliasFilter; @@ -285,6 +286,36 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + private static final List INDEXING_DOC_STATUS_DEFAULT_KEYS = List.of( + "200", "201", "202", + "400", "401", "403", "404", "429", + "500", "502", "504" + ); + + private static String validateDocStatusKey(String key) { + int result; + + try { + result = Integer.parseInt(key); + } catch (Exception e) { + throw new IllegalArgumentException("Illegal value for rest status code: " + key); + } + + if (RestStatus.isValidRestCode(result)) { + return key; + } else { + throw new IllegalArgumentException("Illegal value for rest status code: " + key); + } + } + + public static final Setting> INDEXING_DOC_STATUS_KEYS_SETTING = Setting.listSetting( + "cluster.doc_status_keys", + INDEXING_DOC_STATUS_DEFAULT_KEYS, + IndicesService::validateDocStatusKey, + Property.Consistent, + Property.NodeScope + ); + /** * The node's settings. */ @@ -1015,6 +1046,18 @@ public IndicesQueryCache getIndicesQueryCache() { return indicesQueryCache; } + public void incrementDocStatusCounter(final RestStatus status) { + int code = status.getStatus(); + + if (INDEXING_DOC_STATUS_KEYS_SETTING.get(clusterService.getSettings()).contains(String.valueOf(code))) { + oldShardsStats + .indexingStats + .getTotal() + .getDocStatusStats() + .inc(code); + } + } + /** * Statistics for old shards * diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java index 0846a5f8dec5c..79237216ed16a 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIndicesThatCannotBeCreatedTests.java @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase( Settings.EMPTY, new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ) { @Override diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index 6a514b47e55a4..75928a179ad72 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -170,6 +170,7 @@ class TestTransportBulkAction extends TransportBulkAction { SETTINGS, new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) ), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index d53b860e6524a..580bc46765809 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()) ); } diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java index 2361b69e9b82c..854b0478932bf 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTookTests.java @@ -278,6 +278,7 @@ static class TestTransportBulkAction extends TransportBulkAction { indexNameExpressionResolver, autoCreateIndex, new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()), relativeTimeProvider ); diff --git a/server/src/test/java/org/opensearch/rest/RestStatusTests.java b/server/src/test/java/org/opensearch/rest/RestStatusTests.java new file mode 100644 index 0000000000000..61c5cce0c08fe --- /dev/null +++ b/server/src/test/java/org/opensearch/rest/RestStatusTests.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest; + +import org.opensearch.test.OpenSearchTestCase; + +public class RestStatusTests extends OpenSearchTestCase { + + public void testValidRestCode() { + assertTrue(RestStatus.isValidRestCode(418)); + } + + public void testInvalidRestCode() { + assertFalse(RestStatus.isValidRestCode(1999)); + } + +} diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 88899a1b282af..ec78575e85309 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1977,6 +1977,7 @@ public void onFailure(final Exception e) { indexNameExpressionResolver, new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())), new IndexingPressureService(settings, clusterService), + null, new SystemIndices(emptyMap()) ) );