Skip to content

Commit

Permalink
Add BWC layer to seq no infra and enable BWC tests (#22185)
Browse files Browse the repository at this point in the history
Sequence BWC logic consists of two elements:

1) Wire level BWC using stream versions.
2) A change to the global checkpoint maintenance semantics.

For the sequence number infra to work with mixed version clusters, we have to consider the situation where the primary is on an old node and replicas are on new ones (i.e., the replicas will receive operations without seq#) and also the reverse (i.e., the primary sends operations to a replica but the replica can't process the seq# and respond with a local checkpoint). A new primary with an old replica is rare because we do not allow a replica to recover from a new primary. However, it can occur if the old primary failed and a new replica was promoted or during primary relocation where the source primary is treated as a replica until the master starts the target.

1) Old Primary & New Replica - this case is easy as it is taken care of by the wire level BWC. All incoming requests will have their seq# set to `UNASSIGNED_SEQ_NO`, which doesn't confuse the local checkpoint logic (keeping it at `NO_OPS_PERFORMED`).
2) New Primary & Old replica - this one is trickier as the global checkpoint service currently takes all in sync replicas into consideration for the global checkpoint calculation. In order to deal with old replicas, we change the semantics to say all *new node* in sync replicas. That means the replicas on old nodes don't count for the global checkpointing. In this state the seq# infra is not fully operational (you can't search on it, because copies may miss it) but it is maintained on shards that can support it. The old replicas will have to go through a file based recovery at some point and will get the seq# information at that point. There is still an edge case where a new primary fails and an old replica takes over. I'll discuss this one with @ywelsch as I prefer to avoid it completely.

This PR also re-enables the BWC tests which were disabled. As such it had to fix any BWC issue that had crept in. Most notably an issue with the removal of the `timestamp` field in #21670.

The commit also includes a fix for the default value of the seq number field in replicated write requests (it was 0 but should be -2), which surfaced some other minor bugs that are fixed as well.

Last - I added some debugging tools like more sane node names and forcing replication requests to implement a `toString`.
  • Loading branch information
bleskes committed Dec 19, 2016
1 parent b58bbb9 commit b857b31
Show file tree
Hide file tree
Showing 27 changed files with 519 additions and 89 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ dependency-reduced-pom.xml
# osx stuff
.DS_Store

# default folders in which the create_bwc_index.py expects to find old es versions in
/backwards
/dev-tools/backwards

# needed in case docs build is run...maybe we can configure doc build to generate files under build?
html_docs

# random old stuff that we should look at the necessity of...
/tmp/
backwards/
eclipse-build

Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ class ClusterFormationTasks {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
Map esConfig = [
'cluster.name' : node.clusterName,
'node.name' : "node-" + node.nodeNum,
'pidfile' : node.pidFile,
'path.repo' : "${node.sharedDir}/repo",
'path.shared_data' : "${node.sharedDir}/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
Expand Down Expand Up @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException {
type = in.readString();
id = in.readString();
version = in.readZLong();
seqNo = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}
Expand All @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
out.writeZLong(seqNo);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
}
out.writeBoolean(forcedRefresh);
result.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "flush {" + super.toString() + "}";
return "flush {" + shardId + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -103,7 +104,9 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
}

@Override
Expand All @@ -114,7 +117,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWriteable(seqNoStats);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalWriteable(seqNoStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
indexRequest.setSeqNo(indexResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
Expand All @@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
deleteRequest.setSeqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
Expand Down Expand Up @@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
indexRequest.setSeqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
Expand All @@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
deleteRequest.setSeqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down Expand Up @@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
&& ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
&& ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand All @@ -39,7 +40,6 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() {
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response;
final DeleteRequest replicaRequest;
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
request.setSeqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new DeleteResponse(
primary.shardId(),
request.type(),
Expand All @@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
result.isFound());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary);
}

@Override
Expand All @@ -158,7 +161,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re

public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.seqNo(), request.primaryTerm(), request.version(), request.versionType());
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(routing);
out.writeOptionalString(parent);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalString(null);
// Serialize a fake timestamp. 5.x expects this value to be set by the #process method so we can't use null.
// On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for
// the transport layer is OK as it will be ignored.
out.writeOptionalString("0");
out.writeOptionalWriteable(null);
}
out.writeBytesReference(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,22 @@ protected IndexResponse newResponseInstance() {
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response;
final IndexRequest replicaRequest;
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.seqNo(indexResult.getSeqNo());
request.setSeqNo(indexResult.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}

@Override
Expand All @@ -197,9 +200,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque

final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand All @@ -221,7 +224,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
Expand All @@ -232,12 +235,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public BasicReplicationRequest() {
public BasicReplicationRequest(ShardId shardId) {
super(shardId);
}

/**
 * Returns a short description of this request: the class name followed by the
 * {@code shardId} it refers to, wrapped in braces.
 */
@Override
public String toString() {
return "BasicReplicationRequest{" + shardId + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -36,6 +38,8 @@
public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>> extends ReplicationRequest<R> implements WriteRequest<R> {
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;

/**
* Constructor for deserialization.
*/
Expand All @@ -62,11 +66,32 @@ public RefreshPolicy getRefreshPolicy() {
public void readFrom(StreamInput in) throws IOException {
// Read the parent's fields first, then the refresh policy, in the same order writeTo emits them.
super.readFrom(in);
refreshPolicy = RefreshPolicy.readFrom(in);
// The sequence number is only read from streams at version 6.0.0-alpha1 or later.
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
// Earlier stream versions carry no sequence number; fall back to the "unassigned" marker.
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

/**
 * Serializes this request: the parent's fields, then the refresh policy, then the
 * sequence number. The sequence number is written only when the target stream version
 * is 6.0.0-alpha1 or later; for earlier versions it is omitted.
 *
 * @param out the stream to write to
 * @throws IOException if writing to the stream fails
 */
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
refreshPolicy.writeTo(out);
// Version gate mirrors the one in readFrom so both sides agree on the wire layout.
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
}
}

/**
 * Returns the sequence number for this operation. The sequence number is assigned while the operation
 * is performed on the primary shard.
 *
 * @return the sequence number currently stored on this request
 */
public long getSeqNo() {
return seqNo;
}

/**
 * Sets the sequence number for this operation. Should only be called on the primary shard.
 *
 * @param seqNo the sequence number to store on this request
 */
public void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
}

private void decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0;
assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
if (pendingActions.decrementAndGet() == 0) {
finish();
}
Expand Down
Loading

0 comments on commit b857b31

Please sign in to comment.