diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 3023ecb1856a4..50da1476f49f3 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -38,7 +38,8 @@ public class BulkItemRequest implements Streamable { } - protected BulkItemRequest(int id, DocWriteRequest request) { + // NOTE: public for testing only + public BulkItemRequest(int id, DocWriteRequest request) { this.id = id; this.request = request; } @@ -56,13 +57,11 @@ public String index() { return request.indices()[0]; } - // NOTE: protected for testing only - protected BulkItemResponse getPrimaryResponse() { + BulkItemResponse getPrimaryResponse() { return primaryResponse; } - // NOTE: protected for testing only - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { + void setPrimaryResponse(BulkItemResponse primaryResponse) { this.primaryResponse = primaryResponse; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 2e2a7f1540108..68cede5d25178 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -37,6 +37,8 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -171,17 +173,34 @@ public static class Failure implements Writeable, ToXContent { private final String id; private final Exception cause; private final RestStatus status; + private final long seqNo; - 
Failure(String index, String type, String id, Exception cause, RestStatus status) { + /** + * For write failures before operation was assigned a sequence number. + * + * use {@link #Failure(String, String, String, Exception, long)} + * to record operation sequence no with failure + */ + public Failure(String index, String type, String id, Exception cause) { + this(index, type, id, cause, ExceptionsHelper.status(cause), SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status) { + this(index, type, id, cause, status, SequenceNumbersService.UNASSIGNED_SEQ_NO); + } + + /** For write failures after operation was assigned a sequence number. */ + public Failure(String index, String type, String id, Exception cause, long seqNo) { + this(index, type, id, cause, ExceptionsHelper.status(cause), seqNo); + } + + public Failure(String index, String type, String id, Exception cause, RestStatus status, long seqNo) { this.index = index; this.type = type; this.id = id; this.cause = cause; this.status = status; - } - - public Failure(String index, String type, String id, Exception cause) { - this(index, type, id, cause, ExceptionsHelper.status(cause)); + this.seqNo = seqNo; } /** @@ -193,6 +212,11 @@ public Failure(StreamInput in) throws IOException { id = in.readOptionalString(); cause = in.readException(); status = ExceptionsHelper.status(cause); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override @@ -201,6 +225,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(getType()); out.writeOptionalString(getId()); out.writeException(getCause()); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(getSeqNo()); + } } @@ -246,6 +273,15 @@ public Exception getCause() { return cause; } + /** + * The operation sequence 
number generated by primary + * NOTE: {@link SequenceNumbersService#UNASSIGNED_SEQ_NO} + * indicates sequence number was not generated by primary + */ + public long getSeqNo() { + return seqNo; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(INDEX_FIELD, index); diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 697f4c2f9938e..170f2d3053627 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -23,6 +23,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -43,7 +44,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -65,13 +65,9 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.index.translog.Translog.Location; -import org.elasticsearch.action.bulk.BulkItemResultHolder; -import org.elasticsearch.action.bulk.BulkItemResponse; import java.io.IOException; import java.util.Map; -import java.util.Objects; import java.util.function.LongSupplier; /** Performs shard-level bulk (index, delete or 
update) operations */ @@ -113,12 +109,20 @@ protected boolean resolveIndex() { @Override public WritePrimaryResult shardOperationOnPrimary( BulkShardRequest request, IndexShard primary) throws Exception { + return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, new ConcreteMappingUpdatePerformer()); + } + + public static WritePrimaryResult performOnPrimary( + BulkShardRequest request, + IndexShard primary, + UpdateHelper updateHelper, + LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater) throws Exception { final IndexMetaData metaData = primary.indexSettings().getIndexMetaData(); Translog.Location location = null; - final MappingUpdatePerformer mappingUpdater = new ConcreteMappingUpdatePerformer(); for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) { location = executeBulkItemRequest(metaData, primary, request, location, requestIndex, - updateHelper, threadPool::absoluteTimeInMillis, mappingUpdater); + updateHelper, nowInMillisSupplier, mappingUpdater); } BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; BulkItemRequest[] items = request.items(); @@ -129,7 +133,6 @@ public WritePrimaryResult shardOperationOnP return new WritePrimaryResult<>(request, response, location, null, primary, logger); } - private static BulkItemResultHolder executeIndexRequest(final IndexRequest indexRequest, final BulkItemRequest bulkItemRequest, final IndexShard primary, @@ -208,7 +211,8 @@ static BulkItemResponse createPrimaryResponse(BulkItemResultHolder bulkItemResul // Make sure to use request.index() here, if you // use docWriteRequest.index() it will use the // concrete index instead of an alias if used! 
- new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), failure)); + new BulkItemResponse.Failure(request.index(), docWriteRequest.type(), docWriteRequest.id(), + failure, operationResult.getSeqNo())); } else { assert replicaRequest.getPrimaryResponse() != null : "replica request must have a primary response"; return null; @@ -221,7 +225,7 @@ static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSha BulkShardRequest request, Translog.Location location, int requestIndex, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - final MappingUpdatePerformer mappingUpdater) throws Exception { + final MappingUpdatePerformer mappingUpdater) throws Exception { final DocWriteRequest itemRequest = request.items()[requestIndex].request(); final DocWriteRequest.OpType opType = itemRequest.opType(); final BulkItemResultHolder responseHolder; @@ -358,58 +362,129 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq return new BulkItemResultHolder(updateResponse, updateOperationResult, replicaRequest); } - static boolean shouldExecuteReplicaItem(final BulkItemRequest request, final int index) { + /** Modes for executing item request on replica depending on corresponding primary execution result */ + public enum ReplicaItemExecutionMode { + + /** + * When primary execution succeeded + */ + NORMAL, + + /** + * When primary execution failed before sequence no was generated + * or primary execution was a noop (only possible when request is originating from pre-6.0 nodes) + */ + NOOP, + + /** + * When primary execution failed after sequence no was generated + */ + FAILURE + } + + static { + assert Version.CURRENT.minimumCompatibilityVersion().after(Version.V_5_0_0) == false: + "Remove logic handling NoOp result from primary response; see TODO in replicaItemExecutionMode" + + " as the current minimum compatible version [" + + Version.CURRENT.minimumCompatibilityVersion() + "] is after 
5.0"; + } + + /** + * Determines whether a bulk item request should be executed on the replica. + * @return {@link ReplicaItemExecutionMode#NORMAL} upon normal primary execution with no failures + * {@link ReplicaItemExecutionMode#FAILURE} upon primary execution failure after sequence no generation + * {@link ReplicaItemExecutionMode#NOOP} upon primary execution failure before sequence no generation or + * when primary execution resulted in noop (only possible for write requests from pre-6.0 nodes) + */ + static ReplicaItemExecutionMode replicaItemExecutionMode(final BulkItemRequest request, final int index) { final BulkItemResponse primaryResponse = request.getPrimaryResponse(); - assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request ["+ request.request() +"]"; - return primaryResponse.isFailed() == false && - primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP; + assert primaryResponse != null : "expected primary response to be set for item [" + index + "] request [" + request.request() + "]"; + if (primaryResponse.isFailed()) { + return primaryResponse.getFailure().getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO + ? ReplicaItemExecutionMode.FAILURE // we have a seq no generated with the failure, replicate as no-op + : ReplicaItemExecutionMode.NOOP; // no seq no generated, ignore replication + } else { + // NOTE: write requests originating from pre-6.0 nodes can send a no-op operation to + // the replica; we ignore replication + // TODO: remove noOp result check from primary response, when pre-6.0 nodes are not supported + // we should return ReplicaItemExecutionMode.NORMAL instead + return primaryResponse.getResponse().getResult() != DocWriteResponse.Result.NOOP + ? 
ReplicaItemExecutionMode.NORMAL // execution successful on primary + : ReplicaItemExecutionMode.NOOP; // ignore replication + } } @Override public WriteReplicaResult shardOperationOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + final Translog.Location location = performOnReplica(request, replica); + return new WriteReplicaResult<>(request, location, null, replica, logger); + } + + public static Translog.Location performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { Translog.Location location = null; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; - if (shouldExecuteReplicaItem(item, i)) { - DocWriteRequest docWriteRequest = item.request(); - DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); - final Engine.Result operationResult; - try { - switch (docWriteRequest.opType()) { - case CREATE: - case INDEX: - operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); - break; - case DELETE: - operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); - break; - default: - throw new IllegalStateException("Unexpected request operation type on replica: " - + docWriteRequest.opType().getLowercase()); - } - if (operationResult.hasFailure()) { - // check if any transient write operation failures should be bubbled up - Exception failure = operationResult.getFailure(); - assert failure instanceof VersionConflictEngineException - || failure instanceof MapperParsingException - : "expected any one of [version conflict, mapper parsing, engine closed, index shard closed]" + - " failures. 
got " + failure; - if (!TransportActions.isShardNotAvailableException(failure)) { - throw failure; + final Engine.Result operationResult; + DocWriteRequest docWriteRequest = item.request(); + try { + switch (replicaItemExecutionMode(item, i)) { + case NORMAL: + final DocWriteResponse primaryResponse = item.getPrimaryResponse().getResponse(); + switch (docWriteRequest.opType()) { + case CREATE: + case INDEX: + operationResult = executeIndexRequestOnReplica(primaryResponse, (IndexRequest) docWriteRequest, replica); + break; + case DELETE: + operationResult = executeDeleteRequestOnReplica(primaryResponse, (DeleteRequest) docWriteRequest, replica); + break; + default: + throw new IllegalStateException("Unexpected request operation type on replica: " + + docWriteRequest.opType().getLowercase()); } - } else { - location = locationToSync(location, operationResult.getTranslogLocation()); - } - } catch (Exception e) { - // if its not an ignore replica failure, we need to make sure to bubble up the failure - // so we will fail the shard - if (!TransportActions.isShardNotAvailableException(e)) { - throw e; - } + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + case NOOP: + break; + case FAILURE: + final BulkItemResponse.Failure failure = item.getPrimaryResponse().getFailure(); + assert failure.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO : "seq no must be assigned"; + operationResult = executeFailureNoOpOnReplica(failure, replica); + assert operationResult != null : "operation result must never be null when primary response has no failure"; + location = syncOperationResultOrThrow(operationResult, location); + break; + default: + throw new IllegalStateException("illegal replica item execution mode for: " + item.request()); + } + } catch (Exception e) { + // if its not an ignore replica failure, we need to make sure to bubble up 
the failure + // so we will fail the shard + if (!TransportActions.isShardNotAvailableException(e)) { + throw e; } } } - return new WriteReplicaResult<>(request, location, null, replica, logger); + return location; + } + + /** Syncs operation result to the translog or throws a shard not available failure */ + private static Translog.Location syncOperationResultOrThrow(final Engine.Result operationResult, + final Translog.Location currentLocation) throws Exception { + final Translog.Location location; + if (operationResult.hasFailure()) { + // check if any transient write operation failures should be bubbled up + Exception failure = operationResult.getFailure(); + assert failure instanceof MapperParsingException : "expected mapper parsing failures. got " + failure; + if (!TransportActions.isShardNotAvailableException(failure)) { + throw failure; + } else { + location = currentLocation; + } + } else { + location = locationToSync(currentLocation, operationResult.getTranslogLocation()); + } + return location; } private static Translog.Location locationToSync(Translog.Location current, @@ -429,7 +504,7 @@ private static Translog.Location locationToSync(Translog.Location current, * Execute the given {@link IndexRequest} on a replica shard, throwing a * {@link RetryOnReplicaException} if the operation needs to be re-tried. 
*/ - public static Engine.IndexResult executeIndexRequestOnReplica( + private static Engine.IndexResult executeIndexRequestOnReplica( DocWriteResponse primaryResponse, IndexRequest request, IndexShard replica) throws IOException { @@ -472,7 +547,7 @@ static Engine.Index prepareIndexOperationOnReplica( } /** Utility method to prepare an index operation on primary shards */ - static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { + private static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard primary) { final SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source(), request.getContentType()) @@ -482,8 +557,8 @@ static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexSh } /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, - MappingUpdatePerformer mappingUpdater) throws Exception { + static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, + MappingUpdatePerformer mappingUpdater) throws Exception { // Update the mappings if parsing the documents includes new dynamic updates final Engine.Index preUpdateOperation; final Mapping mappingUpdate; @@ -533,6 +608,12 @@ private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteRespons return replica.delete(delete); } + private static Engine.NoOpResult executeFailureNoOpOnReplica(BulkItemResponse.Failure primaryFailure, IndexShard replica) throws IOException { + final Engine.NoOp noOp = replica.prepareMarkingSeqNoAsNoOp( + primaryFailure.getSeqNo(), primaryFailure.getMessage()); + return replica.markSeqNoAsNoOp(noOp); + } + class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { public void updateMappings(final Mapping update, 
final ShardId shardId, diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index ae4ae78c03386..938e90b82b2fb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -94,8 +94,10 @@ protected abstract WriteReplicaResult shardOperationOnReplica( /** * Result of taking the action on the primary. + * + * NOTE: public for testing */ - protected static class WritePrimaryResult, + public static class WritePrimaryResult, Response extends ReplicationResponse & WriteResponse> extends PrimaryResult implements RespondingWriteResult { boolean finishedAsyncActions; diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 59655abf2894c..45b731cd9cff1 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -363,7 +363,6 @@ public Operation.TYPE getOperationType() { void setTranslogLocation(Translog.Location translogLocation) { if (freeze.get() == null) { - assert failure == null : "failure has to be null to set translog location"; this.translogLocation = translogLocation; } else { throw new IllegalStateException("result is already frozen"); @@ -432,7 +431,7 @@ public boolean isFound() { } - static class NoOpResult extends Result { + public static class NoOpResult extends Result { NoOpResult(long seqNo) { super(Operation.TYPE.NO_OP, 0, seqNo); @@ -1154,24 +1153,31 @@ public String reason() { return reason; } - public NoOp( - final Term uid, - final long seqNo, - final long primaryTerm, - final long version, - final VersionType versionType, - final Origin origin, - final long startTime, - final String reason) { - super(uid, 
seqNo, primaryTerm, version, versionType, origin, startTime); + public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) { + super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime); this.reason = reason; } + @Override + public Term uid() { + throw new UnsupportedOperationException(); + } + @Override public String type() { throw new UnsupportedOperationException(); } + @Override + public long version() { + throw new UnsupportedOperationException(); + } + + @Override + public VersionType versionType() { + throw new UnsupportedOperationException(); + } + @Override String id() { throw new UnsupportedOperationException(); diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 0bed51e0e24a1..544b68add136f 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -614,10 +614,16 @@ public IndexResult index(Index index) throws IOException { indexResult = new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted); } - if (indexResult.hasFailure() == false && - index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - translog.add(new Translog.Index(index, indexResult)); + if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + final Translog.Location location; + if (indexResult.hasFailure() == false) { + location = translog.add(new Translog.Index(index, indexResult)); + } else if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + // if we have document failure, record it as a no-op in the translog with the generated seq_no + location = translog.add(new Translog.NoOp(indexResult.getSeqNo(), index.primaryTerm(), indexResult.getFailure().getMessage())); + } else { + location = null; + 
} indexResult.setTranslogLocation(location); } if (indexResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { @@ -749,7 +755,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) * we return a `MATCH_ANY` version to indicate no document was index. The value is * not used anyway */ - return new IndexResult(ex, Versions.MATCH_ANY, index.seqNo()); + return new IndexResult(ex, Versions.MATCH_ANY, plan.seqNoForIndexing); } else { throw ex; } @@ -900,10 +906,16 @@ public DeleteResult delete(Delete delete) throws IOException { deleteResult = new DeleteResult(plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false); } - if (!deleteResult.hasFailure() && - delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - Translog.Location location = - translog.add(new Translog.Delete(delete, deleteResult)); + if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + final Translog.Location location; + if (deleteResult.hasFailure() == false) { + location = translog.add(new Translog.Delete(delete, deleteResult)); + } else if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { + location = translog.add(new Translog.NoOp(deleteResult.getSeqNo(), + delete.primaryTerm(), deleteResult.getFailure().getMessage())); + } else { + location = null; + } deleteResult.setTranslogLocation(location); } if (deleteResult.getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 1dee58ced002b..589572fff3fe6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -569,12 +569,21 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc return result; } + public Engine.NoOp prepareMarkingSeqNoAsNoOp(long seqNo, String reason) { + 
verifyReplicationTarget(); + long startTime = System.nanoTime(); + return new Engine.NoOp(seqNo, primaryTerm, Engine.Operation.Origin.REPLICA, startTime, reason); + } + + public Engine.NoOpResult markSeqNoAsNoOp(Engine.NoOp noOp) throws IOException { + ensureWriteAllowed(noOp); + Engine engine = getEngine(); + return engine.noOp(noOp); + } + public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { verifyPrimary(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, Engine.Operation.Origin.PRIMARY); } @@ -582,15 +591,12 @@ public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm, long version, VersionType versionType) { verifyReplicationTarget(); - final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); - final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); - final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); - final Term uid = MappedFieldType.extractTerm(uidQuery); + final Term uid = extractUid(type, id); return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); } - static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, - VersionType versionType, Engine.Operation.Origin origin) { + private static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, + VersionType versionType, 
Engine.Operation.Origin origin) { long startTime = System.nanoTime(); return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime); } @@ -601,6 +607,13 @@ public Engine.DeleteResult delete(Engine.Delete delete) throws IOException { return delete(engine, delete); } + private Term extractUid(String type, String id) { + final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); + final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); + final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); + return MappedFieldType.extractTerm(uidQuery); + } + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) throws IOException { active.set(true); final Engine.DeleteResult result; diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index d5aadc1664ea4..8842cbf3c0bd4 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -23,7 +23,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.IgnoreOnRecoveryEngineException; import org.elasticsearch.index.mapper.DocumentMapperForType; @@ -31,7 +30,6 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; @@ -182,7 +180,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation 
operatio final String reason = noOp.reason(); logger.trace("[translog] recover [no_op] op [({}, {})] of [{}]", seqNo, primaryTerm, reason); final Engine.NoOp engineNoOp = - new Engine.NoOp(null, seqNo, primaryTerm, 0, VersionType.INTERNAL, origin, System.nanoTime(), reason); + new Engine.NoOp(seqNo, primaryTerm, origin, System.nanoTime(), reason); noOp(engine, engineNoOp); break; default: diff --git a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 355b3978cbf46..4016c2cbdef2b 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -23,7 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -34,14 +34,9 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; -import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.Index; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.engine.Engine; import 
org.elasticsearch.index.engine.VersionConflictEngineException; @@ -52,15 +47,12 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.action.bulk.TransportShardBulkAction; -import org.elasticsearch.action.bulk.MappingUpdatePerformer; -import org.elasticsearch.action.bulk.BulkItemResultHolder; +import org.mockito.ArgumentCaptor; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.elasticsearch.action.bulk.TransportShardBulkAction.replicaItemExecutionMode; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; @@ -96,26 +88,38 @@ public void testShouldExecuteReplicaItem() throws Exception { DocWriteResponse response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); BulkItemRequest request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, response)); - assertTrue(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NORMAL)); - // Failed index requests should not be replicated (for now!) 
+ // Failed index requests without sequence no should not be replicated writeRequest = new IndexRequest("index", "type", "id") .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar"); - response = new IndexResponse(shardId, "type", "id", 1, 1, randomBoolean()); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse( new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, new BulkItemResponse.Failure("index", "type", "id", new IllegalArgumentException("i died")))); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); + // Failed index requests with sequence no should be replicated + request = new BulkItemRequest(0, writeRequest); + request.setPrimaryResponse( + new BulkItemResponse(0, DocWriteRequest.OpType.INDEX, + new BulkItemResponse.Failure("index", "type", "id", + new IllegalArgumentException( + "i died after sequence no was generated"), + 1))); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.FAILURE)); // NOOP requests should not be replicated writeRequest = new UpdateRequest("index", "type", "id"); response = new UpdateResponse(shardId, "type", "id", 1, DocWriteResponse.Result.NOOP); request = new BulkItemRequest(0, writeRequest); request.setPrimaryResponse(new BulkItemResponse(0, DocWriteRequest.OpType.UPDATE, response)); - assertFalse(TransportShardBulkAction.shouldExecuteReplicaItem(request, 0)); + assertThat(replicaItemExecutionMode(request, 0), + equalTo(ReplicaItemExecutionMode.NOOP)); } @@ -515,6 +519,35 @@ public void testCalculateTranslogLocation() throws Exception { } + public void testNoOpReplicationOnPrimaryDocumentFailure() throws Exception { + final IndexShard shard = spy(newStartedShard(false)); + BulkItemRequest itemRequest = new BulkItemRequest(0, + new IndexRequest("index", "type") + .source(Requests.INDEX_CONTENT_TYPE, "foo", "bar") + ); + final String failureMessage = 
"simulated primary failure"; + itemRequest.setPrimaryResponse(new BulkItemResponse(0, + randomFrom( + DocWriteRequest.OpType.CREATE, + DocWriteRequest.OpType.DELETE, + DocWriteRequest.OpType.INDEX + ), + new BulkItemResponse.Failure("index", "type", "1", + new IOException(failureMessage), 1L) + )); + BulkItemRequest[] itemRequests = new BulkItemRequest[1]; + itemRequests[0] = itemRequest; + BulkShardRequest bulkShardRequest = new BulkShardRequest( + shard.shardId(), RefreshPolicy.NONE, itemRequests); + TransportShardBulkAction.performOnReplica(bulkShardRequest, shard); + ArgumentCaptor noOp = ArgumentCaptor.forClass(Engine.NoOp.class); + verify(shard, times(1)).markSeqNoAsNoOp(noOp.capture()); + final Engine.NoOp noOpValue = noOp.getValue(); + assertThat(noOpValue.seqNo(), equalTo(1L)); + assertThat(noOpValue.reason(), containsString(failureMessage)); + closeShards(shard); + } + public void testMappingUpdateParsesCorrectNumberOfTimes() throws Exception { IndexMetaData metaData = indexMetaData(); logger.info("--> metadata.getIndex(): {}", metaData.getIndex()); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 71d754ddfb6ca..af53c4997fde0 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -2857,10 +2857,13 @@ public void testHandleDocumentFailure() throws Exception { } Engine.IndexResult indexResult = engine.index(indexForDoc(doc1)); assertNotNull(indexResult.getFailure()); - + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); throwingIndexWriter.get().clearFailure(); indexResult = engine.index(indexForDoc(doc1)); assertNull(indexResult.getFailure()); + // document failures should be recorded in translog + assertNotNull(indexResult.getTranslogLocation()); 
engine.index(indexForDoc(doc2)); // test failure while deleting @@ -3672,12 +3675,9 @@ public long generateSeqNo() { final String reason = randomAlphaOfLength(16); noOpEngine.noOp( new Engine.NoOp( - null, - maxSeqNo + 1, + maxSeqNo + 1, primaryTerm, - 0, - VersionType.INTERNAL, - randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), + randomFrom(PRIMARY, REPLICA, PEER_RECOVERY, LOCAL_TRANSLOG_RECOVERY), System.nanoTime(), reason)); assertThat(noOpEngine.seqNoService().getLocalCheckpoint(), equalTo((long) (maxSeqNo + 1))); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index c35f72d208533..2243a5769b99a 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -22,21 +22,21 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.bulk.BulkItemRequest; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.bulk.BulkShardResponse; +import org.elasticsearch.action.bulk.TransportShardBulkAction; import org.elasticsearch.action.bulk.TransportShardBulkActionTests; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import 
org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction.ReplicaResponse; +import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -50,7 +50,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; -import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; @@ -58,6 +57,7 @@ import org.elasticsearch.index.shard.IndexShardTestCase; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.RecoveryTarget; @@ -77,8 +77,6 @@ import java.util.stream.Collectors; import java.util.stream.StreamSupport; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnPrimary; -import static org.elasticsearch.action.bulk.TransportShardBulkAction.executeIndexRequestOnReplica; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -147,9 +145,13 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { public int indexDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", Integer.toString(docId.incrementAndGet())) - .source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + .source("{}", 
XContentType.JSON); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; @@ -158,43 +160,29 @@ public int indexDocs(final int numOfDoc) throws Exception { public int appendDocs(final int numOfDoc) throws Exception { for (int doc = 0; doc < numOfDoc; doc++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); - final IndexResponse response = index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + final BulkItemResponse response = index(indexRequest); + if (response.isFailed()) { + throw response.getFailure().getCause(); + } else if (response.isFailed() == false) { + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); + } } primary.updateGlobalCheckpointOnPrimary(); return numOfDoc; } - public IndexResponse index(IndexRequest indexRequest) throws Exception { - PlainActionFuture listener = new PlainActionFuture<>(); + public BulkItemResponse index(IndexRequest indexRequest) throws Exception { + PlainActionFuture listener = new PlainActionFuture<>(); final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0].getResponse()), + bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), listener::onFailure); BulkItemRequest[] items = new BulkItemRequest[1]; - items[0] = new TestBulkItemRequest(0, indexRequest); + items[0] = new BulkItemRequest(0, indexRequest); BulkShardRequest request = new BulkShardRequest(shardId, indexRequest.getRefreshPolicy(), items); new IndexingAction(request, wrapBulkListener, this).execute(); return listener.get(); } - /** BulkItemRequest exposing get/set primary response */ - public class 
TestBulkItemRequest extends BulkItemRequest { - - TestBulkItemRequest(int id, DocWriteRequest request) { - super(id, request); - } - - @Override - protected void setPrimaryResponse(BulkItemResponse primaryResponse) { - super.setPrimaryResponse(primaryResponse); - } - - @Override - protected BulkItemResponse getPrimaryResponse() { - return super.getPrimaryResponse(); - } - } - public synchronized void startAll() throws IOException { startReplicas(replicas.size()); } @@ -442,7 +430,7 @@ protected Set getInSyncAllocationIds(ShardId shardId, ClusterState clust protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; - protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws IOException; + protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; class PrimaryRef implements ReplicationOperation.Primary { @@ -539,46 +527,53 @@ class IndexingAction extends ReplicationAction result = executeShardBulkOnPrimary(primary, request); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); } @Override - protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws IOException { - final ReplicationGroup.TestBulkItemRequest bulkItemRequest = ((ReplicationGroup.TestBulkItemRequest) request.items()[0]); - final DocWriteResponse primaryResponse = bulkItemRequest.getPrimaryResponse().getResponse(); - indexOnReplica(primaryResponse, ((IndexRequest) bulkItemRequest.request()), replica); + protected void performOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); + } + } + + private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { + for (BulkItemRequest itemRequest : request.items()) { + if (itemRequest.request() instanceof IndexRequest) { + ((IndexRequest) 
itemRequest.request()).process(null, index.getName()); + } } + final TransportWriteAction.WritePrimaryResult result = + TransportShardBulkAction.performOnPrimary(request, primary, null, + System::currentTimeMillis, new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); + request.primaryTerm(primary.getPrimaryTerm()); + TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); + return result; + } + + private void executeShardBulkOnReplica(IndexShard replica, BulkShardRequest request) throws Exception { + final Translog.Location location = TransportShardBulkAction.performOnReplica(request, replica); + TransportWriteActionTestHelper.performPostWriteActions(replica, request, location, logger); } /** * indexes the given requests on the supplied primary, modifying it for replicas */ - protected IndexResponse indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { - final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, - new TransportShardBulkActionTests.NoopMappingUpdatePerformer()); - request.primaryTerm(primary.getPrimaryTerm()); - TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getTranslogLocation(), logger); - return new IndexResponse( - primary.shardId(), - request.type(), - request.id(), - indexResult.getSeqNo(), - indexResult.getVersion(), - indexResult.isCreated()); + BulkShardRequest indexOnPrimary(IndexRequest request, IndexShard primary) throws Exception { + final BulkItemRequest bulkItemRequest = new BulkItemRequest(0, request); + BulkItemRequest[] bulkItemRequests = new BulkItemRequest[1]; + bulkItemRequests[0] = bulkItemRequest; + final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), bulkItemRequests); + final TransportWriteAction.WritePrimaryResult result = + executeShardBulkOnPrimary(primary, bulkShardRequest); + return result.replicaRequest(); } /** * indexes the given 
requests on the supplied replica shard */ - protected void indexOnReplica(DocWriteResponse response, IndexRequest request, IndexShard replica) throws IOException { - final Engine.IndexResult result = executeIndexRequestOnReplica(response, request, replica); - TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getTranslogLocation(), logger); + void indexOnReplica(BulkShardRequest request, IndexShard replica) throws Exception { + executeShardBulkOnReplica(replica, request); } class GlobalCheckpointSync extends ReplicationAction future = shards.asyncRecoverReplica(replica, (indexShard, node) - -> new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) { + -> new RecoveryTarget(indexShard, node, recoveryListener, version -> { + }) { @Override public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException { super.cleanFiles(totalTranslogOps, sourceMetaData); @@ -113,8 +122,8 @@ public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception { shards.startAll(); final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}", XContentType.JSON); indexRequest.onRetry(); // force an update of the timestamp - final IndexResponse response = shards.index(indexRequest); - assertEquals(DocWriteResponse.Result.CREATED, response.getResult()); + final BulkItemResponse response = shards.index(indexRequest); + assertEquals(DocWriteResponse.Result.CREATED, response.getResponse().getResult()); if (randomBoolean()) { // lets check if that also happens if no translog record is replicated shards.flush(); } @@ -147,7 +156,7 @@ public void testCheckpointsAdvance() throws Exception { final SeqNoStats shardStats = shard.seqNoStats(); final ShardRouting shardRouting = shard.routingEntry(); logger.debug("seq_no stats for {}: {}", shardRouting, XContentHelper.toString(shardStats, - new ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); + new 
ToXContent.MapParams(Collections.singletonMap("pretty", "false")))); assertThat(shardRouting + " local checkpoint mismatch", shardStats.getLocalCheckpoint(), equalTo(numDocs - 1L)); assertThat(shardRouting + " global checkpoint mismatch", shardStats.getGlobalCheckpoint(), equalTo(numDocs - 1L)); @@ -158,7 +167,7 @@ public void testCheckpointsAdvance() throws Exception { public void testConflictingOpsOnReplica() throws Exception { Map mappings = - Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); + Collections.singletonMap("type", "{ \"type\": { \"properties\": { \"f\": { \"type\": \"keyword\"} }}}"); try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(2, mappings))) { shards.startAll(); IndexShard replica1 = shards.getReplicas().get(0); @@ -180,4 +189,128 @@ public void testConflictingOpsOnReplica() throws Exception { } } } + + /** + * test document failures (failures after seq_no generation) are added as noop operation to the translog + * for primary and replica shards + */ + public void testDocumentFailureReplication() throws Exception { + final String failureMessage = "simulated document failure"; + final ThrowingDocumentFailureEngineFactory throwingDocumentFailureEngineFactory = + new ThrowingDocumentFailureEngineFactory(failureMessage); + try (ReplicationGroup shards = new ReplicationGroup(buildIndexMetaData(0)) { + @Override + protected EngineFactory getEngineFactory(ShardRouting routing) { + return throwingDocumentFailureEngineFactory; + }}) { + + // test only primary + shards.startPrimary(); + BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoOpTranslogOperationForDocumentFailure(shards, 1, failureMessage); + shards.assertAllEqual(0); + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) 
{ + shards.addReplica(); + } + shards.startReplicas(nReplica); + response = shards.index( + new IndexRequest(index.getName(), "testDocumentFailureReplication", "1") + .source("{}", XContentType.JSON) + ); + assertTrue(response.isFailed()); + assertNoOpTranslogOperationForDocumentFailure(shards, 2, failureMessage); + shards.assertAllEqual(0); + } + } + + /** + * test request failures (failures before seq_no generation) are not added as a noop to translog + */ + public void testRequestFailureReplication() throws Exception { + try (ReplicationGroup shards = createGroup(0)) { + shards.startAll(); + BulkItemResponse response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + + // add some replicas + int nReplica = randomIntBetween(1, 3); + for (int i = 0; i < nReplica; i++) { + shards.addReplica(); + } + shards.startReplicas(nReplica); + response = shards.index( + new IndexRequest(index.getName(), "testRequestFailureException", "1") + .source("{}", XContentType.JSON) + .version(2) + ); + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), instanceOf(VersionConflictEngineException.class)); + shards.assertAllEqual(0); + for (IndexShard indexShard : shards) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(0)); + } + } + } + } + + /** Throws documentFailure on every indexing operation */ + static class ThrowingDocumentFailureEngineFactory implements EngineFactory { + final String documentFailureMessage; + + ThrowingDocumentFailureEngineFactory(String documentFailureMessage) { + 
this.documentFailureMessage = documentFailureMessage; + } + + @Override + public Engine newReadWriteEngine(EngineConfig config) { + return InternalEngineTests.createInternalEngine((directory, writerConfig) -> + new IndexWriter(directory, writerConfig) { + @Override + public long addDocument(Iterable doc) throws IOException { + assert documentFailureMessage != null; + throw new IOException(documentFailureMessage); + } + }, null, config); + } + } + + private static void assertNoOpTranslogOperationForDocumentFailure( + Iterable replicationGroup, + int expectedOperation, + String failureMessage) throws IOException { + for (IndexShard indexShard : replicationGroup) { + try(Translog.View view = indexShard.acquireTranslogView()) { + assertThat(view.totalOperations(), equalTo(expectedOperation)); + final Translog.Snapshot snapshot = view.snapshot(); + long expectedSeqNo = 0L; + Translog.Operation op = snapshot.next(); + do { + assertThat(op.opType(), equalTo(Translog.Operation.Type.NO_OP)); + assertThat(op.seqNo(), equalTo(expectedSeqNo)); + assertThat(((Translog.NoOp) op).reason(), containsString(failureMessage)); + op = snapshot.next(); + expectedSeqNo++; + } while (op != null); + } + } + } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index 12f749e681918..139c7f500d8d7 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -24,9 +24,9 @@ import org.apache.lucene.index.IndexableField; import org.apache.lucene.util.IOUtils; import org.elasticsearch.action.admin.indices.flush.FlushRequest; +import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetaData; -import 
org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.index.engine.Engine; @@ -168,8 +168,8 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { for (int i = 0; i < rollbackDocs; i++) { final IndexRequest indexRequest = new IndexRequest(index.getName(), "type", "rollback_" + i) .source("{}", XContentType.JSON); - final IndexResponse primaryResponse = indexOnPrimary(indexRequest, oldPrimary); - indexOnReplica(primaryResponse, indexRequest, replica); + final BulkShardRequest bulkShardRequest = indexOnPrimary(indexRequest, oldPrimary); + indexOnReplica(bulkShardRequest, replica); } if (randomBoolean()) { oldPrimary.flush(new FlushRequest(index.getName())); diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java index f0be775306725..6ef40a7778236 100644 --- a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -41,6 +41,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiOfLength; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; import static org.hamcrest.Matchers.anyOf; @@ -76,7 +77,7 @@ private int indexDocs(String index, final int idStart, final int numDocs) throws for (int i = 0; i < numDocs; i++) { final int id = idStart + i; assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), - new StringEntity("{\"test\": \"test_" + id + "\"}", ContentType.APPLICATION_JSON))); + new StringEntity("{\"test\": \"test_" + randomAsciiOfLength(2) + "\"}", ContentType.APPLICATION_JSON))); } return numDocs; } @@ -116,7 +117,7 @@ public void testIndexVersionPropagation() throws 
Exception { .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) .put("index.routing.allocation.include._name", bwcNames); - final String index = "test"; + final String index = "indexversionprop"; final int minUpdates = 5; final int maxUpdates = 10; createIndex(index, settings.build()); @@ -130,7 +131,9 @@ public void testIndexVersionPropagation() throws Exception { updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); assertOK(client().performRequest("POST", index + "/_refresh")); - List shards = buildShards(nodes, newNodeClient); + List shards = buildShards(index, nodes, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 1, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc1); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 1); @@ -140,13 +143,15 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing docs with [{}] concurrent updates after allowing shards on all nodes", nUpdates); final int finalVersionForDoc2 = indexDocWithConcurrentUpdates(index, 2, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); + primary = shards.stream().filter(Shard::isPrimary).findFirst().get(); + logger.info("primary resolved to: " + primary.getNode().getNodeName()); for (Shard shard : shards) { assertVersion(index, 2, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc2); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 2); } - Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + primary 
= shards.stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -154,7 +159,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing docs with [{}] concurrent updates after moving primary", nUpdates); final int finalVersionForDoc3 = indexDocWithConcurrentUpdates(index, 3, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 3, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc3); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 3); @@ -167,7 +172,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 0", nUpdates); final int finalVersionForDoc4 = indexDocWithConcurrentUpdates(index, 4, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { assertVersion(index, 4, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc4); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 4); @@ -180,7 +185,7 @@ public void testIndexVersionPropagation() throws Exception { logger.info("indexing doc with [{}] concurrent updates after setting number of replicas to 1", nUpdates); final int finalVersionForDoc5 = indexDocWithConcurrentUpdates(index, 5, nUpdates); assertOK(client().performRequest("POST", index + "/_refresh")); - shards = buildShards(nodes, newNodeClient); + shards = buildShards(index, nodes, newNodeClient); for (Shard shard : shards) { 
assertVersion(index, 5, "_only_nodes:" + shard.getNode().getNodeName(), finalVersionForDoc5); assertCount(index, "_only_nodes:" + shard.getNode().getNodeName(), 5); @@ -216,7 +221,7 @@ public void testSeqNoCheckpoints() throws Exception { final int numberOfInitialDocs = 1 + randomInt(5); logger.info("indexing [{}] docs initially", numberOfInitialDocs); numDocs += indexDocs(index, 0, numberOfInitialDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); logger.info("allowing shards on all nodes"); updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); ensureGreen(); @@ -227,8 +232,8 @@ public void testSeqNoCheckpoints() throws Exception { final int numberOfDocsAfterAllowingShardsOnAllNodes = 1 + randomInt(5); logger.info("indexing [{}] docs after allowing shards on all nodes", numberOfDocsAfterAllowingShardsOnAllNodes); numDocs += indexDocs(index, numDocs, numberOfDocsAfterAllowingShardsOnAllNodes); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); - Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, 0, newNodeClient); + Shard primary = buildShards(index, nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); logger.info("moving primary to new node by excluding {}", primary.getNode().getNodeName()); updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); ensureGreen(); @@ -237,7 +242,7 @@ public void testSeqNoCheckpoints() throws Exception { logger.info("indexing [{}] docs after moving primary", numberOfDocsAfterMovingPrimary); numDocsOnNewPrimary += indexDocs(index, numDocs, numberOfDocsAfterMovingPrimary); numDocs += numberOfDocsAfterMovingPrimary; - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 
numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); /* * Dropping the number of replicas to zero, and then increasing it to one triggers a recovery thus exercising any BWC-logic in * the recovery code. @@ -255,7 +260,7 @@ public void testSeqNoCheckpoints() throws Exception { // the number of documents on the primary and on the recovered replica should match the number of indexed documents assertCount(index, "_primary", numDocs); assertCount(index, "_replica", numDocs); - assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + assertSeqNoOnShards(index, nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); } } @@ -274,10 +279,11 @@ private void assertVersion(final String index, final int docId, final String pre assertThat("version mismatch for doc [" + docId + "] preference [" + preference + "]", actualVersion, equalTo(expectedVersion)); } - private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + private void assertSeqNoOnShards(String index, Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) + throws Exception { assertBusy(() -> { try { - List shards = buildShards(nodes, client); + List shards = buildShards(index, nodes, client); Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); assertNotNull("failed to find primary shard", primaryShard); final long expectedGlobalCkp; @@ -311,9 +317,9 @@ private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, in }); } - private List buildShards(Nodes nodes, RestClient client) throws IOException { - Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); - List shardStats = ObjectPath.createFromResponse(response).evaluate("indices.test.shards.0"); + private List buildShards(String index, Nodes nodes, RestClient 
client) throws IOException { + Response response = client.performRequest("GET", index + "/_stats", singletonMap("level", "shards")); + List shardStats = ObjectPath.createFromResponse(response).evaluate("indices." + index + ".shards.0"); ArrayList shards = new ArrayList<>(); for (Object shard : shardStats) { final String nodeId = ObjectPath.evaluate(shard, "routing.node");