Skip to content

Commit

Permalink
introduce rest accounting for indexing actions
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Ashiwal <rashiwal@amazon.com>
  • Loading branch information
r1walz committed Jul 12, 2023
1 parent db90a41 commit 58b9254
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;

@Inject
Expand All @@ -144,6 +146,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
) {
this(
Expand All @@ -157,6 +160,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
autoCreateIndex,
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
);
Expand All @@ -173,6 +177,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
Expand All @@ -188,6 +193,7 @@ public TransportBulkAction(
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressureService = indexingPressureService;
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -637,6 +643,8 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -649,15 +657,15 @@ public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
)
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
Expand Down
81 changes: 79 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks indexing statistics
Expand All @@ -59,6 +61,69 @@ public class IndexingStats implements Writeable, ToXContentFragment {
*/
public static class Stats implements Writeable, ToXContentFragment {

/**
* Tracks item level rest status codes during indexing
*
* @opensearch.internal
*/
public static class DocStatusStats implements Writeable, ToXContentFragment {

private final Map<Integer, AtomicLong> docStatusCounter;

public DocStatusStats() {
this.docStatusCounter = new TreeMap<>();
}

public DocStatusStats(StreamInput in) throws IOException {
int size = in.readInt();
docStatusCounter = new TreeMap<>();

for (int i = 0; i < size; ++i) {
docStatusCounter.put(in.readInt(), new AtomicLong(in.readLong()));
}
}

public void add(DocStatusStats stats) {
for (Map.Entry<Integer, AtomicLong> entry : stats.docStatusCounter.entrySet()) {
synchronized (this) {
int k = entry.getKey();
AtomicLong v = entry.getValue();

docStatusCounter.putIfAbsent(k, new AtomicLong(0));
docStatusCounter.get(k).addAndGet(v.longValue());
}
}
}

public void inc(int status) {
synchronized (this) {
docStatusCounter.computeIfAbsent(status, s -> new AtomicLong(0));
docStatusCounter.get(status).incrementAndGet();
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("doc_status");

for (Map.Entry<Integer, AtomicLong> entry : docStatusCounter.entrySet()) {
builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue());
}

return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docStatusCounter.size());

for (Map.Entry<Integer, AtomicLong> entry : docStatusCounter.entrySet()) {
out.writeInt(entry.getKey());
out.writeLong(entry.getValue().longValue());
}
}
}

private long indexCount;
private long indexTimeInMillis;
private long indexCurrent;
Expand All @@ -70,7 +135,11 @@ public static class Stats implements Writeable, ToXContentFragment {
private long throttleTimeInMillis;
private boolean isThrottled;

Stats() {}
private final DocStatusStats docStatusStats;

Stats() {
docStatusStats = new DocStatusStats();
}

public Stats(StreamInput in) throws IOException {
indexCount = in.readVLong();
Expand All @@ -83,6 +152,7 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();
docStatusStats = new DocStatusStats(in);
}

public Stats(
Expand All @@ -107,6 +177,7 @@ public Stats(
this.noopUpdateCount = noopUpdateCount;
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
this.docStatusStats = new DocStatusStats();
}

public void add(Stats stats) {
Expand All @@ -124,6 +195,7 @@ public void add(Stats stats) {
if (isThrottled != stats.isThrottled) {
isThrottled = true; // When combining if one is throttled set result to throttled.
}
docStatusStats.add(stats.getBulkStats());
}

/**
Expand Down Expand Up @@ -193,6 +265,10 @@ public long getNoopUpdateCount() {
return noopUpdateCount;
}

public DocStatusStats getBulkStats() {
return docStatusStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -205,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(noopUpdateCount);
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);

docStatusStats.writeTo(out);
}

@Override
Expand All @@ -223,6 +299,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

builder.field(Fields.IS_THROTTLED, isThrottled);
builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());
docStatusStats.toXContent(builder, params);
return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -1015,6 +1016,14 @@ public IndicesQueryCache getIndicesQueryCache() {
return indicesQueryCache;
}

public void incrementDocStatusCounter(final RestStatus status) {
oldShardsStats
.indexingStats
.getTotal()
.getBulkStats()
.inc(status.getStatus());
}

/**
* Statistics for old shards
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase(
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TestTransportBulkAction extends TransportBulkAction {
SETTINGS,
new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction {
new Resolver(),
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
new IndexingPressureService(Settings.EMPTY, clusterService),
null,
new SystemIndices(emptyMap())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ static class TestTransportBulkAction extends TransportBulkAction {
indexNameExpressionResolver,
autoCreateIndex,
new IndexingPressureService(Settings.EMPTY, clusterService),
null,
new SystemIndices(emptyMap()),
relativeTimeProvider
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,7 @@ public void onFailure(final Exception e) {
indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver, new SystemIndices(emptyMap())),
new IndexingPressureService(settings, clusterService),
null,
new SystemIndices(emptyMap())
)
);
Expand Down

0 comments on commit 58b9254

Please sign in to comment.