Skip to content

Commit

Permalink
introduce rest accounting for indexing actions
Browse files Browse the repository at this point in the history
  • Loading branch information
r1walz committed Jul 4, 2023
1 parent 6ee9f7b commit 4efa30c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SparseFixedBitSet;
import org.opensearch.LegacyESVersion;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.core.Assertions;
import org.opensearch.OpenSearchParseException;
import org.opensearch.ExceptionsHelper;
Expand Down Expand Up @@ -78,8 +79,10 @@
import org.opensearch.index.IndexingPressureService;
import org.opensearch.index.VersionType;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
Expand Down Expand Up @@ -130,6 +133,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;

@Inject
Expand All @@ -144,6 +148,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
) {
this(
Expand All @@ -157,6 +162,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
autoCreateIndex,
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
);
Expand All @@ -173,6 +179,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
Expand All @@ -188,6 +195,7 @@ public TransportBulkAction(
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressureService = indexingPressureService;
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -637,6 +645,8 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

indicesService.updateBulkItemRestStatus(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
Expand Down
81 changes: 79 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexingStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks indexing statistics
Expand All @@ -59,6 +61,69 @@ public class IndexingStats implements Writeable, ToXContentFragment {
*/
public static class Stats implements Writeable, ToXContentFragment {

/**
* Tracks item level rest status codes during indexing
*
* @opensearch.internal
*/
public static class BulkStats implements Writeable, ToXContentFragment {

private final Map<Integer, AtomicLong> stats;

public BulkStats() {
this.stats = new TreeMap<>();
}

public BulkStats(StreamInput in) throws IOException {
int size = in.readVInt();
stats = new TreeMap<>();

for (int i = 0; i < size; ++i) {
stats.put(in.readInt(), new AtomicLong(in.readLong()));
}
}

public void put(int status) {
synchronized (this) {
stats.computeIfAbsent(status, s -> new AtomicLong(0));
stats.get(status).incrementAndGet();
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("bulk");

for (Map.Entry<Integer, AtomicLong> entry : stats.entrySet()) {
builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue());
}

return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(stats.size());

for (Map.Entry<Integer, AtomicLong> entry : stats.entrySet()) {
out.writeInt(entry.getKey());
out.writeLong(entry.getValue().longValue());
}
}

public void add(BulkStats other) {
for (Map.Entry<Integer, AtomicLong> entry : other.stats.entrySet()) {
synchronized (this) {
int k = entry.getKey();
AtomicLong v = entry.getValue();

stats.putIfAbsent(k, new AtomicLong(0));
stats.get(k).addAndGet(v.longValue());
}
}
}
}

private long indexCount;
private long indexTimeInMillis;
private long indexCurrent;
Expand All @@ -70,7 +135,11 @@ public static class Stats implements Writeable, ToXContentFragment {
private long throttleTimeInMillis;
private boolean isThrottled;

Stats() {}
private final BulkStats bulkStats;

Stats() {
bulkStats = new BulkStats();
}

public Stats(StreamInput in) throws IOException {
indexCount = in.readVLong();
Expand All @@ -83,6 +152,7 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();
bulkStats = new BulkStats(in);
}

public Stats(
Expand All @@ -107,6 +177,7 @@ public Stats(
this.noopUpdateCount = noopUpdateCount;
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
this.bulkStats = new BulkStats();
}

public void add(Stats stats) {
Expand All @@ -124,6 +195,7 @@ public void add(Stats stats) {
if (isThrottled != stats.isThrottled) {
isThrottled = true; // When combining if one is throttled set result to throttled.
}
bulkStats.add(stats.getBulkStats());
}

/**
Expand Down Expand Up @@ -193,6 +265,10 @@ public long getNoopUpdateCount() {
return noopUpdateCount;
}

public BulkStats getBulkStats() {
return bulkStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -205,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(noopUpdateCount);
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);

bulkStats.writeTo(out);
}

@Override
Expand All @@ -223,6 +299,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

builder.field(Fields.IS_THROTTLED, isThrottled);
builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());
bulkStats.toXContent(builder, params);
return builder;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -1015,6 +1016,14 @@ public IndicesQueryCache getIndicesQueryCache() {
return indicesQueryCache;
}

public void updateBulkItemRestStatus(final RestStatus status) {
oldShardsStats
.indexingStats
.getTotal()
.getBulkStats()
.put(status.getStatus());
}

/**
* Statistics for old shards
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction {
new Resolver(),
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
new IndexingPressureService(Settings.EMPTY, clusterService),
null,
new SystemIndices(emptyMap())
);
}
Expand Down

0 comments on commit 4efa30c

Please sign in to comment.