Skip to content

Commit

Permalink
Handle snapshot lifecycle policy updates and deletions
Browse files Browse the repository at this point in the history
This adds logic to `SnapshotLifecycleService` to handle updates and deletes for
snapshot policies. Policies with incremented versions have the old policy
cancelled and the new one scheduled. Deleted policies have their schedules
cancelled when they are no longer present in the cluster state metadata.

Relates to elastic#38461
  • Loading branch information
dakrone committed Mar 14, 2019
1 parent 25ebdf3 commit a7a5366
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.elasticsearch.xpack.indexlifecycle.action.TransportStartILMAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportStopILMAction;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleService;
import org.elasticsearch.xpack.snapshotlifecycle.SnapshotLifecycleTask;
import org.elasticsearch.xpack.snapshotlifecycle.action.DeleteSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.GetSnapshotLifecycleAction;
import org.elasticsearch.xpack.snapshotlifecycle.action.PutSnapshotLifecycleAction;
Expand Down Expand Up @@ -151,7 +152,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
}
indexLifecycleInitialisationService.set(new IndexLifecycleService(settings, client, clusterService, threadPool,
getClock(), System::currentTimeMillis, xContentRegistry));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings, client, clusterService, getClock()));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client), clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

Expand All @@ -35,7 +36,7 @@ public class SnapshotLifecycleMetadata implements XPackMetaDataCustom {
private final Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations;

public SnapshotLifecycleMetadata(Map<String, SnapshotLifecyclePolicyMetadata> snapshotConfigurations) {
this.snapshotConfigurations = Collections.unmodifiableMap(snapshotConfigurations);
this.snapshotConfigurations = new HashMap<>(snapshotConfigurations);
// TODO: maybe operation mode here so it can be disabled/re-enabled separately like ILM is
}

Expand All @@ -44,7 +45,7 @@ public SnapshotLifecycleMetadata(StreamInput in) throws IOException {
}

public Map<String, SnapshotLifecyclePolicyMetadata> getSnapshotConfigurations() {
return this.snapshotConfigurations;
return Collections.unmodifiableMap(this.snapshotConfigurations);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
Expand All @@ -23,6 +22,10 @@
import java.io.Closeable;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* {@code SnapshotLifecycleService} manages snapshot policy scheduling and triggering of the
Expand All @@ -32,28 +35,31 @@
public class SnapshotLifecycleService implements LocalNodeMasterListener, Closeable, ClusterStateListener {

private static final Logger logger = LogManager.getLogger(SnapshotLifecycleMetadata.class);
private static final String JOB_PATTERN_SUFFIX = "-\\d+$";

private final SchedulerEngine scheduler;
private final ClusterService clusterService;
private final SnapshotLifecycleTask snapshotTask;
private final Map<String, SchedulerEngine.Job> scheduledTasks = ConcurrentCollections.newConcurrentMap();
private volatile boolean isMaster = false;

public SnapshotLifecycleService(Settings settings, Client client, ClusterService clusterService,
public SnapshotLifecycleService(Settings settings,
Supplier<SnapshotLifecycleTask> taskSupplier,
ClusterService clusterService,
Clock clock) {
this.scheduler = new SchedulerEngine(settings, clock);
this.clusterService = clusterService;
this.snapshotTask = new SnapshotLifecycleTask(client);
this.snapshotTask = taskSupplier.get();
clusterService.addLocalNodeMasterListener(this); // TODO: change this not to use 'this'
clusterService.addListener(this);
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
public void clusterChanged(final ClusterChangedEvent event) {
if (this.isMaster) {
// TODO: handle modified policies (currently they are ignored)
// TODO: handle deleted policies
scheduleSnapshotJobs(event.state());
final ClusterState state = event.state();
scheduleSnapshotJobs(state);
cleanupDeletedPolicies(state);
}
}

Expand All @@ -71,6 +77,11 @@ public void offMaster() {
cancelSnapshotJobs();
}

// Only used for testing
SchedulerEngine getScheduler() {
return this.scheduler;
}

/**
* Schedule all non-scheduled snapshot jobs contained in the cluster state
*/
Expand All @@ -81,35 +92,85 @@ public void scheduleSnapshotJobs(final ClusterState state) {
}
}

public void cleanupDeletedPolicies(final ClusterState state) {
SnapshotLifecycleMetadata snapMeta = state.metaData().custom(SnapshotLifecycleMetadata.TYPE);
if (snapMeta != null) {
// Retrieve all of the expected policy job ids from the policies in the metadata
final Set<String> policyJobIds = snapMeta.getSnapshotConfigurations().values().stream()
.map(SnapshotLifecycleService::getJobId)
.collect(Collectors.toSet());

// Cancel all jobs that are *NOT* in the scheduled tasks map
scheduledTasks.keySet().stream()
.filter(jobId -> policyJobIds.contains(jobId) == false)
.forEach(this::cancelScheduledSnapshot);
}
}

/**
* Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. If the job already
* exists it is not interfered with.
* Schedule the {@link SnapshotLifecyclePolicy} job if it does not already exist. First checks
* to see if any previous versions of the policy were scheduled, and if so, cancels those. If
* the same version of a policy has already been scheduled it does not overwrite the job.
*/
public void maybeScheduleSnapshot(final SnapshotLifecyclePolicyMetadata snapshotLifecyclePolicy) {
final String jobId = snapshotLifecyclePolicy.getPolicy().getId();
final String jobId = getJobId(snapshotLifecyclePolicy);
final Pattern existingJobPattern = Pattern.compile(snapshotLifecyclePolicy.getPolicy().getId() + JOB_PATTERN_SUFFIX);

// Find and cancel any existing jobs for this policy
final boolean existingJobsFoundAndCancelled = scheduledTasks.keySet().stream()
// Find all jobs matching the `jobid-\d+` pattern
.filter(jId -> existingJobPattern.matcher(jId).matches())
// Filter out a job that has not been changed (matches the id exactly meaning the version is the same)
.filter(jId -> jId.equals(jobId) == false)
.map(existingJobId -> {
// Cancel existing job so the new one can be scheduled
logger.debug("removing existing snapshot lifecycle job [{}] as it has been updated", existingJobId);
scheduledTasks.remove(existingJobId);
boolean existed = scheduler.remove(existingJobId);
assert existed : "expected job for " + existingJobId + " to exist in scheduler";
return existed;
})
.reduce(false, (a, b) -> a || b);

// Now atomically schedule the new job and add it to the scheduled tasks map. If the jobId
// is identical to an existing job (meaning the version has not changed) then this does
// not reschedule it.
scheduledTasks.computeIfAbsent(jobId, id -> {
final SchedulerEngine.Job job = new SchedulerEngine.Job(jobId,
new CronSchedule(snapshotLifecyclePolicy.getPolicy().getSchedule()));
logger.info("scheduling snapshot lifecycle job [{}]", jobId);
if (existingJobsFoundAndCancelled) {
logger.info("rescheduling updated snapshot lifecycle job [{}]", jobId);
} else {
logger.info("scheduling snapshot lifecycle job [{}]", jobId);
}
scheduler.add(job);
return job;
});
}

/**
* Generate the job id for a given policy metadata. The job id is {@code <policyid>-<version>}
*/
static String getJobId(SnapshotLifecyclePolicyMetadata policyMeta) {
return policyMeta.getPolicy().getId() + "-" + policyMeta.getVersion();
}

/**
* Cancel all scheduled snapshot jobs
*/
public void cancelSnapshotJobs() {
logger.trace("cancelling all snapshot lifecycle jobs");
scheduler.scheduledJobIds().forEach(scheduler::remove);
scheduledTasks.clear();
}

/**
* Cancel the given snapshot lifecycle id
* Cancel the given policy job id (from {@link #getJobId(SnapshotLifecyclePolicyMetadata)}
*/
public void cancelScheduledSnapshot(final String snapshotLifecycleId) {
scheduledTasks.remove(snapshotLifecycleId);
scheduler.remove(snapshotLifecycleId);
public void cancelScheduledSnapshot(final String lifecycleJobId) {
logger.debug("cancelling snapshot lifecycle job [{}] as it no longer exists", lifecycleJobId);
scheduledTasks.remove(lifecycleJobId);
scheduler.remove(lifecycleJobId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class SnapshotLifecycleTask implements SchedulerEngine.Listener {

private final Client client;

SnapshotLifecycleTask(final Client client) {
public SnapshotLifecycleTask(final Client client) {
this.client = client;
}

Expand Down
Loading

0 comments on commit a7a5366

Please sign in to comment.