Skip to content

Commit

Permalink
Make the TransportRolloverAction execute in one cluster state update (e…
Browse files Browse the repository at this point in the history
…lastic#50388)

This commit makes the TransportRolloverAction more resilient, by having it execute
only one cluster state update that creates the new (rollover index), rolls over
the alias from the source to the target index and set the RolloverInfo on the
source index. Before these 3 steps were represented as 3 chained cluster state
updates, which would've seen the user manually intervene if, say, the alias
rollover cluster state update (second in the chain) failed but the creation of
the rollover index (first in the chain) update succeeded

* Rename innerExecute to applyAliasActions

(cherry picked from commit 1ba4339)
Signed-off-by: Andrei Dan <andrei.dan@elastic.co>
  • Loading branch information
andreidan committed Dec 20, 2019
1 parent c37c53a commit b6aa7fd
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.indices.rollover;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
Expand Down Expand Up @@ -150,56 +149,44 @@ public void onResponse(IndicesStatsResponse statsResponse) {
new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults, true, false, false, false));
return;
}
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
List<Condition<?>> metConditions = rolloverRequest.getConditions().values().stream()
.filter(condition -> conditionResults.get(condition.toString())).collect(Collectors.toList());
if (conditionResults.size() == 0 || metConditions.size() > 0) {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(unresolvedName, rolloverIndexName,
rolloverRequest);
createIndexService.createIndex(updateRequest, ActionListener.wrap(createIndexClusterStateUpdateResponse -> {
final IndicesAliasesClusterStateUpdateRequest aliasesUpdateRequest;
if (explicitWriteIndex) {
aliasesUpdateRequest = prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
} else {
aliasesUpdateRequest = prepareRolloverAliasesUpdateRequest(sourceIndexName,
rolloverIndexName, rolloverRequest);
CreateIndexClusterStateUpdateRequest createIndexRequest = prepareCreateIndexRequest(unresolvedName,
rolloverIndexName, rolloverRequest);
clusterService.submitStateUpdateTask("rollover_index source [" + sourceIndexName + "] to target ["
+ rolloverIndexName + "]", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState newState = createIndexService.applyCreateIndexRequest(currentState, createIndexRequest);
newState = indexAliasesService.applyAliasActions(newState,
rolloverAliasToNewIndex(sourceIndexName, rolloverIndexName, rolloverRequest, explicitWriteIndex));
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(newState)
.metaData(MetaData.builder(newState.metaData())
.put(IndexMetaData.builder(newState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}
indexAliasesService.indicesAliases(aliasesUpdateRequest,
ActionListener.wrap(aliasClusterStateUpdateResponse -> {
if (aliasClusterStateUpdateResponse.isAcknowledged()) {
clusterService.submitStateUpdateTask("update_rollover_info", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
RolloverInfo rolloverInfo = new RolloverInfo(rolloverRequest.getAlias(), metConditions,
threadPool.absoluteTimeInMillis());
return ClusterState.builder(currentState)
.metaData(MetaData.builder(currentState.metaData())
.put(IndexMetaData.builder(currentState.metaData().index(sourceIndexName))
.putRolloverInfo(rolloverInfo))).build();
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
});
} else {
listener.onResponse(new RolloverResponse(sourceIndexName, rolloverIndexName, conditionResults,
false, true, false, false));
}
}, listener::onFailure));
}, listener::onFailure));
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (newState.equals(oldState) == false) {
activeShardsObserver.waitForActiveShards(new String[]{rolloverIndexName},
rolloverRequest.getCreateIndexRequest().waitForActiveShards(),
rolloverRequest.masterNodeTimeout(),
isShardsAcknowledged -> listener.onResponse(new RolloverResponse(
sourceIndexName, rolloverIndexName, conditionResults, false, true, true,
isShardsAcknowledged)),
listener::onFailure);
}
}
});
} else {
// conditions not met
listener.onResponse(
Expand All @@ -216,29 +203,24 @@ public void onFailure(Exception e) {
);
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
List<AliasAction> actions = unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias())));
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
return updateRequest;
}

static IndicesAliasesClusterStateUpdateRequest prepareRolloverAliasesWriteIndexUpdateRequest(String oldIndex, String newIndex,
RolloverRequest request) {
List<AliasAction> actions = unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false)));
final IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(actions)
.ackTimeout(request.ackTimeout())
.masterNodeTimeout(request.masterNodeTimeout());
return updateRequest;
/**
* Creates the alias actions to reflect the alias rollover from the old (source) index to the new (target/rolled over) index. An
* alias pointing to multiple indices will have to be an explicit write index (ie. the old index alias has is_write_index set to true)
* in which case, after the rollover, the new index will need to be the explicit write index.
*/
static List<AliasAction> rolloverAliasToNewIndex(String oldIndex, String newIndex, RolloverRequest request,
boolean explicitWriteIndex) {
if (explicitWriteIndex) {
return unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, true),
new AliasAction.Add(oldIndex, request.getAlias(), null, null, null, false)));
} else {
return unmodifiableList(Arrays.asList(
new AliasAction.Add(newIndex, request.getAlias(), null, null, null, null),
new AliasAction.Remove(oldIndex, request.getAlias())));
}
}


static String generateRolloverIndexName(String sourceIndexName, IndexNameExpressionResolver indexNameExpressionResolver) {
String resolvedName = indexNameExpressionResolver.resolveDateMathExpression(sourceIndexName);
final boolean isDateMath = sourceIndexName.equals(resolvedName) == false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public void onFailure(String source, Exception e) {
* Handles the cluster state transition to a version that reflects the {@link CreateIndexClusterStateUpdateRequest}.
* All the requested changes are firstly validated before mutating the {@link ClusterState}.
*/
ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
public ClusterState applyCreateIndexRequest(ClusterState currentState, CreateIndexClusterStateUpdateRequest request) throws Exception {
logger.trace("executing IndexCreationTask for [{}] against cluster state version [{}]", request, currentState.version());
Index createdIndex = null;
String removalExtraInfo = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,15 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {

@Override
public ClusterState execute(ClusterState currentState) {
return innerExecute(currentState, request.actions());
return applyAliasActions(currentState, request.actions());
}
});
}

ClusterState innerExecute(ClusterState currentState, Iterable<AliasAction> actions) {
/**
* Handles the cluster state transition to a version that reflects the provided {@link AliasAction}s.
*/
public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasAction> actions) {
List<Index> indicesToClose = new ArrayList<>();
Map<String, IndexService> indices = new HashMap<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
Expand Down Expand Up @@ -219,15 +218,13 @@ public void testEvaluateWithoutMetaData() {
results2.forEach((k, v) -> assertFalse(v));
}

public void testCreateUpdateAliasRequest() {
public void testRolloverAliasActions() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesUpdateRequest(sourceIndex, targetIndex, rolloverRequest);

List<AliasAction> actions = updateRequest.actions();
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, false);
assertThat(actions, hasSize(2));
boolean foundAdd = false;
boolean foundRemove = false;
Expand All @@ -246,15 +243,13 @@ public void testCreateUpdateAliasRequest() {
assertTrue(foundRemove);
}

public void testCreateUpdateAliasRequestWithExplicitWriteIndex() {
public void testRolloverAliasActionsWithExplicitWriteIndex() {
String sourceAlias = randomAlphaOfLength(10);
String sourceIndex = randomAlphaOfLength(10);
String targetIndex = randomAlphaOfLength(10);
final RolloverRequest rolloverRequest = new RolloverRequest(sourceAlias, targetIndex);
final IndicesAliasesClusterStateUpdateRequest updateRequest =
TransportRolloverAction.prepareRolloverAliasesWriteIndexUpdateRequest(sourceIndex, targetIndex, rolloverRequest);
List<AliasAction> actions = TransportRolloverAction.rolloverAliasToNewIndex(sourceIndex, targetIndex, rolloverRequest, true);

List<AliasAction> actions = updateRequest.actions();
assertThat(actions, hasSize(2));
boolean foundAddWrite = false;
boolean foundRemoveWrite = false;
Expand Down
Loading

0 comments on commit b6aa7fd

Please sign in to comment.