Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Rest Api Compatibility] Deprecate the use of synced flush #75372

Merged
merged 9 commits into from
Jul 28, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -682,9 +682,6 @@ public void testRecovery() throws Exception {
flushRequest.addParameter("force", "true");
flushRequest.addParameter("wait_if_ongoing", "true");
assertOK(client().performRequest(flushRequest));
if (randomBoolean()) {
syncedFlush(index);
}

if (shouldHaveTranslog) {
// Update a few documents so we are sure to have a translog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.MediaType;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.rest.ESRestTestCase;
Expand Down Expand Up @@ -309,9 +312,58 @@ public void testSyncedFlushTransition() throws Exception {
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush/synced");
List<String> warningMsg = List.of("Synced flush was removed and a normal flush was performed instead. " +
"This transition will be removed in a future version.");
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warnings -> warnings.equals(warningMsg) == false));
final String v7MediaType = XContentType.VND_JSON.toParsedMediaType()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we would like to use synced flush here. Since it was deprecated in 7.x and is only available under rest compatibility in 8 I guess we should just use flush
WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole test is called testSyncedFlushTransition and starts with assertTrue("bwc version is on 7.x", nodes.getBWCVersion().before(Version.V_8_0_0)); so I think it's right to still use synced-flush here. We could reasonably duplicate this test as one which just uses a regular flush and will continue to work once 7.x is no more.

.responseContentTypeHeader(Map.of(MediaType.COMPATIBLE_WITH_PARAMETER_NAME,
String.valueOf(RestApiVersion.minimumSupported().major)));
List<String> warningMsg = List.of("Synced flush is deprecated and will be removed in 8.0." +
" Use flush at /_flush or /{index}/_flush instead.");
request.setOptions(RequestOptions.DEFAULT.toBuilder()
.setWarningsHandler(warnings -> warnings.equals(warningMsg) == false)
.addHeader("Accept", v7MediaType));

assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
assertThat(result.get("successful"), equalTo(totalShards));
assertThat(result.get("failed"), equalTo(0));
});
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
}
}

public void testFlushTransition() throws Exception {
Nodes nodes = buildNodeAndVersions();
assumeFalse("no new node found", nodes.getNewNodes().isEmpty());
assumeFalse("no bwc node found", nodes.getBWCNodes().isEmpty());
// Allocate shards to new nodes then verify flush requests processed by old nodes/new nodes
String newNodes = nodes.getNewNodes().stream().map(Node::getNodeName).collect(Collectors.joining(","));
int numShards = randomIntBetween(1, 10);
int numOfReplicas = randomIntBetween(0, nodes.getNewNodes().size() - 1);
int totalShards = numShards * (numOfReplicas + 1);
final String index = "test_flush";
createIndex(index, Settings.builder()
.put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), numShards)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numOfReplicas)
.put("index.routing.allocation.include._name", newNodes).build());
ensureGreen(index);
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
try (RestClient oldNodeClient = buildClient(restClientSettings(),
nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush");
assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(oldNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
assertThat(result.get("successful"), equalTo(totalShards));
assertThat(result.get("failed"), equalTo(0));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering, how is _flush smarter then _flush/_synced that it won't fail on old nodes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand. The _flush API is ancient, it's fully supported on every version in scope here.

});
Map<String, Object> stats = entityAsMap(client().performRequest(new Request("GET", index + "/_stats?level=shards")));
assertThat(XContentMapValues.extractValue("indices." + index + ".total.translog.uncommitted_operations", stats), equalTo(0));
}
indexDocs(index, randomIntBetween(0, 100), between(1, 100));
try (RestClient newNodeClient = buildClient(restClientSettings(),
nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new))) {
Request request = new Request("POST", index + "/_flush");
assertBusy(() -> {
Map<String, Object> result = ObjectPath.createFromResponse(newNodeClient.performRequest(request)).evaluate("_shards");
assertThat(result.get("total"), equalTo(totalShards));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testRelocationWithConcurrentIndexing() throws Exception {
throw new IllegalStateException("unknown type " + CLUSTER_TYPE);
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
}

Expand Down Expand Up @@ -309,7 +309,7 @@ public void testRecovery() throws Exception {
}
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
ensureGreen(index);
}
Expand Down Expand Up @@ -584,7 +584,7 @@ public void testUpdateDoc() throws Exception {
assertThat(XContentMapValues.extractValue("_source.updated_field", doc), equalTo(updates.get(docId)));
}
if (randomBoolean()) {
syncedFlush(index);
flush(index, randomBoolean());
}
}

Expand Down
5 changes: 4 additions & 1 deletion rest-api-spec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ tasks.named("yamlRestCompatTest").configure {
OS.current() != OS.WINDOWS
}
systemProperty 'tests.rest.blacklist', ([
'indices.flush/10_basic/Index synced flush rest test',
'search.aggregation/200_top_hits_metric/top_hits aggregation with sequence numbers',
'search/310_match_bool_prefix/multi_match multiple fields with cutoff_frequency throws exception', //cutoff_frequency
'search/340_type_query/type query', // type_query - probably should behave like match_all
Expand Down Expand Up @@ -227,6 +226,10 @@ tasks.named("transformV7RestTests").configure({ task ->
task.replaceValueTextByKeyValue("catch",
'/Please set node identifiers correctly. One and only one of \\[node_name\\], \\[node_names\\] and \\[node_ids\\] has to be set/',
'/You must set \\[node_names\\] or \\[node_ids\\] but not both/')

// sync_id is no longer available in SegmentInfos.userData // "indices.flush/10_basic/Index synced flush rest test"
task.replaceIsTrue("indices.testing.shards.0.0.commit.user_data.sync_id", "indices.testing.shards.0.0.commit.user_data")

})

tasks.register('enforceYamlTestConvention').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
Expand All @@ -32,15 +31,23 @@

public class RestSyncedFlushAction extends BaseRestHandler {

private static final DeprecationLogger DEPRECATION_LOGGER = DeprecationLogger.getLogger(RestSyncedFlushAction.class);

private static final String DEPRECATION_MESSAGE =
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.";
@Override
public List<Route> routes() {
return List.of(
new Route(GET, "/_flush/synced"),
new Route(POST, "/_flush/synced"),
new Route(GET, "/{index}/_flush/synced"),
new Route(POST, "/{index}/_flush/synced"));
Route.builder(GET, "/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(POST, "/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(GET, "/{index}/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build(),
Route.builder(POST, "/{index}/_flush/synced")
.deprecated(DEPRECATION_MESSAGE, RestApiVersion.V_7)
.build());
}

@Override
Expand All @@ -50,8 +57,6 @@ public String getName() {

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
DEPRECATION_LOGGER.deprecate(DeprecationCategory.API, "synced_flush",
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
final FlushRequest flushRequest = new FlushRequest(Strings.splitStringByCommaToArray(request.param("index")));
flushRequest.indicesOptions(IndicesOptions.fromRequest(request, flushRequest.indicesOptions()));
return channel -> client.admin().indices().flush(flushRequest, new SimulateSyncedFlushResponseListener(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1651,48 +1651,6 @@ protected static Version minimumNodeVersion() throws IOException {
return minVersion;
}

protected void syncedFlush(String indexName) throws Exception {
final List<String> deprecationMessages = List.of(
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.");
final List<String> fixedDeprecationMessages = List.of(
"Synced flush is deprecated and will be removed in 8.0. Use flush at /_flush or /{index}/_flush instead.");
final List<String> transitionMessages = List.of(
"Synced flush was removed and a normal flush was performed instead. This transition will be removed in a future version.");
final WarningsHandler warningsHandler;
if (minimumNodeVersion().onOrAfter(Version.V_8_0_0)) {
warningsHandler = warnings -> warnings.equals(transitionMessages) == false;
} else if (minimumNodeVersion().onOrAfter(Version.V_7_6_0)) {
warningsHandler = warnings -> warnings.equals(deprecationMessages) == false && warnings.equals(transitionMessages) == false &&
warnings.equals(fixedDeprecationMessages) == false;
} else if (nodeVersions.stream().anyMatch(n -> n.onOrAfter(Version.V_8_0_0))) {
warningsHandler = warnings -> warnings.isEmpty() == false && warnings.equals(transitionMessages) == false;
} else {
warningsHandler = warnings -> warnings.isEmpty() == false;
}
// We have to spin synced-flush requests here because we fire the global checkpoint sync for the last write operation.
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
assertBusy(() -> {
try {
final Request request = new Request("POST", indexName + "/_flush/synced");
request.setOptions(RequestOptions.DEFAULT.toBuilder().setWarningsHandler(warningsHandler));
Response resp = client().performRequest(request);
if (nodeVersions.stream().allMatch(v -> v.before(Version.V_8_0_0))) {
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
assertThat(result.get("failed"), equalTo(0));
}
} catch (ResponseException ex) {
if (ex.getResponse().getStatusLine().getStatusCode() == RestStatus.CONFLICT.getStatus()
&& ex.getResponse().getWarnings().equals(transitionMessages)) {
logger.info("a normal flush was performed instead");
} else {
throw new AssertionError(ex); // cause assert busy to retry
}
}
});
// ensure the global checkpoint is synced; otherwise we might trim the commit with syncId
ensureGlobalCheckpointSynced(indexName);
}

@SuppressWarnings("unchecked")
private void ensureGlobalCheckpointSynced(String index) throws Exception {
assertBusy(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,10 @@ private void assertUserExecutes(String user, String action, String index, boolea
if (userIsAllowed) {
assertAccessIsAllowed(user, "POST", "/" + index + "/_refresh");
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush");
assertAccessIsAllowed(user, "POST", "/" + index + "/_flush/synced");
assertAccessIsAllowed(user, "POST", "/" + index + "/_forcemerge");
} else {
assertAccessIsDenied(user, "POST", "/" + index + "/_refresh");
assertAccessIsDenied(user, "POST", "/" + index + "/_flush");
assertAccessIsDenied(user, "POST", "/" + index + "/_flush/synced");
assertAccessIsDenied(user, "POST", "/" + index + "/_forcemerge");
}
break;
Expand Down