Skip to content

Commit

Permalink
Replicate write failures (#23314)
Browse files Browse the repository at this point in the history
* Replicate write failures

Currently, when a primary write operation fails after generating
a sequence number, the failure is not communicated to the replicas.
Ideally, every operation which generates a sequence number on primary
should be recorded in all replicas.

In this change, a sequence number is associated with write operation
failure. When a failure with an assinged seqence number arrives at a
replica, the failure cause and sequence number is recorded in the translog
and the sequence number is marked as completed via executing `Engine.noOp`
on the replica engine.

* use zlong to serialize seq_no

* Incorporate feedback

* track write failures in translog as a noop in primary

* Add tests for replicating write failures.

Test that document failure (w/ seq no generated) are recorded
as no-op in the translog for primary and replica shards

* Update to master

* update shouldExecuteOnReplica comment

* rename indexshard noop to markSeqNoAsNoOp

* remove redundant conditional

* Consolidate possible replica action for bulk item request
depanding on it's primary execution

* remove bulk shard result abstraction

* fix failure handling logic for bwc

* add more tests

* minor fix

* cleanup

* incorporate feedback

* incorporate feedback

* add assert to remove handling noop primary response when 5.0 nodes are not supported
  • Loading branch information
areek committed Apr 19, 2017
1 parent 9e0ebc5 commit 4f773e2
Show file tree
Hide file tree
Showing 14 changed files with 521 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public class BulkItemRequest implements Streamable {

}

protected BulkItemRequest(int id, DocWriteRequest request) {
// NOTE: public for testing only
public BulkItemRequest(int id, DocWriteRequest request) {
this.id = id;
this.request = request;
}
Expand All @@ -56,13 +57,11 @@ public String index() {
return request.indices()[0];
}

// NOTE: protected for testing only
protected BulkItemResponse getPrimaryResponse() {
BulkItemResponse getPrimaryResponse() {
return primaryResponse;
}

// NOTE: protected for testing only
protected void setPrimaryResponse(BulkItemResponse primaryResponse) {
void setPrimaryResponse(BulkItemResponse primaryResponse) {
this.primaryResponse = primaryResponse;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;
Expand Down Expand Up @@ -171,17 +173,34 @@ public static class Failure implements Writeable, ToXContent {
private final String id;
private final Exception cause;
private final RestStatus status;
private final long seqNo;

Failure(String index, String type, String id, Exception cause, RestStatus status) {
/**
* For write failures before operation was assigned a sequence number.
*
* use @{link {@link #Failure(String, String, String, Exception, long)}}
* to record operation sequence no with failure
*/
public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

public Failure(String index, String type, String id, Exception cause, RestStatus status) {
this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO);
}

/** For write failures after operation was assigned a sequence number. */
public Failure(String index, String type, String id, Exception cause, long seqNo) {
this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo);
}

public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) {
this.index = index;
this.type = type;
this.id = id;
this.cause = cause;
this.status = status;
}

public Failure(String index, String type, String id, Exception cause) {
this(index, type, id, cause, ExceptionsHelper.status(cause));
this.seqNo = seqNo;
}

/**
Expand All @@ -193,6 +212,11 @@ public Failure(StreamInput in) throws IOException {
id = in.readOptionalString();
cause = in.readException();
status = ExceptionsHelper.status(cause);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
Expand All @@ -201,6 +225,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(getType());
out.writeOptionalString(getId());
out.writeException(getCause());
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(getSeqNo());
}
}


Expand Down Expand Up @@ -246,6 +273,15 @@ public Exception getCause() {
return cause;
}

/**
* The operation sequence number generated by primary
* NOTE: {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
* indicates sequence number was not generated by primary
*/
public long getSeqNo() {
return seqNo;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(INDEX_FIELD, index);
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ protected abstract WriteReplicaResult<ReplicaRequest> shardOperationOnReplica(

/**
* Result of taking the action on the primary.
*
* NOTE: public for testing
*/
protected static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
public static class WritePrimaryResult<ReplicaRequest extends ReplicatedWriteRequest<ReplicaRequest>,
Response extends ReplicationResponse & WriteResponse> extends PrimaryResult<ReplicaRequest, Response>
implements RespondingWriteResult {
boolean finishedAsyncActions;
Expand Down
30 changes: 18 additions & 12 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,6 @@ public Operation.TYPE getOperationType() {

void setTranslogLocation(Translog.Location translogLocation) {
if (freeze.get() == null) {
assert failure == null : "failure has to be null to set translog location";
this.translogLocation = translogLocation;
} else {
throw new IllegalStateException("result is already frozen");
Expand Down Expand Up @@ -432,7 +431,7 @@ public boolean isFound() {

}

static class NoOpResult extends Result {
public static class NoOpResult extends Result {

NoOpResult(long seqNo) {
super(Operation.TYPE.NO_OP, 0, seqNo);
Expand Down Expand Up @@ -1154,24 +1153,31 @@ public String reason() {
return reason;
}

public NoOp(
final Term uid,
final long seqNo,
final long primaryTerm,
final long version,
final VersionType versionType,
final Origin origin,
final long startTime,
final String reason) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) {
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime);
this.reason = reason;
}

@Override
public Term uid() {
throw new UnsupportedOperationException();
}

@Override
public String type() {
throw new UnsupportedOperationException();
}

@Override
public long version() {
throw new UnsupportedOperationException();
}

@Override
public VersionType versionType() {
throw new UnsupportedOperationException();
}

@Override
String id() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -614,10 +614,16 @@ public IndexResult index(Index index) throws IOException {
indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing,
plan.currentNotFoundOrDeleted);
}
if (indexResult.hasFailure() == false &&
index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Index(index, indexResult));
if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (indexResult.hasFailure() == false) {
location = translog.add(new Translog.Index(index, indexResult));
} else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
// if we have document failure, record it as a no-op in the translog with the generated seq_no
location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage()));
} else {
location = null;
}
indexResult.setTranslogLocation(location);
}
if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Expand Down Expand Up @@ -749,7 +755,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
* we return a `MATCH_ANY` version to indicate no document was index. The value is
* not used anyway
*/
return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo());
return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing);
} else {
throw ex;
}
Expand Down Expand Up @@ -900,10 +906,16 @@ public DeleteResult delete(Delete delete) throws IOException {
deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion,
plan.currentlyDeleted == false);
}
if (!deleteResult.hasFailure() &&
delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
Translog.Location location =
translog.add(new Translog.Delete(delete, deleteResult));
if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) {
final Translog.Location location;
if (deleteResult.hasFailure() == false) {
location = translog.add(new Translog.Delete(delete, deleteResult));
} else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(),
delete.primaryTerm(), deleteResult.getFailure().getMessage()));
} else {
location = null;
}
deleteResult.setTranslogLocation(location);
}
if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
Expand Down
33 changes: 23 additions & 10 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -569,28 +569,34 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
return result;
}

public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) {
verifyReplicationTarget();
long startTime = System.nanoTime();
return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason);
}

public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException {
ensureWriteAllowed(noOp);
Engine engine = getEngine();
return engine.noOp(noOp);
}

public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
verifyPrimary();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
final Term uid = MappedFieldType.extractTerm(uidQuery);
final Term uid = extractUid(type, id);
return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version,
versionType, Engine.Operation.Origin.PRIMARY);
}

public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm,
long version, VersionType versionType) {
verifyReplicationTarget();
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
final Term uid = MappedFieldType.extractTerm(uidQuery);
final Term uid = extractUid(type, id);
return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA);
}

static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
VersionType versionType, Engine.Operation.Origin origin) {
private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version,
VersionType versionType, Engine.Operation.Origin origin) {
long startTime = System.nanoTime();
return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime);
}
Expand All @@ -601,6 +607,13 @@ public Engine.DeleteResult delete(Engine.Delete delete) throws IOException {
return delete(engine, delete);
}

private Term extractUid(String type, String id) {
final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType();
final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null);
return MappedFieldType.extractTerm(uidQuery);
}

private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException {
active.set(true);
final Engine.DeleteResult result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException;
import org.elasticsearch.index.mapper.DocumentMapperForType;
import org.elasticsearch.index.mapper.MapperException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Mapping;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.rest.RestStatus;

Expand Down Expand Up @@ -182,7 +180,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio
final String reason = noOp.reason();
logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason);
final Engine.NoOp engineNoOp =
new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason);
new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason);
noOp(engine, engineNoOp);
break;
default:
Expand Down
Loading

0 comments on commit 4f773e2

Please sign in to comment.