Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate synced flush #50835

Merged
merged 2 commits into from
Jan 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,10 @@ public Cancellable flushAsync(FlushRequest flushRequest, RequestOptions options,
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
* @deprecated synced flush is deprecated and will be removed in 8.0.
* Use {@link #flush(FlushRequest, RequestOptions)} instead.
*/
@Deprecated
public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(syncedFlushRequest, IndicesRequestConverters::flushSynced, options,
SyncedFlushResponse::fromXContent, emptySet());
Expand All @@ -615,7 +618,10 @@ public SyncedFlushResponse flushSynced(SyncedFlushRequest syncedFlushRequest, Re
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return cancellable that may be used to cancel the request
* @deprecated synced flush is deprecated and will be removed in 8.0.
* Use {@link #flushAsync(FlushRequest, RequestOptions, ActionListener)} instead.
*/
@Deprecated
public Cancellable flushSyncedAsync(SyncedFlushRequest syncedFlushRequest, RequestOptions options,
ActionListener<SyncedFlushResponse> listener) {
return restHighLevelClient.performRequestAsyncAndParseEntity(syncedFlushRequest, IndicesRequestConverters::flushSynced, options,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.admin.indices.RestCreateIndexAction;
import org.elasticsearch.rest.action.admin.indices.RestGetFieldMappingAction;
Expand Down Expand Up @@ -982,7 +983,8 @@ public void testSyncedFlush() throws IOException {
createIndex(index, settings);
SyncedFlushRequest syncedFlushRequest = new SyncedFlushRequest(index);
SyncedFlushResponse flushResponse =
execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync);
execute(syncedFlushRequest, highLevelClient().indices()::flushSynced, highLevelClient().indices()::flushSyncedAsync,
expectWarnings(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE));
assertThat(flushResponse.totalShards(), equalTo(1));
assertThat(flushResponse.successfulShards(), equalTo(1));
assertThat(flushResponse.failedShards(), equalTo(0));
Expand All @@ -997,7 +999,8 @@ public void testSyncedFlush() throws IOException {
execute(
syncedFlushRequest,
highLevelClient().indices()::flushSynced,
highLevelClient().indices()::flushSyncedAsync
highLevelClient().indices()::flushSyncedAsync,
expectWarnings(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE)
)
);
assertEquals(RestStatus.NOT_FOUND, exception.status());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,8 @@ public void testApiNamingConventions() throws Exception {
// looking like it doesn't have a valid implementatation when it does.
apiUnsupported.remove("indices.get_template");


// Synced flush is deprecated
apiUnsupported.remove("indices.flush_synced");

for (Map.Entry<String, Set<Method>> entry : methods.entrySet()) {
String apiName = entry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,9 @@ public void testSyncedFlushIndex() throws Exception {
// end::flush-synced-request-indicesOptions

// tag::flush-synced-execute
SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request, RequestOptions.DEFAULT);
SyncedFlushResponse flushSyncedResponse = client.indices().flushSynced(request, expectWarnings(
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead."
));
// end::flush-synced-execute

// tag::flush-synced-response
Expand Down Expand Up @@ -1063,7 +1065,9 @@ public void onFailure(Exception e) {
listener = new LatchedActionListener<>(listener, latch);

// tag::flush-synced-execute-async
client.indices().flushSyncedAsync(request, RequestOptions.DEFAULT, listener); // <1>
client.indices().flushSyncedAsync(request, expectWarnings(
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead."
), listener); // <1>
// end::flush-synced-execute-async

assertTrue(latch.await(30L, TimeUnit.SECONDS));
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/indices/synced-flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
<titleabbrev>Synced flush</titleabbrev>
++++

deprecated::[7.6, synced-flush is deprecated and will be removed in 8.0.
Use <<indices-flush,flush>> instead. A <<indices-flush,flush>> has the
same effect as a synced flush on Elasticsearch 7.6 or later]

Performs a synced flush on one or more indices.

[source,console]
Expand Down
4 changes: 4 additions & 0 deletions docs/reference/upgrade/synced-flush.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@
--------------------------------------------------
POST _flush/synced
--------------------------------------------------
// TEST[skip: will fail as synced flush is deprecated]

When you perform a synced flush, check the response to make sure there are
no failures. Synced flush operations that fail due to pending indexing
operations are listed in the response body, although the request itself
still returns a 200 OK status. If there are failures, reissue the request.

Note that synced flush is deprecated and will be removed in 8.0. A flush
has the same effect as a synced flush on Elasticsearch 7.6 or later.
Original file line number Diff line number Diff line change
Expand Up @@ -745,7 +745,7 @@ public void testRecovery() throws Exception {
// We had a bug before where we failed to perform peer recovery with sync_id from 5.x to 6.x.
// We added this synced flush so we can exercise different paths of recovery code.
try {
client().performRequest(new Request("POST", index + "/_flush/synced"));
performSyncedFlush(index);
} catch (ResponseException ignored) {
// synced flush is optional here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ private void syncedFlush(String index) throws Exception {
// A synced-flush request considers the global checkpoint sync as an going operation because it acquires a shard permit.
assertBusy(() -> {
try {
Response resp = client().performRequest(new Request("POST", index + "/_flush/synced"));
Response resp = performSyncedFlush(index);
Map<String, Object> result = ObjectPath.createFromResponse(resp).evaluate("_shards");
assertThat(result.get("failed"), equalTo(0));
} catch (ResponseException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"indices.flush_synced":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-synced-flush-api.html",
"description":"Performs a synced flush operation on one or more indices."
"description":"Performs a synced flush operation on one or more indices. Synced flush is deprecated and will be removed in 8.0. Use flush instead"
},
"stability":"stable",
"url":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,54 +88,6 @@
- match:
$body: |
/^$/

- do:
indices.create:
index: sync_id_test
body:
settings:
number_of_shards: 5
number_of_replicas: 0

- do:
indices.flush_synced:
index: sync_id_test

- is_false: _shards.failed

- do:
cat.shards:
index: sync_id_test
h: index,state,sync_id
# 20 chars for sync ids with 5.x which uses time-based uuids and 22 with 6.x which uses random uuids
- match:
$body: |
/^(sync_id_test\s+STARTED\s+[A-Za-z0-9_\-]{20,22}\n){5}$/

- do:
indices.delete:
index: sync_id_test

- do:
indices.create:
index: sync_id_no_flush_test
body:
settings:
number_of_shards: 5
number_of_replicas: 0

- do:
cat.shards:
index: sync_id_no_flush_test
h: index,state,sync_id
- match:
$body: |
/^(sync_id_no_flush_test\s+STARTED\s+\n){5}$/

- do:
indices.delete:
index: sync_id_no_flush_test

- do:
indices.create:
index: index1
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,32 @@
---
"Index synced flush rest test":
- do:
indices.create:
index: testing
body:
settings:
index:
number_of_replicas: 0
- skip:
version: " - 7.5.99"
reason: "synced flush is deprecated in 7.6"
features: "warnings"
- do:
indices.create:
index: testing
body:
settings:
index:
number_of_replicas: 0

- do:
cluster.health:
wait_for_status: green
- do:
indices.flush_synced:
index: testing
- do:
cluster.health:
wait_for_status: green
- do:
warnings:
- Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.
indices.flush_synced:
index: testing

- is_false: _shards.failed
- is_false: _shards.failed

- do:
indices.stats: {level: shards}
- do:
indices.stats: {level: shards}

- is_true: indices.testing.shards.0.0.commit.user_data.sync_id
- is_true: indices.testing.shards.0.0.commit.user_data.sync_id

---
"Flush stats":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
Expand Down Expand Up @@ -71,11 +72,17 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.StreamSupport;

public class SyncedFlushService implements IndexEventListener {

private static final Logger logger = LogManager.getLogger(SyncedFlushService.class);

private static final DeprecationLogger DEPRECATION_LOGGER = new DeprecationLogger(logger);

public static final String SYNCED_FLUSH_DEPRECATION_MESSAGE =
"Synced flush is deprecated and will be removed in 8.0. Use flush at _/flush or /{index}/_flush instead.";

private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
private static final String SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/sync";
private static final String IN_FLIGHT_OPS_ACTION_NAME = "internal:indices/flush/synced/in_flight";
Expand Down Expand Up @@ -130,6 +137,9 @@ public void attemptSyncedFlush(final String[] aliasesOrIndices,
IndicesOptions indicesOptions,
final ActionListener<SyncedFlushResponse> listener) {
final ClusterState state = clusterService.state();
if (StreamSupport.stream(state.nodes().spliterator(), false).allMatch(n -> n.getVersion().onOrAfter(Version.V_7_6_0))) {
DEPRECATION_LOGGER.deprecatedAndMaybeLog("synced_flush", SYNCED_FLUSH_DEPRECATION_MESSAGE);
}
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, indicesOptions, aliasesOrIndices);
final Map<String, List<ShardsSyncedFlushResult>> results = ConcurrentCollections.newConcurrentMap();
int numberOfShards = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.indices.flush.SyncedFlushService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -1222,4 +1223,19 @@ protected static Version minimumNodeVersion() throws IOException {
assertNotNull(minVersion);
return minVersion;
}

protected static Response performSyncedFlush(String indexName) throws IOException {
final Request request = new Request("POST", indexName + "/_flush/synced");
final List<String> expectedWarnings = Collections.singletonList(SyncedFlushService.SYNCED_FLUSH_DEPRECATION_MESSAGE);
if (nodeVersions.stream().allMatch(version -> version.onOrAfter(Version.V_7_6_0))) {
final Builder options = RequestOptions.DEFAULT.toBuilder();
options.setWarningsHandler(warnings -> warnings.equals(expectedWarnings) == false);
request.setOptions(options);
} else if (nodeVersions.stream().anyMatch(version -> version.onOrAfter(Version.V_7_6_0))) {
final Builder options = RequestOptions.DEFAULT.toBuilder();
options.setWarningsHandler(warnings -> warnings.isEmpty() == false && warnings.equals(expectedWarnings) == false);
request.setOptions(options);
}
return client().performRequest(request);
}
}