Skip to content

Commit

Permalink
Indexing: add Doc Status Counter
Browse files Browse the repository at this point in the history
Currently, Opensearch returns a 200 OK response code for a Bulk API
call, even though there can be partial/complete failures within the
request E2E. Tracking these failures requires client to parse the
response on their side and make sense of them. But, a general idea
around trend in growth of different rest status codes at item level
can provide insights on how indexing engine is performing.

Signed-off-by: Rohit Ashiwal <rashiwal@amazon.com>
  • Loading branch information
r1walz committed Sep 27, 2023
1 parent 8807d7a commit 2d98dde
Show file tree
Hide file tree
Showing 16 changed files with 823 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Telemetry-Otel] Added support for OtlpGrpcSpanExporter exporter ([#9666](https://github.com/opensearch-project/OpenSearch/pull/9666))
- Async blob read support for encrypted containers ([#10131](https://github.com/opensearch-project/OpenSearch/pull/10131))
- Add capability to restrict async durability mode for remote indexes ([#10189](https://github.com/opensearch-project/OpenSearch/pull/10189))
- Add Doc Status Counter for Indexing Engine ([#4562](https://github.com/opensearch-project/OpenSearch/issues/4562))

### Dependencies
- Bump `peter-evans/create-or-update-comment` from 2 to 3 ([#9575](https://github.com/opensearch-project/OpenSearch/pull/9575))
Expand Down
10 changes: 10 additions & 0 deletions libs/core/src/main/java/org/opensearch/core/rest/RestStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ public enum RestStatus {
INSUFFICIENT_STORAGE(507);

private static final Map<Integer, RestStatus> CODE_TO_STATUS;

static {
RestStatus[] values = values();
Map<Integer, RestStatus> codeToStatus = new HashMap<>(values.length);
Expand All @@ -527,6 +528,15 @@ public int getStatus() {
return status;
}

/**
* Get category class of a rest status code.
*
* @return Integer representing class category of the concrete rest status code
*/
public int getStatusFamilyCode() {
return status / 100;
}

public static RestStatus readFrom(StreamInput in) throws IOException {
return RestStatus.valueOf(in.readString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,35 @@
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - indexing doc_status":
- skip:
version: " - 2.99.99"
reason: "To be introduced in future release :: TODO: change if/when we backport to 2.x"
- do:
nodes.info: {}
- set:
nodes._arbitrary_key_: node_id

- do:
nodes.stats: { metric: indices, index_metric: indexing }

- is_false: nodes.$node_id.indices.docs
- is_false: nodes.$node_id.indices.store
- is_true: nodes.$node_id.indices.indexing
- is_true: nodes.$node_id.indices.indexing.doc_status
- is_false: nodes.$node_id.indices.get
- is_false: nodes.$node_id.indices.search
- is_false: nodes.$node_id.indices.merges
- is_false: nodes.$node_id.indices.refresh
- is_false: nodes.$node_id.indices.flush
- is_false: nodes.$node_id.indices.warmer
- is_false: nodes.$node_id.indices.query_cache
- is_false: nodes.$node_id.indices.fielddata
- is_false: nodes.$node_id.indices.completion
- is_false: nodes.$node_id.indices.segments
- is_false: nodes.$node_id.indices.translog
- is_false: nodes.$node_id.indices.recovery

---
"Metric - recovery":
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.nodestats;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.index.shard.IndexingStats.Stats.DocStatusStats;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase.ClusterScope;
import org.opensearch.test.OpenSearchIntegTestCase.Scope;
import org.hamcrest.MatcherAssert;

import java.util.Map;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@ClusterScope(scope = Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class NodeStatsIT extends OpenSearchIntegTestCase {

private static final DocStatusStats expectedDocStatusStats = new DocStatusStats();
private static final String FIELD = "dummy_field";
private static final String VALUE = "dummy_value";
private static final Map<String, Object> SOURCE = singletonMap(FIELD, VALUE);

public void testNodeIndicesStatsDocStatusStatsBulk() {
final String INDEX = "bulk_index";

int sizeOfIndexRequests = scaledRandomIntBetween(10, 20);
int sizeOfDeleteRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);
int sizeOfNotFoundRequests = scaledRandomIntBetween(5, sizeOfIndexRequests);

BulkRequest bulkRequest = new BulkRequest();

for (int i = 0; i < sizeOfIndexRequests; ++i) {
bulkRequest.add(new IndexRequest(INDEX).id(String.valueOf(i)).source(SOURCE));
}

BulkResponse response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfIndexRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
updateExpectedDocStatusCounter(itemResponse.getResponse());
}

refresh(INDEX);
bulkRequest.requests().clear();

for (int i = 0; i < sizeOfDeleteRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(i)));
}
for (int i = 0; i < sizeOfNotFoundRequests; ++i) {
bulkRequest.add(new DeleteRequest(INDEX, String.valueOf(25 + i)));
}

response = client().bulk(bulkRequest).actionGet();

MatcherAssert.assertThat(response.hasFailures(), equalTo(false));
MatcherAssert.assertThat(response.getItems().length, equalTo(sizeOfDeleteRequests + sizeOfNotFoundRequests));

for (BulkItemResponse itemResponse : response.getItems()) {
updateExpectedDocStatusCounter(itemResponse.getResponse());
}

refresh(INDEX);
assertDocStatusStats();
}

public void testNodeIndicesStatsDocStatusStatsIndex() {
final String INDEX = "test_index";
final String ID = "id";
{
// Testing Normal Index
IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet();
updateExpectedDocStatusCounter(response);

MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertDocStatusStats();
}
{
// Testing Missing Alias
updateExpectedDocStatusCounter(
expectThrows(
IndexNotFoundException.class,
() -> client().index(new IndexRequest(INDEX).id("missing_alias").setRequireAlias(true).source(SOURCE)).actionGet()
)
);
assertDocStatusStats();
}
{
// Test Missing Pipeline: Ingestion failure, not Indexing failure
expectThrows(
IllegalArgumentException.class,
() -> client().index(new IndexRequest(INDEX).id("missing_pipeline").setPipeline("missing").source(SOURCE)).actionGet()
);
assertDocStatusStats();
}
{
// Testing Version Conflict
final String docId = "version_conflict";

updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet());
updateExpectedDocStatusCounter(
expectThrows(
VersionConflictEngineException.class,
() -> client().index(new IndexRequest(INDEX).id(docId).source(SOURCE).setIfSeqNo(1L).setIfPrimaryTerm(99L)).actionGet()
)
);
assertDocStatusStats();
}
}

public void testNodeIndicesStatsDocStatusStatsCreate() {
final String INDEX = "create_index";
final String ID = "id";
{
// Testing Creation
IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE).create(true)).actionGet();
updateExpectedDocStatusCounter(response);

MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.CREATED));
assertDocStatusStats();
}
{
// Testing Version Conflict
final String docId = "version_conflict";

updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet());
updateExpectedDocStatusCounter(
expectThrows(
VersionConflictEngineException.class,
() -> client().index(new IndexRequest(INDEX).id(docId).source(SOURCE).create(true)).actionGet()
)
);
assertDocStatusStats();
}
}

public void testNodeIndicesStatsDocStatusStatsDelete() {
final String INDEX = "delete_index";
final String ID = "id";
{
// Testing Deletion
IndexResponse response = client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet();
updateExpectedDocStatusCounter(response);

DeleteResponse deleteResponse = client().delete(new DeleteRequest(INDEX, ID)).actionGet();
updateExpectedDocStatusCounter(deleteResponse);

MatcherAssert.assertThat(response.getSeqNo(), greaterThanOrEqualTo(0L));
MatcherAssert.assertThat(deleteResponse.getResult(), equalTo(DocWriteResponse.Result.DELETED));
assertDocStatusStats();
}
{
// Testing Non-Existing Doc
updateExpectedDocStatusCounter(client().delete(new DeleteRequest(INDEX, "does_not_exist")).actionGet());
assertDocStatusStats();
}
{
// Testing Version Conflict
final String docId = "version_conflict";

updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet());
updateExpectedDocStatusCounter(
expectThrows(
VersionConflictEngineException.class,
() -> client().delete(new DeleteRequest(INDEX, docId).setIfSeqNo(2L).setIfPrimaryTerm(99L)).actionGet()
)
);

assertDocStatusStats();
}
}

public void testNodeIndicesStatsDocStatusStatsUpdate() {
final String INDEX = "update_index";
final String ID = "id";
{
// Testing Not Found
updateExpectedDocStatusCounter(
expectThrows(DocumentMissingException.class, () -> client().update(new UpdateRequest(INDEX, ID).doc(SOURCE)).actionGet())
);
assertDocStatusStats();
}
{
// Testing NoOp Update
updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(ID).source(SOURCE)).actionGet());

UpdateResponse response = client().update(new UpdateRequest(INDEX, ID).doc(SOURCE)).actionGet();
updateExpectedDocStatusCounter(response);

MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.NOOP));
assertDocStatusStats();
}
{
// Testing Update
final String UPDATED_VALUE = "updated_value";
UpdateResponse response = client().update(new UpdateRequest(INDEX, ID).doc(singletonMap(FIELD, UPDATED_VALUE))).actionGet();
updateExpectedDocStatusCounter(response);

MatcherAssert.assertThat(response.getResult(), equalTo(DocWriteResponse.Result.UPDATED));
assertDocStatusStats();
}
{
// Testing Missing Alias
updateExpectedDocStatusCounter(
expectThrows(
IndexNotFoundException.class,
() -> client().update(new UpdateRequest(INDEX, ID).setRequireAlias(true).doc(new IndexRequest().source(SOURCE)))
.actionGet()
)
);
assertDocStatusStats();
}
{
// Testing Version Conflict
final String docId = "version_conflict";

updateExpectedDocStatusCounter(client().index(new IndexRequest(INDEX).id(docId).source(SOURCE)).actionGet());
updateExpectedDocStatusCounter(
expectThrows(
VersionConflictEngineException.class,
() -> client().update(new UpdateRequest(INDEX, docId).doc(SOURCE).setIfSeqNo(2L).setIfPrimaryTerm(99L)).actionGet()
)
);
assertDocStatusStats();
}
}

private void assertDocStatusStats() {
DocStatusStats docStatusStats = client().admin()
.cluster()
.prepareNodesStats()
.execute()
.actionGet()
.getNodes()
.get(0)
.getIndices()
.getIndexing()
.getTotal()
.getDocStatusStats();

MatcherAssert.assertThat(docStatusStats, equalTo(expectedDocStatusStats));
}

private void updateExpectedDocStatusCounter(DocWriteResponse r) {
expectedDocStatusStats.inc(r.status());
}

private void updateExpectedDocStatusCounter(Exception e) {
expectedDocStatusStats.inc(ExceptionsHelper.status(e));
}

}
Loading

0 comments on commit 2d98dde

Please sign in to comment.