diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index 19f610908677f..76f9733b0c9b7 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -80,6 +80,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.IndexClosedException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.SystemIndices; import org.opensearch.ingest.IngestService; import org.opensearch.node.NodeClosedException; @@ -130,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction docWriteRequest = request.request(); - responses.set( + final DocWriteRequest docWriteRequest = request.request(); + final BulkItemResponse bulkItemResponse = new BulkItemResponse( request.id(), - new BulkItemResponse( - request.id(), - docWriteRequest.opType(), - new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) - ) + docWriteRequest.opType(), + new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e) ); + + indicesService.incrementDocStatusCounter(bulkItemResponse.status()); + responses.set(request.id(), bulkItemResponse); } if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java index d40878674ac13..bc9b70b43a15b 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexingStats.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexingStats.java @@ -44,6 +44,8 @@ import java.io.IOException; import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; /** * Tracks indexing statistics @@ -59,6 +61,69 @@ public class IndexingStats implements Writeable, ToXContentFragment { */ public static class Stats implements Writeable, ToXContentFragment { + /** + * Tracks item level rest status codes during indexing + * + * @opensearch.internal + */ + public static class DocStatusStats implements Writeable, ToXContentFragment { + + private final Map docStats; + + public DocStatusStats() { + this.docStats = new TreeMap<>(); + } + + public DocStatusStats(StreamInput in) throws IOException { + int size = in.readInt(); + docStats = new TreeMap<>(); + + for (int i = 0; i < size; ++i) { + docStats.put(in.readInt(), new AtomicLong(in.readLong())); + } + } + + public void add(DocStatusStats stats) { + for (Map.Entry entry : stats.docStats.entrySet()) { + synchronized (this) { + int k = entry.getKey(); + AtomicLong v = entry.getValue(); + + docStats.putIfAbsent(k, new AtomicLong(0)); + docStats.get(k).addAndGet(v.longValue()); + } + } + } + + public void inc(int status) { + synchronized (this) { + docStats.computeIfAbsent(status, s -> new AtomicLong(0)); + docStats.get(status).incrementAndGet(); + } + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("doc_status"); + + for (Map.Entry entry : docStats.entrySet()) { + builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue()); + } + + return builder.endObject(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeInt(docStats.size()); + + for (Map.Entry entry : docStats.entrySet()) { + out.writeInt(entry.getKey()); + out.writeLong(entry.getValue().longValue()); + } + } + } + private long indexCount; private long indexTimeInMillis; private long indexCurrent; @@ -70,7 +135,11 @@ public static class Stats implements Writeable, ToXContentFragment { private long throttleTimeInMillis; private boolean isThrottled; - Stats() {} + private final DocStatusStats docStatusStats; + + Stats() { + docStatusStats = new DocStatusStats(); + } public Stats(StreamInput in) throws IOException { indexCount = in.readVLong(); @@ -83,6 +152,7 @@ public Stats(StreamInput in) throws IOException { noopUpdateCount = in.readVLong(); isThrottled = in.readBoolean(); throttleTimeInMillis = in.readLong(); + docStatusStats = new DocStatusStats(in); } public Stats( @@ -107,6 +177,7 @@ public Stats( this.noopUpdateCount = noopUpdateCount; this.isThrottled = isThrottled; this.throttleTimeInMillis = throttleTimeInMillis; + this.docStatusStats = new DocStatusStats(); } public void add(Stats stats) { @@ -124,6 +195,7 @@ public void add(Stats stats) { if (isThrottled != stats.isThrottled) { isThrottled = true; // When combining if one is throttled set result to throttled. } + docStatusStats.add(stats.getBulkStats()); } /** @@ -193,6 +265,10 @@ public long getNoopUpdateCount() { return noopUpdateCount; } + public DocStatusStats getBulkStats() { + return docStatusStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); @@ -205,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(noopUpdateCount); out.writeBoolean(isThrottled); out.writeLong(throttleTimeInMillis); - + docStatusStats.writeTo(out); } @Override @@ -223,6 +299,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.IS_THROTTLED, isThrottled); builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime()); + docStatusStats.toXContent(builder, params); return builder; } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 81b1be0f58804..ffd2b03b4ef96 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -152,6 +152,7 @@ import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.plugins.PluginsService; import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestStatus; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.search.internal.AliasFilter; @@ -1015,6 +1016,14 @@ public IndicesQueryCache getIndicesQueryCache() { return indicesQueryCache; } + public void incrementDocStatusCounter(final RestStatus status) { + oldShardsStats + .indexingStats + .getTotal() + .getBulkStats() + .inc(status.getStatus()); + } + /** * Statistics for old shards * diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java index d53b860e6524a..580bc46765809 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionTests.java @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction { new Resolver(), new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())), new IndexingPressureService(Settings.EMPTY, clusterService), + null, new SystemIndices(emptyMap()) ); }