Skip to content

Commit

Permalink
Simplify TransportShardBulkAction#performOnReplica (elastic#41065)
Browse files Browse the repository at this point in the history
* Simplify TransortShardBulkAction#performOnReplica
* Resolve TODO since 8.0 doesn't have to worry about pre 6.x nodes
* Remove test for removed method since the logic is now completely internal to `performOnReplica`
  • Loading branch information
original-brownbear authored and Gurkan Kaymak committed May 27, 2019
1 parent 8fecd1c commit 605934f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,8 @@ private static boolean isConflictException(final Exception e) {
/**
* Creates a new bulk item result from the given requests and result of performing the update operation on the shard.
*/
static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
BulkItemResponse operationResponse,
final UpdateHelper.Result translate) {
private static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex,
BulkItemResponse operationResponse, final UpdateHelper.Result translate) {
final BulkItemResponse response;
DocWriteResponse.Result translatedResult = translate.getResponseResult();
if (operationResponse.isFailed()) {
Expand Down Expand Up @@ -382,54 +381,6 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
return response;
}


/** Modes for executing item request on replica depending on corresponding primary execution result */
public enum ReplicaItemExecutionMode {

/**
* When primary execution succeeded
*/
NORMAL,

/**
* When primary execution failed before sequence no was generated
* or primary execution was a noop (only possible when request is originating from pre-6.0 nodes)
*/
NOOP,

/**
* When primary execution failed after sequence no was generated
*/
FAILURE
}

/**
* Determines whether a bulk item request should be executed on the replica.
*
* @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures
* {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation
* {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or
* when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes)
*/
static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) {
final BulkItemResponse primaryResponse = request.getPrimaryResponse();
assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]";
if (primaryResponse.isFailed()) {
return primaryResponse.getFailure().getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op
: ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication
} else {
// TODO: once we know for sure that every operation that has been processed on the primary is assigned a seq#
// (i.e., all nodes on the cluster are on v6.0.0 or higher) we can use the existence of a seq# to indicate whether
// an operation should be processed or be treated as a noop. This means we could remove this method and the
// ReplicaItemExecutionMode enum and have a simple boolean check for seq != UNASSIGNED_SEQ_NO which will work for
// both failures and indexing operations.
return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP
? ReplicaItemExecutionMode.NORMAL // execution successful on primary
: ReplicaItemExecutionMode.NOOP; // ignore replication
}
}

@Override
public WriteReplicaResult<BulkShardRequest> shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception {
final Translog.Location location = performOnReplica(request, replica);
Expand All @@ -442,25 +393,22 @@ public static Translog.Location performOnReplica(BulkShardRequest request, Index
BulkItemRequest item = request.items()[i];
final Engine.Result operationResult;
DocWriteRequest<?> docWriteRequest = item.request();
switch (replicaItemExecutionMode(item, i)) {
case NORMAL:
final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse();
operationResult = performOpOnReplica(primaryResponse, docWriteRequest, replica);
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
case NOOP:
break;
case FAILURE:
final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure();
assert failure.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "seq no must be assigned";
operationResult = replica.markSeqNoAsNoop(failure.getSeqNo(), failure.getMessage());
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
break;
default:
throw new IllegalStateException("illegal replica item execution mode for: " + docWriteRequest);
final BulkItemResponse response = item.getPrimaryResponse();
final BulkItemResponse.Failure failure = response.getFailure();
final DocWriteResponse writeResponse = response.getResponse();
final long seqNum = failure == null ? writeResponse.getSeqNo() : failure.getSeqNo();
if (seqNum == SequenceNumbers.UNASSIGNED_SEQ_NO) {
assert failure != null || writeResponse.getResult() == DocWriteResponse.Result.NOOP
|| writeResponse.getResult() == DocWriteResponse.Result.NOT_FOUND;
continue;
}
if (failure == null) {
operationResult = performOpOnReplica(writeResponse, docWriteRequest, replica);
} else {
operationResult = replica.markSeqNoAsNoop(seqNum, failure.getMessage());
}
assert operationResult != null : "operation result must never be null when primary response has no failure";
location = syncOperationResultOrThrow(operationResult, location);
}
return location;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand Down Expand Up @@ -59,7 +58,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.notNullValue;
Expand Down Expand Up @@ -96,47 +94,6 @@ private IndexMetaData indexMetaData() throws IOException {
.primaryTerm(0, 1).build();
}

public void testShouldExecuteReplicaItem() throws Exception {
// Successful index request should be replicated
DocWriteRequest<IndexRequest> writeRequest = new IndexRequest("index", "_doc", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 17, 1, randomBoolean());
BulkItemRequest request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NORMAL));

// Failed index requests without sequence no should not be replicated
writeRequest = new IndexRequest("index", "_doc", "id")
.source(Requests.INDEX_CONTENT_TYPE, "foo", "bar");
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException("i died"))));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NOOP));

// Failed index requests with sequence no should be replicated
request = new BulkItemRequest(0, writeRequest);
request.setPrimaryResponse(
new BulkItemResponse(0, DocWriteRequest.OpType.INDEX,
new BulkItemResponse.Failure("index", "type", "id",
new IllegalArgumentException(
"i died after sequence no was generated"),
1)));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.FAILURE));
// NOOP requests should not be replicated
DocWriteRequest<UpdateRequest> updateRequest = new UpdateRequest("index", "type", "id");
response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP);
request = new BulkItemRequest(0, updateRequest);
request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE,
response));
assertThat(replicaItemExecutionMode(request, 0),
equalTo(ReplicaItemExecutionMode.NOOP));
}

public void testExecuteBulkIndexRequest() throws Exception {
IndexShard shard = newStartedShard(true);

Expand Down

0 comments on commit 605934f

Please sign in to comment.