Skip to content

Commit

Permalink
Add support for ccr follow info api to HLRC. (elastic#39115)
Browse files Browse the repository at this point in the history
This API was introduces after elastic#33824 was closed.
  • Loading branch information
martijnvg authored and weizijun committed Feb 22, 2019
1 parent dbd6e08 commit 0b70b99
Show file tree
Hide file tree
Showing 11 changed files with 595 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowInfoResponse;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.FollowStatsResponse;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
Expand Down Expand Up @@ -452,4 +454,46 @@ public void getFollowStatsAsync(FollowStatsRequest request,
);
}

/**
* Gets follow info for specific indices.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
* the docs</a> for more.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public FollowInfoResponse getFollowInfo(FollowInfoRequest request, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(
request,
CcrRequestConverters::getFollowInfo,
options,
FollowInfoResponse::fromXContent,
Collections.emptySet()
);
}

/**
* Asynchronously gets follow info for specific indices.
*
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html">
* the docs</a> for more.
*
* @param request the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
*/
public void getFollowInfoAsync(FollowInfoRequest request,
RequestOptions options,
ActionListener<FollowInfoResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(
request,
CcrRequestConverters::getFollowInfo,
options,
FollowInfoResponse::fromXContent,
listener,
Collections.emptySet()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.http.client.methods.HttpPut;
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.PauseFollowRequest;
Expand Down Expand Up @@ -119,4 +120,12 @@ static Request getFollowStats(FollowStatsRequest followStatsRequest) {
return new Request(HttpGet.METHOD_NAME, endpoint);
}

static Request getFollowInfo(FollowInfoRequest followInfoRequest) {
String endpoint = new RequestConverters.EndpointBuilder()
.addPathPart(followInfoRequest.getFollowerIndex())
.addPathPartAsIs("_ccr", "info")
.build();
return new Request(HttpGet.METHOD_NAME, endpoint);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -41,6 +43,44 @@ public class FollowConfig {
static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay");
static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout");

private static final ObjectParser<FollowConfig, Void> PARSER = new ObjectParser<>(
"follow_config",
true,
FollowConfig::new);

static {
PARSER.declareInt(FollowConfig::setMaxReadRequestOperationCount, MAX_READ_REQUEST_OPERATION_COUNT);
PARSER.declareInt(FollowConfig::setMaxOutstandingReadRequests, MAX_OUTSTANDING_READ_REQUESTS);
PARSER.declareField(
FollowConfig::setMaxReadRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_READ_REQUEST_SIZE.getPreferredName()),
MAX_READ_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(FollowConfig::setMaxWriteRequestOperationCount, MAX_WRITE_REQUEST_OPERATION_COUNT);
PARSER.declareField(
FollowConfig::setMaxWriteRequestSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_REQUEST_SIZE.getPreferredName()),
MAX_WRITE_REQUEST_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareInt(FollowConfig::setMaxOutstandingWriteRequests, MAX_OUTSTANDING_WRITE_REQUESTS);
PARSER.declareInt(FollowConfig::setMaxWriteBufferCount, MAX_WRITE_BUFFER_COUNT);
PARSER.declareField(
FollowConfig::setMaxWriteBufferSize,
(p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), MAX_WRITE_BUFFER_SIZE.getPreferredName()),
MAX_WRITE_BUFFER_SIZE,
ObjectParser.ValueType.STRING);
PARSER.declareField(FollowConfig::setMaxRetryDelay,
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()),
MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING);
PARSER.declareField(FollowConfig::setReadPollTimeout,
(p, c) -> TimeValue.parseTimeValue(p.text(), READ_POLL_TIMEOUT.getPreferredName()),
READ_POLL_TIMEOUT, ObjectParser.ValueType.STRING);
}

static FollowConfig fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private Integer maxReadRequestOperationCount;
private Integer maxOutstandingReadRequests;
private ByteSizeValue maxReadRequestSize;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.ccr;

import org.elasticsearch.client.Validatable;

import java.util.Objects;

public final class FollowInfoRequest implements Validatable {

private final String followerIndex;

public FollowInfoRequest(String followerIndex) {
this.followerIndex = Objects.requireNonNull(followerIndex);
}

public String getFollowerIndex() {
return followerIndex;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client.ccr;

import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.XContentParser;

import java.util.List;
import java.util.Objects;

public final class FollowInfoResponse {

static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");

private static final ConstructingObjectParser<FollowInfoResponse, Void> PARSER = new ConstructingObjectParser<>(
"indices",
true,
args -> {
@SuppressWarnings("unchecked")
List<FollowerInfo> infos = (List<FollowerInfo>) args[0];
return new FollowInfoResponse(infos);
});

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), FollowerInfo.PARSER, FOLLOWER_INDICES_FIELD);
}

public static FollowInfoResponse fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private final List<FollowerInfo> infos;

FollowInfoResponse(List<FollowerInfo> infos) {
this.infos = infos;
}

public List<FollowerInfo> getInfos() {
return infos;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowInfoResponse that = (FollowInfoResponse) o;
return infos.equals(that.infos);
}

@Override
public int hashCode() {
return Objects.hash(infos);
}

public static final class FollowerInfo {

static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index");
static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster");
static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index");
static final ParseField STATUS_FIELD = new ParseField("status");
static final ParseField PARAMETERS_FIELD = new ParseField("parameters");

private static final ConstructingObjectParser<FollowerInfo, Void> PARSER = new ConstructingObjectParser<>(
"follower_info",
true,
args -> {
return new FollowerInfo((String) args[0], (String) args[1], (String) args[2],
Status.fromString((String) args[3]), (FollowConfig) args[4]);
});

static {
PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX_FIELD);
PARSER.declareString(ConstructingObjectParser.constructorArg(), STATUS_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> FollowConfig.fromXContent(p), PARAMETERS_FIELD);
}

private final String followerIndex;
private final String remoteCluster;
private final String leaderIndex;
private final Status status;
private final FollowConfig parameters;

FollowerInfo(String followerIndex, String remoteCluster, String leaderIndex, Status status,
FollowConfig parameters) {
this.followerIndex = followerIndex;
this.remoteCluster = remoteCluster;
this.leaderIndex = leaderIndex;
this.status = status;
this.parameters = parameters;
}

public String getFollowerIndex() {
return followerIndex;
}

public String getRemoteCluster() {
return remoteCluster;
}

public String getLeaderIndex() {
return leaderIndex;
}

public Status getStatus() {
return status;
}

public FollowConfig getParameters() {
return parameters;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
FollowerInfo that = (FollowerInfo) o;
return Objects.equals(followerIndex, that.followerIndex) &&
Objects.equals(remoteCluster, that.remoteCluster) &&
Objects.equals(leaderIndex, that.leaderIndex) &&
status == that.status &&
Objects.equals(parameters, that.parameters);
}

@Override
public int hashCode() {
return Objects.hash(followerIndex, remoteCluster, leaderIndex, status, parameters);
}

}

public enum Status {

ACTIVE("active"),
PAUSED("paused");

private final String name;

Status(String name) {
this.name = name;
}

public String getName() {
return name;
}

public static Status fromString(String value) {
switch (value) {
case "active":
return Status.ACTIVE;
case "paused":
return Status.PAUSED;
default:
throw new IllegalArgumentException("unexpected status value [" + value + "]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.elasticsearch.client.ccr.CcrStatsRequest;
import org.elasticsearch.client.ccr.CcrStatsResponse;
import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest;
import org.elasticsearch.client.ccr.FollowInfoRequest;
import org.elasticsearch.client.ccr.FollowInfoResponse;
import org.elasticsearch.client.ccr.FollowStatsRequest;
import org.elasticsearch.client.ccr.FollowStatsResponse;
import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest;
Expand Down Expand Up @@ -113,6 +115,15 @@ public void testIndexFollowing() throws Exception {

try {
assertBusy(() -> {
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
FollowInfoResponse followInfoResponse =
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.ACTIVE));

FollowStatsRequest followStatsRequest = new FollowStatsRequest("follower");
FollowStatsResponse followStatsResponse =
execute(followStatsRequest, ccrClient::getFollowStats, ccrClient::getFollowStatsAsync);
Expand Down Expand Up @@ -170,6 +181,17 @@ public void testIndexFollowing() throws Exception {
pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync);
assertThat(pauseFollowResponse.isAcknowledged(), is(true));

assertBusy(() -> {
FollowInfoRequest followInfoRequest = new FollowInfoRequest("follower");
FollowInfoResponse followInfoResponse =
execute(followInfoRequest, ccrClient::getFollowInfo, ccrClient::getFollowInfoAsync);
assertThat(followInfoResponse.getInfos().size(), equalTo(1));
assertThat(followInfoResponse.getInfos().get(0).getFollowerIndex(), equalTo("follower"));
assertThat(followInfoResponse.getInfos().get(0).getLeaderIndex(), equalTo("leader"));
assertThat(followInfoResponse.getInfos().get(0).getRemoteCluster(), equalTo("local_cluster"));
assertThat(followInfoResponse.getInfos().get(0).getStatus(), equalTo(FollowInfoResponse.Status.PAUSED));
});

// Need to close index prior to unfollowing it:
CloseIndexRequest closeIndexRequest = new CloseIndexRequest("follower");
org.elasticsearch.action.support.master.AcknowledgedResponse closeIndexReponse =
Expand Down
Loading

0 comments on commit 0b70b99

Please sign in to comment.