Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BWC layer to seq no infra and enable BWC tests #22185

Merged
merged 24 commits into from
Dec 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a9d297d
wire level compatibility
bleskes Dec 9, 2016
adabd89
bwc test
bleskes Dec 10, 2016
4cacc8b
strengthen test
bleskes Dec 12, 2016
c2cb646
only account for shards on new nodes for global checkpoints
bleskes Dec 12, 2016
02b223c
fix timestamp for now as it makes assertion fail
bleskes Dec 12, 2016
2db7210
linting
bleskes Dec 12, 2016
634d92f
improve assertion message
bleskes Dec 12, 2016
4ea5dd9
line length
bleskes Dec 12, 2016
26596ff
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 13, 2016
001f7de
add skip version to cat.shards help test
bleskes Dec 13, 2016
cc9a486
force all replication requests to have toString
bleskes Dec 14, 2016
f2ca825
missing else :(
bleskes Dec 14, 2016
a945030
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 14, 2016
f4dada9
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 14, 2016
059638b
feedback and nocommit removal
bleskes Dec 15, 2016
2a15b15
feedback
bleskes Dec 16, 2016
647d5ee
update gitignore to explicitly point to backwards release folder (ins…
bleskes Dec 16, 2016
76338bf
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 16, 2016
896482f
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 17, 2016
f73d165
fix compilation
bleskes Dec 17, 2016
2cc2610
don't replicate on failures (like version conflicts)
bleskes Dec 17, 2016
bc60297
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 18, 2016
2ecbac0
Merge remote-tracking branch 'upstream/master' into seq_no_bwc
bleskes Dec 19, 2016
7577b12
put back diamond operator for ecplise
bleskes Dec 19, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ dependency-reduced-pom.xml
# osx stuff
.DS_Store

# default folders in which the create_bwc_index.py expects to find old es versions in
/backwards
/dev-tools/backwards

# needed in case docs build is run...maybe we can configure doc build to generate files under build?
html_docs
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? this is where our bwc index creation python tool stores their versions?

Copy link
Contributor Author

@bleskes bleskes Dec 15, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... maybe we need a different solution then - the problem is that with this line the this file was ignored by git.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

call the folder bwc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@s1monw I pushed another commit updating gitignore to be more explicit about the backwords folder (rather than using a global glob pattern). I also updated the comments around it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++


# random old stuff that we should look at the necessity of...
/tmp/
backwards/
eclipse-build

Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ class ClusterFormationTasks {
static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) {
Map esConfig = [
'cluster.name' : node.clusterName,
'node.name' : "node-" + node.nodeNum,
'pidfile' : node.pidFile,
'path.repo' : "${node.sharedDir}/repo",
'path.shared_data' : "${node.sharedDir}/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.action;

import org.elasticsearch.Version;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
import org.elasticsearch.action.support.WriteResponse;
Expand Down Expand Up @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException {
type = in.readString();
id = in.readString();
version = in.readZLong();
seqNo = in.readZLong();
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
forcedRefresh = in.readBoolean();
result = Result.readFrom(in);
}
Expand All @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
out.writeZLong(seqNo);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
}
out.writeBoolean(forcedRefresh);
result.writeTo(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public String toString() {
return "flush {" + super.toString() + "}";
return "flush {" + shardId + "}";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.stats;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -103,7 +104,9 @@ public void readFrom(StreamInput in) throws IOException {
statePath = in.readString();
dataPath = in.readString();
isCustomDataPath = in.readBoolean();
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNoStats = in.readOptionalWriteable(SeqNoStats::new);
}
}

@Override
Expand All @@ -114,7 +117,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(statePath);
out.writeString(dataPath);
out.writeBoolean(isCustomDataPath);
out.writeOptionalWriteable(seqNoStats);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalWriteable(seqNoStats);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
Expand Down Expand Up @@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
final long version = indexResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(indexResult.getSeqNo());
indexRequest.setSeqNo(indexResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
Expand All @@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(deleteResult.getVersion());
deleteRequest.seqNo(deleteResult.getSeqNo());
deleteRequest.setSeqNo(deleteResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(),
deleteResult.getVersion(), deleteResult.isFound());
Expand Down Expand Up @@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
final long version = updateOperationResult.getVersion();
indexRequest.version(version);
indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery());
indexRequest.seqNo(updateOperationResult.getSeqNo());
indexRequest.setSeqNo(updateOperationResult.getSeqNo());
assert indexRequest.versionType().validateVersionForWrites(indexRequest.version());
}
break;
Expand All @@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
// update the request with the version so it will go to the replicas
deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery());
deleteRequest.version(updateOperationResult.getVersion());
deleteRequest.seqNo(updateOperationResult.getSeqNo());
deleteRequest.setSeqNo(updateOperationResult.getSeqNo());
assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version());
}
break;
Expand Down Expand Up @@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind
break;
}
assert (replicaRequest.request() instanceof IndexRequest
&& ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
&& ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) ||
(replicaRequest.request() instanceof DeleteRequest
&& ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
&& ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO);
// successful operation
break; // out of retry loop
} else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
Expand All @@ -39,7 +40,6 @@
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() {
protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception {
final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary);
final DeleteResponse response;
final DeleteRequest replicaRequest;
if (result.hasFailure() == false) {
// update the request with the version so it will go to the replicas
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.version(result.getVersion());
request.seqNo(result.getSeqNo());
request.setSeqNo(result.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new DeleteResponse(
primary.shardId(),
request.type(),
Expand All @@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde
result.isFound());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary);
return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary);
}

@Override
Expand All @@ -158,7 +161,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re

public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.seqNo(), request.primaryTerm(), request.version(), request.versionType());
request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(routing);
out.writeOptionalString(parent);
if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeOptionalString(null);
// Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null.
// On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for
// the transport layer OK as it will be ignored.
out.writeOptionalString("0");
out.writeOptionalWriteable(null);
}
out.writeBytesReference(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,22 @@ protected IndexResponse newResponseInstance() {
protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception {
final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction);
final IndexResponse response;
final IndexRequest replicaRequest;
if (indexResult.hasFailure() == false) {
// update the version on request so it will happen on the replicas
final long version = indexResult.getVersion();
request.version(version);
request.versionType(request.versionType().versionTypeForReplicationAndRecovery());
request.seqNo(indexResult.getSeqNo());
request.setSeqNo(indexResult.getSeqNo());
assert request.versionType().validateVersionForWrites(request.version());
replicaRequest = request;
response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(),
indexResult.getVersion(), indexResult.isCreated());
} else {
response = null;
replicaRequest = null;
}
return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary);
}

@Override
Expand All @@ -197,9 +200,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque

final Engine.Index operation;
try {
operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
} catch (MapperParsingException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand All @@ -221,7 +224,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
final ShardId shardId = primary.shardId();
Expand All @@ -232,12 +235,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update);
} catch (IllegalArgumentException e) {
// throws IAE on conflicts merging dynamic mappings
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
try {
operation = prepareIndexOperationOnPrimary(request, primary);
} catch (MapperParsingException | IllegalArgumentException e) {
return new Engine.IndexResult(e, request.version(), request.seqNo());
return new Engine.IndexResult(e, request.version(), request.getSeqNo());
}
update = operation.parsedDoc().dynamicMappingsUpdate();
if (update != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,9 @@ public BasicReplicationRequest() {
public BasicReplicationRequest(ShardId shardId) {
super(shardId);
}

@Override
public String toString() {
return "BasicReplicationRequest{" + shardId + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.elasticsearch.action.support.replication;

import org.elasticsearch.Version;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
Expand All @@ -36,6 +38,8 @@
public abstract class ReplicatedWriteRequest<R extends ReplicatedWriteRequest<R>> extends ReplicationRequest<R> implements WriteRequest<R> {
private RefreshPolicy refreshPolicy = RefreshPolicy.NONE;

private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;

/**
* Constructor for deserialization.
*/
Expand All @@ -62,11 +66,32 @@ public RefreshPolicy getRefreshPolicy() {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
refreshPolicy = RefreshPolicy.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
seqNo = in.readZLong();
} else {
seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO;
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
refreshPolicy.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) {
out.writeZLong(seqNo);
}
}

/**
* Returns the sequence number for this operation. The sequence number is assigned while the operation
* is performed on the primary shard.
*/
public long getSeqNo() {
return seqNo;
}

/** sets the sequence number for this operation. should only be called on the primary shard */
public void setSeqNo(long seqNo) {
this.seqNo = seqNo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ protected List<ShardRouting> getShards(ShardId shardId, ClusterState state) {
}

private void decPendingAndFinishIfNeeded() {
assert pendingActions.get() > 0;
assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]";
if (pendingActions.decrementAndGet() == 0) {
finish();
}
Expand Down
Loading