diff --git a/.gitignore b/.gitignore index b4ec8795057e2..5d7dbbefdc8b8 100644 --- a/.gitignore +++ b/.gitignore @@ -38,11 +38,14 @@ dependency-reduced-pom.xml # osx stuff .DS_Store +# default folders in which the create_bwc_index.py expects to find old es versions in +/backwards +/dev-tools/backwards + # needed in case docs build is run...maybe we can configure doc build to generate files under build? html_docs # random old stuff that we should look at the necessity of... /tmp/ -backwards/ eclipse-build diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy index 74cae08298bcb..4c6771ccda7c8 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/test/ClusterFormationTasks.groovy @@ -268,6 +268,7 @@ class ClusterFormationTasks { static Task configureWriteConfigTask(String name, Project project, Task setup, NodeInfo node, NodeInfo seedNode) { Map esConfig = [ 'cluster.name' : node.clusterName, + 'node.name' : "node-" + node.nodeNum, 'pidfile' : node.pidFile, 'path.repo' : "${node.sharedDir}/repo", 'path.shared_data' : "${node.sharedDir}/", diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java index 7a12ab8ace255..aef99494d9265 100644 --- a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.action; +import org.elasticsearch.Version; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -214,7 +215,11 @@ public void readFrom(StreamInput in) throws IOException { type = in.readString(); id = in.readString(); version = in.readZLong(); - seqNo = in.readZLong(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } forcedRefresh = in.readBoolean(); result = Result.readFrom(in); } @@ -226,7 +231,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(type); out.writeString(id); out.writeZLong(version); - out.writeZLong(seqNo); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } out.writeBoolean(forcedRefresh); result.writeTo(out); } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java index 83eaf11ca3a9e..ac32b16eb5711 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/ShardFlushRequest.java @@ -58,6 +58,6 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - return "flush {" + super.toString() + "}"; + return "flush {" + shardId + "}"; } } diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java index 877db0579a0ca..150b7c6a52bc5 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/stats/ShardStats.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; @@ -103,7 +104,9 @@ public void readFrom(StreamInput in) throws IOException { statePath = in.readString(); dataPath = in.readString(); isCustomDataPath = in.readBoolean(); - seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNoStats = in.readOptionalWriteable(SeqNoStats::new); + } } @Override @@ -114,7 +117,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(statePath); out.writeString(dataPath); out.writeBoolean(isCustomDataPath); - out.writeOptionalWriteable(seqNoStats); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeOptionalWriteable(seqNoStats); + } } @Override diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index cef89e1ce7855..86024e4dcd592 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -50,7 +50,6 @@ import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.mapper.MapperParsingException; -import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardClosedException; @@ -151,7 +150,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh final long version = indexResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(indexResult.getSeqNo()); + indexRequest.setSeqNo(indexResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); response = new IndexResponse(primary.shardId(), indexRequest.type(), indexRequest.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); @@ -175,7 +174,7 @@ private Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSh // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(deleteResult.getVersion()); - deleteRequest.seqNo(deleteResult.getSeqNo()); + deleteRequest.setSeqNo(deleteResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); response = new DeleteResponse(request.shardId(), deleteRequest.type(), deleteRequest.id(), deleteResult.getSeqNo(), deleteResult.getVersion(), deleteResult.isFound()); @@ -286,7 +285,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind final long version = updateOperationResult.getVersion(); indexRequest.version(version); indexRequest.versionType(indexRequest.versionType().versionTypeForReplicationAndRecovery()); - indexRequest.seqNo(updateOperationResult.getSeqNo()); + indexRequest.setSeqNo(updateOperationResult.getSeqNo()); assert indexRequest.versionType().validateVersionForWrites(indexRequest.version()); } break; @@ -297,7 +296,7 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind // update the request with the version so it will go to the replicas deleteRequest.versionType(deleteRequest.versionType().versionTypeForReplicationAndRecovery()); deleteRequest.version(updateOperationResult.getVersion()); - deleteRequest.seqNo(updateOperationResult.getSeqNo()); + deleteRequest.setSeqNo(updateOperationResult.getSeqNo()); assert deleteRequest.versionType().validateVersionForWrites(deleteRequest.version()); } break; @@ -349,9 +348,9 @@ private UpdateResultHolder executeUpdateRequest(UpdateRequest updateRequest, Ind break; } assert (replicaRequest.request() instanceof IndexRequest - && ((IndexRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || + && ((IndexRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO) || (replicaRequest.request() instanceof DeleteRequest - && ((DeleteRequest) replicaRequest.request()).seqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); + && ((DeleteRequest) replicaRequest.request()).getSeqNo() != SequenceNumbersService.UNASSIGNED_SEQ_NO); // successful operation break; // out of retry loop } else if (updateOperationResult.getFailure() instanceof VersionConflictEngineException == false) { diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 0d0d76c76919c..5601d54ea4740 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.delete; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -39,7 +40,6 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -125,12 +125,14 @@ protected DeleteResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, IndexShard primary) throws Exception { final Engine.DeleteResult result = executeDeleteRequestOnPrimary(request, primary); final DeleteResponse response; + final DeleteRequest replicaRequest; if (result.hasFailure() == false) { // update the request with the version so it will go to the replicas request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); request.version(result.getVersion()); - request.seqNo(result.getSeqNo()); + request.setSeqNo(result.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new DeleteResponse( primary.shardId(), request.type(), @@ -140,8 +142,9 @@ protected WritePrimaryResult shardOperationOnPrimary(DeleteRequest request, Inde result.isFound()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, result.getTranslogLocation(), result.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, result.getTranslogLocation(), result.getFailure(), primary); } @Override @@ -158,7 +161,7 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) { final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(), - request.seqNo(), request.primaryTerm(), request.version(), request.versionType()); + request.getSeqNo(), request.primaryTerm(), request.version(), request.versionType()); return replica.delete(delete); } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java index de80f85b89f17..5809280946c03 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -524,7 +524,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(routing); out.writeOptionalString(parent); if (out.getVersion().before(Version.V_6_0_0_alpha1_UNRELEASED)) { - out.writeOptionalString(null); + // Serialize a fake timestamp. 5.x expect this value to be set by the #process method so we can't use null. + // On the other hand, indices created on 5.x do not index the timestamp field. Therefore passing a 0 (or any value) for + // the transport layer OK as it will be ignored. + out.writeOptionalString("0"); out.writeOptionalWriteable(null); } out.writeBytesReference(source); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 70220679752ec..9ed9f7f7cd11d 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -165,19 +165,22 @@ protected IndexResponse newResponseInstance() { protected WritePrimaryResult shardOperationOnPrimary(IndexRequest request, IndexShard primary) throws Exception { final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, mappingUpdatedAction); final IndexResponse response; + final IndexRequest replicaRequest; if (indexResult.hasFailure() == false) { // update the version on request so it will happen on the replicas final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); + replicaRequest = request; response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getSeqNo(), indexResult.getVersion(), indexResult.isCreated()); } else { response = null; + replicaRequest = null; } - return new WritePrimaryResult(request, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); + return new WritePrimaryResult(replicaRequest, response, indexResult.getTranslogLocation(), indexResult.getFailure(), primary); } @Override @@ -197,9 +200,9 @@ public static Engine.IndexResult executeIndexRequestOnReplica(IndexRequest reque final Engine.Index operation; try { - operation = replica.prepareIndexOnReplica(sourceToParse, request.seqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); + operation = replica.prepareIndexOnReplica(sourceToParse, request.getSeqNo(), request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry()); } catch (MapperParsingException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { @@ -221,7 +224,7 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); final ShardId shardId = primary.shardId(); @@ -232,12 +235,12 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); } catch (IllegalArgumentException e) { // throws IAE on conflicts merging dynamic mappings - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } try { operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.IndexResult(e, request.version(), request.seqNo()); + return new Engine.IndexResult(e, request.version(), request.getSeqNo()); } update = operation.parsedDoc().dynamicMappingsUpdate(); if (update != null) { diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java index f431c67b2904b..b4731d19e29e4 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/BasicReplicationRequest.java @@ -37,4 +37,9 @@ public BasicReplicationRequest() { public BasicReplicationRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "BasicReplicationRequest{" + shardId + "}"; + } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java index fa02dac9e1e2d..107c791a069eb 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicatedWriteRequest.java @@ -19,12 +19,14 @@ package org.elasticsearch.action.support.replication; +import org.elasticsearch.Version; import org.elasticsearch.action.bulk.BulkShardRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; @@ -36,6 +38,8 @@ public abstract class ReplicatedWriteRequest> extends ReplicationRequest implements WriteRequest { private RefreshPolicy refreshPolicy = RefreshPolicy.NONE; + private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + /** * Constructor for deserialization. */ @@ -62,11 +66,32 @@ public RefreshPolicy getRefreshPolicy() { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); refreshPolicy = RefreshPolicy.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + seqNo = in.readZLong(); + } else { + seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); refreshPolicy.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + out.writeZLong(seqNo); + } + } + + /** + * Returns the sequence number for this operation. The sequence number is assigned while the operation + * is performed on the primary shard. + */ + public long getSeqNo() { + return seqNo; + } + + /** sets the sequence number for this operation. should only be called on the primary shard */ + public void setSeqNo(long seqNo) { + this.seqNo = seqNo; } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 47284789850a7..25dcc29a5c3a3 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -283,7 +283,7 @@ protected List getShards(ShardId shardId, ClusterState state) { } private void decPendingAndFinishIfNeeded() { - assert pendingActions.get() > 0; + assert pendingActions.get() > 0 : "pending action count goes below 0 for request [" + request + "]"; if (pendingActions.decrementAndGet() == 0) { finish(); } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index d520b3d4e70ce..091f96c408f67 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -55,7 +55,6 @@ public abstract class ReplicationRequest(request, primary.allocationId().getId())); @@ -950,6 +951,8 @@ public void failShard(String reason, Exception e) { public PrimaryResult perform(Request request) throws Exception { PrimaryResult result = shardOperationOnPrimary(request, indexShard); if (result.replicaRequest() != null) { + assert result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm()); } return result; @@ -983,16 +986,25 @@ public ReplicaResponse(String allocationId, long localCheckpoint) { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - localCheckpoint = in.readZLong(); - allocationId = in.readString(); + if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.readFrom(in); + localCheckpoint = in.readZLong(); + allocationId = in.readString(); + } else { + // 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing. + } } @Override public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeZLong(localCheckpoint); - out.writeString(allocationId); + if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.writeTo(out); + out.writeZLong(localCheckpoint); + out.writeString(allocationId); + } else { + // we use to write empty responses + Empty.INSTANCE.writeTo(out); + } } @Override @@ -1016,10 +1028,9 @@ public void performOn(ShardRouting replica, ReplicaRequest request, ActionListen listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]")); return; } - transportService.sendRequest(node, transportReplicaAction, - new ConcreteShardRequest<>(request, replica.allocationId().getId()), transportOptions, - // Eclipse can't handle when this is <> so we specify the type here. - new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + final ConcreteShardRequest concreteShardRequest = + new ConcreteShardRequest<>(request, replica.allocationId().getId()); + sendReplicaRequest(concreteShardRequest, node, listener); } @Override @@ -1060,6 +1071,14 @@ public void onFailure(Exception shardFailedError) { } } + /** sends the given replica request to the supplied nodes */ + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + transportService.sendRequest(node, transportReplicaAction, concreteShardRequest, transportOptions, + // Eclipse can't handle when this is <> so we specify the type here. + new ActionListenerResponseHandler(listener, ReplicaResponse::new)); + } + /** a wrapper class to encapsulate a request when being sent to a specific allocation id **/ public static final class ConcreteShardRequest extends TransportRequest { diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 3ea61385f1c3f..0f9db99326da8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -204,7 +204,7 @@ public void writeTo(StreamOutput out) throws IOException { // timestamp out.writeBoolean(false); // enabled out.writeString(DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.format()); - out.writeOptionalString(null); + out.writeOptionalString("now"); // 5.x default out.writeOptionalBoolean(null); } out.writeBoolean(hasParentField()); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 8e87729831398..6e13573794d79 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -20,20 +20,21 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -41,8 +42,6 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.io.UncheckedIOException; -import java.io.UnsupportedEncodingException; public class GlobalCheckpointSyncAction extends TransportReplicationAction { @@ -65,6 +64,17 @@ protected ReplicationResponse newResponseInstance() { return new ReplicationResponse(); } + @Override + protected void sendReplicaRequest(ConcreteShardRequest concreteShardRequest, DiscoveryNode node, + ActionListener listener) { + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + super.sendReplicaRequest(concreteShardRequest, node, listener); + } else { + listener.onResponse( + new ReplicaResponse(concreteShardRequest.getTargetAllocationID(), SequenceNumbersService.UNASSIGNED_SEQ_NO)); + } + } + @Override protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request, IndexShard indexShard) throws Exception { long checkpoint = indexShard.getGlobalCheckpoint(); @@ -105,6 +115,11 @@ private PrimaryRequest() { public PrimaryRequest(ShardId shardId) { super(shardId); } + + @Override + public String toString() { + return "GlobalCkpSyncPrimary{" + shardId + "}"; + } } public static final class ReplicaRequest extends ReplicationRequest { @@ -134,6 +149,14 @@ public void writeTo(StreamOutput out) throws IOException { public long getCheckpoint() { return checkpoint; } + + @Override + public String toString() { + return "GlobalCkpSyncReplica{" + + "checkpoint=" + checkpoint + + ", shardId=" + shardId + + '}'; + } } } diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index a49c2a97cb0fc..f0d72f6c4c5d2 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier; import org.apache.lucene.store.LockObtainFailedException; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -561,9 +562,15 @@ private void updateShard(DiscoveryNodes nodes, ShardRouting shardRouting, Shard shard.updateRoutingEntry(shardRouting); if (shardRouting.primary()) { IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId()); - Set activeIds = indexShardRoutingTable.activeShards().stream().map(r -> r.allocationId().getId()) + Set activeIds = indexShardRoutingTable.activeShards().stream() + // filter to shards that track seq# and should be taken into consideration for checkpoint tracking + // shards on old nodes will go through a file based recovery which will also transfer seq# information. + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); - Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream().map(r -> r.allocationId().getId()) + Set initializingIds = indexShardRoutingTable.getAllInitializingShards().stream() + .filter(sr -> nodes.get(sr.currentNodeId()).getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) + .map(r -> r.allocationId().getId()) .collect(Collectors.toSet()); shard.updateAllocationIdsFromMaster(activeIds, initializingIds); } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 47325bf5f9818..74a57f3aa91e2 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -33,8 +33,6 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.shard.IndexShardNotStartedException; @@ -42,7 +40,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -366,13 +363,8 @@ public Request() { } @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); + public String toString() { + return "Request{}"; } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 655244e286f68..4a9b70c7b9068 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -984,6 +984,11 @@ public void onRetry() { super.onRetry(); isRetrySet.set(true); } + + @Override + public String toString() { + return "Request{}"; + } } static class Response extends ReplicationResponse { diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 571bbfa72e0f7..cd71418f0e5cd 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -202,6 +202,11 @@ private static class TestRequest extends ReplicatedWriteRequest { public TestRequest() { setShardId(new ShardId("test", "test", 1)); } + + @Override + public String toString() { + return "TestRequest{}"; + } } private static class TestResponse extends ReplicationResponse implements WriteResponse { diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 2425552c24642..02b6eca43a338 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -433,7 +433,7 @@ protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest reques final long version = indexResult.getVersion(); request.version(version); request.versionType(request.versionType().versionTypeForReplicationAndRecovery()); - request.seqNo(indexResult.getSeqNo()); + request.setSeqNo(indexResult.getSeqNo()); assert request.versionType().validateVersionForWrites(request.version()); } request.primaryTerm(primary.getPrimaryTerm()); diff --git a/qa/backwards-5.0/build.gradle b/qa/backwards-5.0/build.gradle index b151485ef3fcb..5347429f03f49 100644 --- a/qa/backwards-5.0/build.gradle +++ b/qa/backwards-5.0/build.gradle @@ -16,9 +16,9 @@ apply plugin: 'elasticsearch.rest-test' integTest { includePackaged = true cluster { - numNodes = 2 - numBwcNodes = 1 - bwcVersion = "6.0.0-alpha1-SNAPSHOT" + numNodes = 4 + numBwcNodes = 2 + bwcVersion = "5.2.0-SNAPSHOT" setting 'logger.org.elasticsearch', 'DEBUG' } } diff --git a/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java new file mode 100644 index 0000000000000..87343e830e249 --- /dev/null +++ b/qa/backwards-5.0/src/test/java/org/elasticsearch/backwards/IndexingIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.backwards; + +import org.apache.http.HttpHost; +import org.apache.http.entity.StringEntity; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.Version; +import org.elasticsearch.client.Response; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.seqno.SeqNoStats; +import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.yaml.ObjectPath; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.equalTo; + +public class IndexingIT extends ESRestTestCase { + + private ObjectPath objectPath(Response response) throws IOException { + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + String contentType = response.getHeader("Content-Type"); + XContentType xContentType = XContentType.fromMediaTypeOrFormat(contentType); + return ObjectPath.createFromXContent(xContentType.xContent(), body); + } + + private void assertOK(Response response) { + assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + } + + private void ensureGreen() throws IOException { + Map params = new HashMap<>(); + params.put("wait_for_status", "green"); + params.put("wait_for_no_relocating_shards", "true"); + assertOK(client().performRequest("GET", "_cluster/health", params)); + } + + private void createIndex(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name, Collections.emptyMap(), + new StringEntity("{ \"settings\": " + Strings.toString(settings, true) + " }"))); + } + + private void updateIndexSetting(String name, Settings.Builder settings) throws IOException { + updateIndexSetting(name, settings.build()); + } + private void updateIndexSetting(String name, Settings settings) throws IOException { + assertOK(client().performRequest("PUT", name + "/_settings", Collections.emptyMap(), + new StringEntity(Strings.toString(settings, true)))); + } + + protected int indexDocs(String index, final int idStart, final int numDocs) throws IOException { + for (int i = 0; i < numDocs; i++) { + final int id = idStart + i; + assertOK(client().performRequest("PUT", index + "/test/" + id, emptyMap(), + new StringEntity("{\"test\": \"test_" + id + "\"}"))); + } + return numDocs; + } + + public void testSeqNoCheckpoints() throws Exception { + Nodes nodes = buildNodeAndVersions(); + logger.info("cluster discovered: {}", nodes.toString()); + final String bwcNames = nodes.getBWCNodes().stream().map(Node::getNodeName).collect(Collectors.joining(",")); + Settings.Builder settings = Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2) + .put("index.routing.allocation.include._name", bwcNames); + + final boolean checkGlobalCheckpoints = nodes.getMaster().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED); + logger.info("master version is [{}], global checkpoints will be [{}]", nodes.getMaster().getVersion(), + checkGlobalCheckpoints ? "checked" : "not be checked"); + if (checkGlobalCheckpoints) { + settings.put(IndexSettings.INDEX_SEQ_NO_CHECKPOINT_SYNC_INTERVAL.getKey(), "100ms"); + } + final String index = "test"; + createIndex(index, settings.build()); + try (RestClient newNodeClient = buildClient(restClientSettings(), + nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) { + int numDocs = indexDocs(index, 0, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + + logger.info("allowing shards on all nodes"); + updateIndexSetting(index, Settings.builder().putNull("index.routing.allocation.include._name")); + ensureGreen(); + logger.info("indexing some more docs"); + numDocs += indexDocs(index, numDocs, randomInt(5)); + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, 0, newNodeClient); + logger.info("moving primary to new node"); + Shard primary = buildShards(nodes, newNodeClient).stream().filter(Shard::isPrimary).findFirst().get(); + updateIndexSetting(index, Settings.builder().put("index.routing.allocation.exclude._name", primary.getNode().getNodeName())); + ensureGreen(); + logger.info("indexing some more docs"); + int numDocsOnNewPrimary = indexDocs(index, numDocs, randomInt(5)); + numDocs += numDocsOnNewPrimary; + assertSeqNoOnShards(nodes, checkGlobalCheckpoints, numDocsOnNewPrimary, newNodeClient); + } + } + + private void assertSeqNoOnShards(Nodes nodes, boolean checkGlobalCheckpoints, int numDocs, RestClient client) throws Exception { + assertBusy(() -> { + try { + List shards = buildShards(nodes, client); + Shard primaryShard = shards.stream().filter(Shard::isPrimary).findFirst().get(); + assertNotNull("failed to find primary shard", primaryShard); + final long expectedGlobalCkp; + final long expectMaxSeqNo; + logger.info("primary resolved to node {}", primaryShard.getNode()); + if (primaryShard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + expectMaxSeqNo = numDocs - 1; + expectedGlobalCkp = numDocs - 1; + } else { + expectedGlobalCkp = SequenceNumbersService.UNASSIGNED_SEQ_NO; + expectMaxSeqNo = SequenceNumbersService.NO_OPS_PERFORMED; + } + for (Shard shard : shards) { + if (shard.getNode().getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + final SeqNoStats seqNoStats = shard.getSeqNoStats(); + logger.info("stats for {}, primary [{}]: [{}]", shard.getNode(), shard.isPrimary(), seqNoStats); + assertThat("max_seq no on " + shard.getNode() + " is wrong", seqNoStats.getMaxSeqNo(), equalTo(expectMaxSeqNo)); + assertThat("localCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getLocalCheckpoint(), equalTo(expectMaxSeqNo)); + if (checkGlobalCheckpoints) { + assertThat("globalCheckpoint no on " + shard.getNode() + " is wrong", + seqNoStats.getGlobalCheckpoint(), equalTo(expectedGlobalCkp)); + } + } else { + logger.info("skipping seq no test on {}", shard.getNode()); + } + } + } catch (IOException e) { + throw new AssertionError("unexpected io exception", e); + } + }); + } + + private List buildShards(Nodes nodes, RestClient client) throws IOException { + Response response = client.performRequest("GET", "test/_stats", singletonMap("level", "shards")); + List shardStats = objectPath(response).evaluate("indices.test.shards.0"); + ArrayList shards = new ArrayList<>(); + for (Object shard : shardStats) { + final String nodeId = ObjectPath.evaluate(shard, "routing.node"); + final Boolean primary = ObjectPath.evaluate(shard, "routing.primary"); + final Node node = nodes.getSafe(nodeId); + final SeqNoStats seqNoStats; + if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1_UNRELEASED)) { + Integer maxSeqNo = ObjectPath.evaluate(shard, "seq_no.max"); + Integer localCheckpoint = ObjectPath.evaluate(shard, "seq_no.local_checkpoint"); + Integer globalCheckpoint = ObjectPath.evaluate(shard, "seq_no.global_checkpoint"); + seqNoStats = new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint); + } else { + seqNoStats = null; + } + shards.add(new Shard(node, primary, seqNoStats)); + } + return shards; + } + + private Nodes buildNodeAndVersions() throws IOException { + Response response = client().performRequest("GET", "_nodes"); + ObjectPath objectPath = objectPath(response); + Map nodesAsMap = objectPath.evaluate("nodes"); + Nodes nodes = new Nodes(); + for (String id : nodesAsMap.keySet()) { + nodes.add(new Node( + id, + objectPath.evaluate("nodes." + id + ".name"), + Version.fromString(objectPath.evaluate("nodes." + id + ".version")), + HttpHost.create(objectPath.evaluate("nodes." + id + ".http.publish_address")))); + } + response = client().performRequest("GET", "_cluster/state"); + nodes.setMasterNodeId(objectPath(response).evaluate("master_node")); + return nodes; + } + + final class Nodes extends HashMap { + + private String masterNodeId = null; + + public Node getMaster() { + return get(masterNodeId); + } + + public void setMasterNodeId(String id) { + if (get(id) == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found. got:" + toString()); + } + masterNodeId = id; + } + + public void add(Node node) { + put(node.getId(), node); + } + + public List getNewNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().after(bwcVersion)).collect(Collectors.toList()); + } + + public List getBWCNodes() { + Version bwcVersion = getBWCVersion(); + return values().stream().filter(n -> n.getVersion().equals(bwcVersion)).collect(Collectors.toList()); + } + + public Version getBWCVersion() { + if (isEmpty()) { + throw new IllegalStateException("no nodes available"); + } + return Version.fromId(values().stream().map(node -> node.getVersion().id).min(Integer::compareTo).get()); + } + + public Node getSafe(String id) { + Node node = get(id); + if (node == null) { + throw new IllegalArgumentException("node with id [" + id + "] not found"); + } + return node; + } + + @Override + public String toString() { + return "Nodes{" + + "masterNodeId='" + masterNodeId + "'\n" + + values().stream().map(Node::toString).collect(Collectors.joining("\n")) + + '}'; + } + } + + final class Node { + private final String id; + private final String nodeName; + private final Version version; + private final HttpHost publishAddress; + + Node(String id, String nodeName, Version version, HttpHost publishAddress) { + this.id = id; + this.nodeName = nodeName; + this.version = version; + this.publishAddress = publishAddress; + } + + public String getId() { + return id; + } + + public String getNodeName() { + return nodeName; + } + + public HttpHost getPublishAddress() { + return publishAddress; + } + + public Version getVersion() { + return version; + } + + @Override + public String toString() { + return "Node{" + + "id='" + id + '\'' + + ", nodeName='" + nodeName + '\'' + + ", version=" + version + + '}'; + } + } + + final class Shard { + private final Node node; + private final boolean Primary; + private final SeqNoStats seqNoStats; + + Shard(Node node, boolean primary, SeqNoStats seqNoStats) { + this.node = node; + Primary = primary; + this.seqNoStats = seqNoStats; + } + + public Node getNode() { + return node; + } + + public boolean isPrimary() { + return Primary; + } + + public SeqNoStats getSeqNoStats() { + return seqNoStats; + } + + @Override + public String toString() { + return "Shard{" + + "node=" + node + + ", Primary=" + Primary + + ", seqNoStats=" + seqNoStats + + '}'; + } + } +} diff --git a/qa/rolling-upgrade/build.gradle b/qa/rolling-upgrade/build.gradle index e17e245410820..182e6a9f7d947 100644 --- a/qa/rolling-upgrade/build.gradle +++ b/qa/rolling-upgrade/build.gradle @@ -25,7 +25,7 @@ task oldClusterTest(type: RestIntegTestTask) { mustRunAfter(precommit) cluster { distribution = 'zip' - bwcVersion = '6.0.0-alpha1-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop + bwcVersion = '5.2.0-SNAPSHOT' // TODO: either randomize, or make this settable with sysprop numBwcNodes = 2 numNodes = 2 clusterName = 'rolling-upgrade' diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml index 8c7cd83b0e039..4a37734d28462 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yaml @@ -1,5 +1,9 @@ --- "Help": + - skip: + version: " - 5.99.99" + reason: seq no stats were added in 6.0.0 + - do: cat.shards: help: true diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index 0fc8cb4506b0b..975e6e2f86682 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -111,8 +111,8 @@ public void initClient() throws IOException { } clusterHosts = unmodifiableList(hosts); logger.info("initializing REST clients against {}", clusterHosts); - client = buildClient(restClientSettings()); - adminClient = buildClient(restAdminSettings()); + client = buildClient(restClientSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + adminClient = buildClient(restAdminSettings(), clusterHosts.toArray(new HttpHost[clusterHosts.size()])); } assert client != null; assert adminClient != null; @@ -272,8 +272,8 @@ protected String getProtocol() { return "http"; } - private RestClient buildClient(Settings settings) throws IOException { - RestClientBuilder builder = RestClient.builder(clusterHosts.toArray(new HttpHost[clusterHosts.size()])); + protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException { + RestClientBuilder builder = RestClient.builder(hosts); String keystorePath = settings.get(TRUSTSTORE_PATH); if (keystorePath != null) { final String keystorePass = settings.get(TRUSTSTORE_PASSWORD); diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java index 6311944fdcbb7..265fd7b3e8561 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/yaml/ObjectPath.java @@ -46,17 +46,28 @@ public ObjectPath(Object object) { this.object = object; } + + /** + * A utility method that creates an {@link ObjectPath} via {@link #ObjectPath(Object)} returns + * the result of calling {@link #evaluate(String)} on it. + */ + public static T evaluate(Object object, String path) throws IOException { + return new ObjectPath(object).evaluate(path, Stash.EMPTY); + } + + /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path) throws IOException { + public T evaluate(String path) throws IOException { return evaluate(path, Stash.EMPTY); } /** * Returns the object corresponding to the provided path if present, null otherwise */ - public Object evaluate(String path, Stash stash) throws IOException { + @SuppressWarnings("unchecked") + public T evaluate(String path, Stash stash) throws IOException { String[] parts = parsePath(path); Object object = this.object; for (String part : parts) { @@ -65,7 +76,7 @@ public Object evaluate(String path, Stash stash) throws IOException { return null; } } - return object; + return (T)object; } @SuppressWarnings("unchecked")