Skip to content

Commit

Permalink
introduce rest accounting for indexing actions
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Ashiwal <rashiwal@amazon.com>
  • Loading branch information
r1walz committed Jul 14, 2023
1 parent ce5a172 commit ffe7cb9
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 8 deletions.
11 changes: 11 additions & 0 deletions libs/core/src/main/java/org/opensearch/rest/RestStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,13 @@ public enum RestStatus {
* next-hop server.
*/
EXPECTATION_FAILED(417),
/**
* Any attempt to brew coffee with a teapot should result in the error code "418 I'm a teapot". The resulting
* entity body MAY be short and stout.
* <p>
* @see <a href="https://www.rfc-editor.org/rfc/rfc2324#section-2.3.2">I'm a teapot!</a>
*/
I_AM_A_TEAPOT(418),
/**
* The request was directed at a server that is not able to produce a response. This can be sent by a server
* that is not configured to produce responses for the combination of scheme and authority that are included
Expand Down Expand Up @@ -559,4 +566,8 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper
public static RestStatus fromCode(int code) {
return CODE_TO_STATUS.get(code);
}

public static Boolean isValidRestCode(int code) {
return null != fromCode(code);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,34 @@
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - indexing doc_status":
- skip:
features: [arbitrary_key]
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.stats: { metric: indices, index_metric: indexing }

- is_false: nodes.$node_id.indices.docs
- is_false: nodes.$node_id.indices.store
- is_true: nodes.$node_id.indices.indexing
- is_true: nodes.$node_id.indices.indexing.doc_status
- is_false: nodes.$node_id.indices.get
- is_false: nodes.$node_id.indices.search
- is_false: nodes.$node_id.indices.merges
- is_false: nodes.$node_id.indices.refresh
- is_false: nodes.$node_id.indices.flush
- is_false: nodes.$node_id.indices.warmer
- is_false: nodes.$node_id.indices.query_cache
- is_false: nodes.$node_id.indices.fielddata
- is_false: nodes.$node_id.indices.completion
- is_false: nodes.$node_id.indices.segments
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - recovery":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.ShardId;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
Expand Down Expand Up @@ -130,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;

@Inject
Expand All @@ -144,6 +146,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
) {
this(
Expand All @@ -157,6 +160,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
autoCreateIndex,
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
);
Expand All @@ -173,6 +177,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
Expand All @@ -188,6 +193,7 @@ public TransportBulkAction(
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressureService = indexingPressureService;
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -637,6 +643,8 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
Expand All @@ -649,15 +657,15 @@ public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
)
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

indicesService.incrementDocStatusCounter(bulkItemResponse.status());
responses.set(request.id(), bulkItemResponse);
}
if (counter.decrementAndGet() == 0) {
finishHim();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
IndicesService.CLUSTER_REPLICATION_TYPE_SETTING,
IndicesService.INDEXING_DOC_STATUS_KEYS_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MappingUpdatedAction.INDICES_MAX_IN_FLIGHT_UPDATES_SETTING,
Metadata.SETTING_READ_ONLY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

/**
* Tracks indexing statistics
Expand All @@ -59,6 +61,69 @@ public class IndexingStats implements Writeable, ToXContentFragment {
*/
public static class Stats implements Writeable, ToXContentFragment {

/**
* Tracks item level rest status codes during indexing
*
* @opensearch.internal
*/
public static class DocStatusStats implements Writeable, ToXContentFragment {

private final Map<Integer, AtomicLong> docStatusCounter;

public DocStatusStats() {
this.docStatusCounter = new TreeMap<>();
}

public DocStatusStats(StreamInput in) throws IOException {
int size = in.readInt();
docStatusCounter = new TreeMap<>();

for (int i = 0; i < size; ++i) {
docStatusCounter.put(in.readInt(), new AtomicLong(in.readLong()));
}
}

public void add(DocStatusStats stats) {
for (Map.Entry<Integer, AtomicLong> entry : stats.docStatusCounter.entrySet()) {
synchronized (this) {
int k = entry.getKey();
AtomicLong v = entry.getValue();

docStatusCounter.putIfAbsent(k, new AtomicLong(0));
docStatusCounter.get(k).addAndGet(v.longValue());
}
}
}

public void inc(int status) {
synchronized (this) {
docStatusCounter.computeIfAbsent(status, s -> new AtomicLong(0));
docStatusCounter.get(status).incrementAndGet();
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.DOC_STATUS);

for (Map.Entry<Integer, AtomicLong> entry : docStatusCounter.entrySet()) {
builder.field(String.valueOf(entry.getKey()), entry.getValue().longValue());
}

return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeInt(docStatusCounter.size());

for (Map.Entry<Integer, AtomicLong> entry : docStatusCounter.entrySet()) {
out.writeInt(entry.getKey());
out.writeLong(entry.getValue().longValue());
}
}
}

private long indexCount;
private long indexTimeInMillis;
private long indexCurrent;
Expand All @@ -70,7 +135,11 @@ public static class Stats implements Writeable, ToXContentFragment {
private long throttleTimeInMillis;
private boolean isThrottled;

Stats() {}
private final DocStatusStats docStatusStats;

Stats() {
docStatusStats = new DocStatusStats();
}

public Stats(StreamInput in) throws IOException {
indexCount = in.readVLong();
Expand All @@ -83,6 +152,12 @@ public Stats(StreamInput in) throws IOException {
noopUpdateCount = in.readVLong();
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();

if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
docStatusStats = in.readOptionalWriteable(DocStatusStats::new);
} else {
docStatusStats = null;
}
}

public Stats(
Expand All @@ -107,6 +182,7 @@ public Stats(
this.noopUpdateCount = noopUpdateCount;
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
this.docStatusStats = new DocStatusStats();
}

public void add(Stats stats) {
Expand All @@ -124,6 +200,7 @@ public void add(Stats stats) {
if (isThrottled != stats.isThrottled) {
isThrottled = true; // When combining if one is throttled set result to throttled.
}
docStatusStats.add(stats.getDocStatusStats());
}

/**
Expand Down Expand Up @@ -193,6 +270,10 @@ public long getNoopUpdateCount() {
return noopUpdateCount;
}

public DocStatusStats getDocStatusStats() {
return docStatusStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(indexCount);
Expand All @@ -206,6 +287,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);

if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalWriteable(docStatusStats);
}
}

@Override
Expand All @@ -223,6 +307,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws

builder.field(Fields.IS_THROTTLED, isThrottled);
builder.humanReadableField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, getThrottleTime());

if (getDocStatusStats() != null) {
getDocStatusStats().toXContent(builder, params);
}

return builder;
}
}
Expand Down Expand Up @@ -294,6 +383,7 @@ static final class Fields {
static final String IS_THROTTLED = "is_throttled";
static final String THROTTLED_TIME_IN_MILLIS = "throttle_time_in_millis";
static final String THROTTLED_TIME = "throttle_time";
static final String DOC_STATUS = "doc_status";
}

@Override
Expand Down
43 changes: 43 additions & 0 deletions server/src/main/java/org/opensearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.opensearch.plugins.IndexStorePlugin;
import org.opensearch.plugins.PluginsService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestStatus;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -285,6 +286,36 @@ public class IndicesService extends AbstractLifecycleComponent
Property.Final
);

private static final List<String> INDEXING_DOC_STATUS_DEFAULT_KEYS = List.of(
"200", "201", "202",
"400", "401", "403", "404", "429",
"500", "502", "504"
);

private static String validateDocStatusKey(String key) {
int result;

try {
result = Integer.parseInt(key);
} catch (Exception e) {
throw new IllegalArgumentException("Illegal value for rest status code: " + key);
}

if (RestStatus.isValidRestCode(result)) {
return key;
} else {
throw new IllegalArgumentException("Illegal value for rest status code: " + key);
}
}

public static final Setting<List<String>> INDEXING_DOC_STATUS_KEYS_SETTING = Setting.listSetting(
"cluster.doc_status_keys",
INDEXING_DOC_STATUS_DEFAULT_KEYS,
IndicesService::validateDocStatusKey,
Property.Consistent,
Property.NodeScope
);

/**
* The node's settings.
*/
Expand Down Expand Up @@ -1015,6 +1046,18 @@ public IndicesQueryCache getIndicesQueryCache() {
return indicesQueryCache;
}

public void incrementDocStatusCounter(final RestStatus status) {
int code = status.getStatus();

if (INDEXING_DOC_STATUS_KEYS_SETTING.get(clusterService.getSettings()).contains(String.valueOf(code))) {
oldShardsStats
.indexingStats
.getTotal()
.getDocStatusStats()
.inc(code);
}
}

/**
* Statistics for old shards
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ private void indicesThatCannotBeCreatedTestCase(
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
) {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ class TestTransportBulkAction extends TransportBulkAction {
SETTINGS,
new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
),
null,
new SystemIndices(emptyMap())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class TestTransportBulkAction extends TransportBulkAction {
new Resolver(),
new AutoCreateIndex(Settings.EMPTY, clusterService.getClusterSettings(), new Resolver(), new SystemIndices(emptyMap())),
new IndexingPressureService(Settings.EMPTY, clusterService),
null,
new SystemIndices(emptyMap())
);
}
Expand Down
Loading

0 comments on commit ffe7cb9

Please sign in to comment.