Skip to content

Commit

Permalink
Add a java level freeze/unfreeze API (#35353)
Browse files Browse the repository at this point in the history
This change adds a high level freeze API that allows to mark an
index as frozen and vice versa. Indices must be closed in order to
become frozen and an open but frozen index must be closed to be
defrosted. This change also adds a index.frozen setting to
mark frozen indices and integrates the frozen engine with the
SearchOperationListener that resets and releases the directory
reader after and before search phases.

Relates to #34352
Depends on #34357
  • Loading branch information
s1monw committed Nov 10, 2018
1 parent 9bdea09 commit 5718b14
Show file tree
Hide file tree
Showing 5 changed files with 535 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -59,6 +63,8 @@
* stats in order to obtain the number of reopens.
*/
public final class FrozenEngine extends ReadOnlyEngine {
public static final Setting<Boolean> INDEX_FROZEN = Setting.boolSetting("index.frozen", false, Setting.Property.IndexScope,
Setting.Property.PrivateIndex);
private volatile DirectoryReader lastOpenedReader;

public FrozenEngine(EngineConfig config) {
Expand Down Expand Up @@ -232,6 +238,49 @@ static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) {
return null;
}

/*
* We register this listener for a frozen index that will
* 1. reset the reader every time the search context is validated which happens when the context is looked up ie. on a fetch phase
* etc.
* 2. register a releasable resource that is cleaned after each phase that releases the reader for this searcher
*/
public static class ReacquireEngineSearcherListener implements SearchOperationListener {

@Override
public void validateSearchContext(SearchContext context, TransportRequest transportRequest) {
Searcher engineSearcher = context.searcher().getEngineSearcher();
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
if (lazyDirectoryReader != null) {
try {
lazyDirectoryReader.reset();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
// also register a release resource in this case if we have multiple roundtrips like in DFS
registerRelease(context, lazyDirectoryReader);
}
}

private void registerRelease(SearchContext context, LazyDirectoryReader lazyDirectoryReader) {
context.addReleasable(() -> {
try {
lazyDirectoryReader.release();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, SearchContext.Lifetime.PHASE);
}

@Override
public void onNewContext(SearchContext context) {
Searcher engineSearcher = context.searcher().getEngineSearcher();
LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(engineSearcher.getDirectoryReader());
if (lazyDirectoryReader != null) {
registerRelease(context, lazyDirectoryReader);
}
}
}

/**
* This class allows us to use the same high level reader across multiple search phases but replace the underpinnings
* on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package org.elasticsearch.xpack.core;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.license.LicensingClient;
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
import org.elasticsearch.xpack.core.action.XPackInfoRequestBuilder;
import org.elasticsearch.xpack.core.ccr.client.CcrClient;
Expand Down Expand Up @@ -103,4 +105,8 @@ public XPackInfoRequestBuilder prepareInfo() {
public void info(XPackInfoRequest request, ActionListener<XPackInfoResponse> listener) {
client.execute(XPackInfoAction.INSTANCE, request, listener);
}

public void freeze(TransportFreezeIndexAction.FreezeRequest request, ActionListener<AcknowledgedResponse> listener) {
client.execute(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE, request, listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.engine.FrozenEngine;
import org.elasticsearch.license.LicenseService;
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.license.Licensing;
Expand All @@ -55,6 +57,7 @@
import org.elasticsearch.snapshots.SourceOnlySnapshotRepository;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.action.TransportFreezeIndexAction;
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.core.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.core.action.XPackInfoAction;
Expand Down Expand Up @@ -266,6 +269,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
actions.add(new ActionHandler<>(XPackInfoAction.INSTANCE, TransportXPackInfoAction.class));
actions.add(new ActionHandler<>(XPackUsageAction.INSTANCE, TransportXPackUsageAction.class));
actions.add(new ActionHandler<>(TransportFreezeIndexAction.FreezeIndexAction.INSTANCE,
TransportFreezeIndexAction.class));
actions.addAll(licensing.getActions());
return actions;
}
Expand Down Expand Up @@ -359,14 +364,26 @@ public Map<String, Repository.Factory> getRepositories(Environment env, NamedXCo
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
if (indexSettings.getValue(SourceOnlySnapshotRepository.SOURCE_ONLY)) {
return Optional.of(SourceOnlySnapshotRepository.getEngineFactory());
} else if (indexSettings.getValue(FrozenEngine.INDEX_FROZEN)) {
return Optional.of(FrozenEngine::new);
}

return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = super.getSettings();
settings.add(SourceOnlySnapshotRepository.SOURCE_ONLY);
settings.add(FrozenEngine.INDEX_FROZEN);
return settings;
}

@Override
public void onIndexModule(IndexModule indexModule) {
if (FrozenEngine.INDEX_FROZEN.get(indexModule.getSettings())) {
indexModule.addSearchOperationListener(new FrozenEngine.ReacquireEngineSearcherListener());
}
super.onIndexModule(indexModule);
}
}
Loading

0 comments on commit 5718b14

Please sign in to comment.