Skip to content

Commit

Permalink
Indexing: add Doc Status Counter
Browse files Browse the repository at this point in the history
Currently, Opensearch returns a 200 OK response code for a Bulk API
call, even though there can be partial/complete failures within the
request E2E. Tracking these failures requires client to parse the
response on their side and make sense of them. But, a general idea
around trend in growth of different rest status codes at item level
can provide insights on how indexing engine is performing.

Signed-off-by: Rohit Ashiwal <rashiwal@amazon.com>
  • Loading branch information
r1walz committed Aug 27, 2023
1 parent f5a6e6d commit 0b5a326
Show file tree
Hide file tree
Showing 15 changed files with 612 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,4 +559,13 @@ public static RestStatus status(int successfulShards, int totalShards, ShardOper
public static RestStatus fromCode(int code) {
return CODE_TO_STATUS.get(code);
}

/**
* Get category class of a rest status code.
*
* @return Integer representing class category of the concrete rest status code
*/
public int getStatusFamilyCode() {
return status / 100;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,35 @@
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - indexing doc_status":
- skip:
version: " - 2.99.99"
reason: "To be introduced in future release :: TODO: change if/when we backport to 2.x"
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.stats: { metric: indices, index_metric: indexing }

- is_false: nodes.$node_id.indices.docs
- is_false: nodes.$node_id.indices.store
- is_true: nodes.$node_id.indices.indexing
- is_true: nodes.$node_id.indices.indexing.doc_status
- is_false: nodes.$node_id.indices.get
- is_false: nodes.$node_id.indices.search
- is_false: nodes.$node_id.indices.merges
- is_false: nodes.$node_id.indices.refresh
- is_false: nodes.$node_id.indices.flush
- is_false: nodes.$node_id.indices.warmer
- is_false: nodes.$node_id.indices.query_cache
- is_false: nodes.$node_id.indices.fielddata
- is_false: nodes.$node_id.indices.completion
- is_false: nodes.$node_id.indices.segments
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - recovery":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.nodestats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.client.Requests;
import org.opensearch.index.shard.IndexingStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.hamcrest.MatcherAssert;

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;

@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class NodeStatsIT extends OpenSearchIntegTestCase {

private static final String INDEX = "test_index";
private static final AtomicLong[] expectedDocStatusCounter = Stream.generate(AtomicLong::new).limit(5).toArray(AtomicLong[]::new);

public void testNodeIndicesStatsDocStatusStats() {
int sizeOfIndexRequests = scaledRandomIntBetween(10, 20);
int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);
int sizeOfNotFoundRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);

BulkRequest bulkRequest = new BulkRequest();

for (int i = 0; i < sizeOfIndexRequests; ++i) {
bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(Requests.INDEX_CONTENT_TYPE, "field", "value"));
}

BulkResponse response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
expectedDocStatusCounter[itemResponse.getResponse().status().getStatusFamilyCode() - 1].incrementAndGet();
}

refresh(INDEX);
bulkRequest.requests().clear();

for (int i = 0; i < sizeOfDeleteRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i)));
}
for (int i = 0; i < sizeOfNotFoundRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i)));
}

response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFoundRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
expectedDocStatusCounter[itemResponse.getResponse().status().getStatusFamilyCode() - 1].incrementAndGet();
}

refresh(INDEX);

NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
IndexingStats.Stats stats = nodesStatsResponse.getNodes().get(0).getIndices().getIndexing().getTotal();

MatcherAssert.assertThat(stats.getIndexCount(), greaterThan(0L));
MatcherAssert.assertThat(stats.getIndexTime().duration(), greaterThan(0L));
MatcherAssert.assertThat(stats.getIndexCurrent(), notNullValue());
MatcherAssert.assertThat(stats.getIndexFailedCount(), notNullValue());
MatcherAssert.assertThat(stats.getDeleteCount(), greaterThan(0L));
MatcherAssert.assertThat(stats.getDeleteTime().duration(), greaterThan(0L));
MatcherAssert.assertThat(stats.getDeleteCurrent(), notNullValue());
MatcherAssert.assertThat(stats.getNoopUpdateCount(), notNullValue());
MatcherAssert.assertThat(stats.isThrottled(), notNullValue());
MatcherAssert.assertThat(stats.getThrottleTime(), notNullValue());

AtomicLong[] docStatusCounter = stats.getDocStatusStats().getDocStatusCounter();

MatcherAssert.assertThat(docStatusCounter.length, equalTo(expectedDocStatusCounter.length));

for (int i = 0; i < docStatusCounter.length; ++i) {
MatcherAssert.assertThat(docStatusCounter[i].longValue(), equalTo(expectedDocStatusCounter[i].longValue()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.opensearch.index.VersionType;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.node.NodeClosedException;
Expand All @@ -100,6 +101,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -129,6 +131,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
private final IndexNameExpressionResolver indexNameExpressionResolver;
private static final String DROPPED_ITEM_WITH_AUTO_GENERATED_ID = "auto-generated";
private final IndexingPressureService indexingPressureService;
private final IndicesService indicesService;
private final SystemIndices systemIndices;

@Inject
Expand All @@ -143,6 +146,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices
) {
this(
Expand All @@ -156,6 +160,7 @@ public TransportBulkAction(
indexNameExpressionResolver,
autoCreateIndex,
indexingPressureService,
indicesService,
systemIndices,
System::nanoTime
);
Expand All @@ -172,6 +177,7 @@ public TransportBulkAction(
IndexNameExpressionResolver indexNameExpressionResolver,
AutoCreateIndex autoCreateIndex,
IndexingPressureService indexingPressureService,
IndicesService indicesService,
SystemIndices systemIndices,
LongSupplier relativeTimeProvider
) {
Expand All @@ -187,6 +193,7 @@ public TransportBulkAction(
this.client = client;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.indexingPressureService = indexingPressureService;
this.indicesService = indicesService;
this.systemIndices = systemIndices;
clusterService.addStateApplier(this.ingestForwarder);
}
Expand Down Expand Up @@ -218,6 +225,7 @@ protected void doExecute(Task task, BulkRequest bulkRequest, ActionListener<Bulk
try {
doInternalExecute(task, bulkRequest, executorName, releasingListener);
} catch (Exception e) {
indicesService.incDocStatusCounter(e);
releasingListener.onFailure(e);
}
}
Expand Down Expand Up @@ -265,6 +273,7 @@ protected void doInternalExecute(Task task, BulkRequest bulkRequest, String exec
ingestForwarder.forwardIngestRequest(BulkAction.INSTANCE, bulkRequest, listener);
}
} catch (Exception e) {
indicesService.incDocStatusCounter(e);
listener.onFailure(e);
}
return;
Expand Down Expand Up @@ -319,6 +328,12 @@ public void onResponse(CreateIndexResponse result) {
protected void doRun() {
executeBulk(task, bulkRequest, startTime, listener, responses, indicesThatCannotBeCreated);
}

@Override
public void onFailure(Exception e) {
indicesService.incDocStatusCounter(e);
super.onFailure(e);
}
});
}
}
Expand All @@ -345,6 +360,12 @@ protected void doRun() {
executeBulk(task, bulkRequest, startTime, wrappedListener, responses, indicesThatCannotBeCreated);
}

@Override
public void onFailure(Exception e) {
indicesService.incDocStatusCounter(e);
super.onFailure(e);
}

@Override
public void onRejection(Exception rejectedException) {
rejectedException.addSuppressed(e);
Expand Down Expand Up @@ -603,6 +624,12 @@ protected void doRun() {

final AtomicInteger counter = new AtomicInteger(requestsByShard.size());
String nodeId = clusterService.localNode().getId();

final AtomicLong[] docStatusCounter = new AtomicLong[5];
for (int i = 0; i < docStatusCounter.length; ++i) {
docStatusCounter[i] = new AtomicLong(0);
}

for (Map.Entry<ShardId, List<BulkItemRequest>> entry : requestsByShard.entrySet()) {
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
Expand Down Expand Up @@ -632,8 +659,11 @@ public void onResponse(BulkShardResponse bulkShardResponse) {
if (bulkItemResponse.getResponse() != null) {
bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo());
}

docStatusCounter[bulkItemResponse.status().getStatusFamilyCode() - 1].incrementAndGet();
responses.set(bulkItemResponse.getItemId(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
Expand All @@ -644,25 +674,28 @@ public void onFailure(Exception e) {
// create failures for all relevant requests
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(
final DocWriteRequest<?> docWriteRequest = request.request();
final BulkItemResponse bulkItemResponse = new BulkItemResponse(
request.id(),
new BulkItemResponse(
request.id(),
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
)
docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.id(), e)
);

docStatusCounter[bulkItemResponse.status().getStatusFamilyCode() - 1].incrementAndGet();
responses.set(request.id(), bulkItemResponse);
}

if (counter.decrementAndGet() == 0) {
finishHim();
}
}

private void finishHim() {
listener.onResponse(
new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos))
);
BulkItemResponse[] response = responses.toArray(new BulkItemResponse[responses.length()]);
long tookMillis = buildTookInMillis(startTimeNanos);

indicesService.addDocStatusCounter(docStatusCounter);
listener.onResponse(new BulkResponse(response, tookMillis));
}
}, releasable::close));
}
Expand Down
Loading

0 comments on commit 0b5a326

Please sign in to comment.