Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify TransportShardBulkAction#performOnReplica #41065

Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,8 @@ private static boolean isConflictException(final Exception e) {
/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
BulkItemResponse operationResponse,
final UpdateHelper.Result translate) {
private static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
BulkItemResponse operationResponse, final UpdateHelper.Result translate) {
final BulkItemResponse response;
DocWriteResponse.Result translatedResult = translate.getResponseResult();
if (operationResponse.isFailed()) {
Expand Down Expand Up @@ -382,54 +381,6 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
return response;
}


/** Modes for executing item request on replica depending on corresponding primary execution result */
public enum ReplicaItemExecutionMode {

/**
* When primary execution succeeded
*/
NORMAL,

/**
* When primary execution failed before sequence no was generated
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
*/
NOOP,

/**
* When primary execution failed after sequence no was generated
*/
FAILURE
}

/**
* Determines whether a bulk item request should be executed on the replica.
*
* @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
* {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
* {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
*/
static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) {
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
if (primaryResponse.isFailed()) {
return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
: ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
} else {
// TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
// (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
// an operation should be processed or be treated as a noop. This means we could remove this method and the
// ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
// both failures and indexing operations.
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
? ReplicaItemExecutionMode.NORMAL // execution successful on primary
: ReplicaItemExecutionMode.NOOP; // ignore replication
}
}

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final Translog.Location location = performOnReplica(request, replica);
Expand All @@ -442,25 +393,20 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
DocWriteRequest<?> docWriteRequest = item.request();
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
case NOOP:
break;
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
final BulkItemResponse response = item.getPrimaryResponse();
final BulkItemResponse.Failure failure = response.getFailure();
final DocWriteResponse writeResponse = response.getResponse();
final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo();
if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) {
continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we assert failure is not null here?

Copy link
Member Author

@original-brownbear original-brownbear Apr 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to add a little more detail here: the assertion trips on responses like:

{"update":{"_index":"test_index","_type":"test_type","_id":"test_id_1","_version":2,"result":"noop","forced_refresh":true,"_shards":{"total":0,"successful":0,"failed":0},"get":{"_seq_no":3,"_primary_term":1,"found":true,"_source":{"bar":"foo"}},"status":200}}

in the BwC tests.

}
if (failure == null) {
operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica);
} else {
operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage());
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -59,7 +58,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down Expand Up @@ -96,47 +94,6 @@ private IndexMetaData indexMetaData() throws IOException {
.primaryTerm(0, 1).build();
}

public void testShouldExecuteReplicaItem() throws Exception {
// Successful index request should be replicated
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NORMAL));

// Failed index requests without sequence no should not be replicated
writeRequest = new IndexRequest("index", "_doc", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException("i died"))));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NOOP));

// Failed index requests with sequence no should be replicated
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException(
"i died after sequence no was generated"),
1)));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.FAILURE));
// NOOP requests should not be replicated
DocWriteRequest<UpdateRequest> updateRequest = new UpdateRequest("index", "type", "id");
response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
request = new BulkItemRequest(0, updateRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
response));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NOOP));
}

public void testExecuteBulkIndexRequest() throws Exception {
IndexShard shard = newStartedShard(true);

Expand Down