Add a frozen engine implementation (#34357)
This change adds a `frozen` engine that lazily opens a directory reader
on a read-only shard. The engine wraps general-purpose searchers in a LazyDirectoryReader
that also allows the underlying index readers to be released after any search phase
and reset before secondary search phases.

Relates to #34352
s1monw committed Nov 10, 2018
1 parent b417c7a · commit 9bdea09
Showing 11 changed files with 941 additions and 23 deletions.
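
The lazy-open behavior the commit message describes reduces, in spirit, to a holder that opens a DirectoryReader on demand and can release it between search phases. A minimal sketch, assuming a hypothetical LazyReaderHolder (the actual LazyDirectoryReader added by this change is more involved):

import java.io.IOException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;

// Hypothetical sketch: open the reader only when a search needs it, and let the
// engine release it again between search phases to keep frozen shards cheap.
final class LazyReaderHolder {
    private final IndexCommit commit; // fixed commit of the read-only shard
    private DirectoryReader reader;   // null while released

    LazyReaderHolder(IndexCommit commit) {
        this.commit = commit;
    }

    synchronized DirectoryReader getOrOpen() throws IOException {
        if (reader == null) {
            reader = DirectoryReader.open(commit); // lazily open on first access
        }
        return reader;
    }

    synchronized void release() throws IOException {
        if (reader != null) {
            reader.close(); // drop file handles and segment memory until the next phase
            reader = null;
        }
    }
}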
@@ -277,5 +277,4 @@ public static void fsync(final Path fileToSync, final boolean isDir) throws IOException
throw ioe;
}
}

}
103 changes: 102 additions & 1 deletion server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
@@ -29,8 +29,12 @@
import org.apache.lucene.document.LatLonDocValuesField;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.Fields;
import org.apache.lucene.index.FilterCodecReader;
import org.apache.lucene.index.FilterDirectoryReader;
import org.apache.lucene.index.FilterLeafReader;
@@ -40,13 +44,20 @@
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafMetaData;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.StoredFieldVisitor;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.FieldDoc;
@@ -55,6 +66,7 @@
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
@@ -204,7 +216,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory)
throw new IllegalStateException("no commit found in the directory");
}
}
final CommitPoint cp = new CommitPoint(si, directory);
final IndexCommit cp = getIndexCommit(si, directory);
try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
.setSoftDeletesField(Lucene.SOFT_DELETES_FIELD)
.setIndexCommit(cp)
@@ -216,6 +228,13 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Directory directory)
return si;
}

/**
* Returns an index commit for the given {@link SegmentInfos} in the given directory.
*/
public static IndexCommit getIndexCommit(SegmentInfos si, Directory directory) throws IOException {
return new CommitPoint(si, directory);
}
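
A minimal usage sketch of the new helper, assuming an already-open directory (the ReadOnlyEngine change below follows the same pattern):

SegmentInfos infos = Lucene.readSegmentInfos(directory); // segment infos of the last commit
IndexCommit commit = Lucene.getIndexCommit(infos, directory);
DirectoryReader reader = DirectoryReader.open(commit);   // reader pinned to that exact commit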

/**
* This method removes all lucene files from the given directory. It will first try to delete all commit points / segments
* files to ensure broken commits or corrupted indices will not be opened in the future. If any of the segment files can't be deleted
@@ -923,6 +942,88 @@ public static NumericDocValuesField newSoftDeletesField() {
return new NumericDocValuesField(SOFT_DELETES_FIELD, 1);
}

/**
* Returns an empty leaf reader with the given max docs. The reader will be fully deleted.
*/
public static LeafReader emptyReader(final int maxDoc) {
return new LeafReader() {
final Bits liveDocs = new Bits.MatchNoBits(maxDoc);

public Terms terms(String field) {
return null;
}

public NumericDocValues getNumericDocValues(String field) {
return null;
}

public BinaryDocValues getBinaryDocValues(String field) {
return null;
}

public SortedDocValues getSortedDocValues(String field) {
return null;
}

public SortedNumericDocValues getSortedNumericDocValues(String field) {
return null;
}

public SortedSetDocValues getSortedSetDocValues(String field) {
return null;
}

public NumericDocValues getNormValues(String field) {
return null;
}

public FieldInfos getFieldInfos() {
return new FieldInfos(new FieldInfo[0]);
}

public Bits getLiveDocs() {
return this.liveDocs;
}

public PointValues getPointValues(String fieldName) {
return null;
}

public void checkIntegrity() {
}

public Fields getTermVectors(int docID) {
return null;
}

public int numDocs() {
return 0;
}

public int maxDoc() {
return maxDoc;
}

public void document(int docID, StoredFieldVisitor visitor) {
}

protected void doClose() {
}

public LeafMetaData getMetaData() {
return new LeafMetaData(Version.LATEST.major, Version.LATEST, (Sort)null);
}

public CacheHelper getCoreCacheHelper() {
return null;
}

public CacheHelper getReaderCacheHelper() {
return null;
}
};
}
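
A fully deleted empty reader can stand in for an index reader that has been released; a minimal usage sketch (how the frozen engine wires this up is not part of the visible hunks):

LeafReader placeholder = Lucene.emptyReader(42);
assert placeholder.maxDoc() == 42;                // preserves the original doc count
assert placeholder.numDocs() == 0;                // but exposes no live documents
assert placeholder.getLiveDocs().get(0) == false; // every document is marked deleted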

/**
* Scans sequence numbers (i.e., {@link SeqNoFieldMapper#NAME}) between {@code fromSeqNo}(inclusive) and {@code toSeqNo}(inclusive)
* in the provided directory reader. This method invokes the callback {@code onNewSeqNo} whenever a sequence number value is found.
@@ -793,7 +793,7 @@ public final CommitStats commitStats() {
/**
* Global stats on segments.
*/
public final SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
public SegmentsStats segmentsStats(boolean includeSegmentFileSizes) {
ensureOpen();
Set<String> segmentName = new HashSet<>();
SegmentsStats stats = new SegmentsStats();
@@ -48,6 +48,11 @@ final class RamAccountingSearcherFactory extends SearcherFactory {

@Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
processReaders(reader, previousReader);
return super.newSearcher(reader, previousReader);
}

public void processReaders(IndexReader reader, IndexReader previousReader) {
final CircuitBreaker breaker = breakerService.getBreaker(CircuitBreaker.ACCOUNTING);

// Construct a list of the previous segment readers, we only want to track memory used
@@ -79,6 +84,5 @@ public IndexSearcher newSearcher(IndexReader reader, IndexReader previousReader) throws IOException {
segmentReader.getCoreCacheHelper().addClosedListener(k -> breaker.addWithoutBreaking(-ramBytesUsed));
}
}
return super.newSearcher(reader, previousReader);
}
}
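
Extracting processReaders from newSearcher lets callers charge newly opened segment readers to the accounting circuit breaker without going through a SearcherFactory refresh. A hedged sketch of such a call site (the searcherFactory and reader variables are assumptions, not code from this commit):

// After reopening a DirectoryReader outside of a SearcherManager refresh cycle:
searcherFactory.processReaders(reopenedReader, previousReader); // only segments absent from previousReader are charged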
@@ -57,7 +57,7 @@
*
* @see #ReadOnlyEngine(EngineConfig, SeqNoStats, TranslogStats, boolean, Function)
*/
public final class ReadOnlyEngine extends Engine {
public class ReadOnlyEngine extends Engine {

private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
@@ -66,6 +66,7 @@ public final class ReadOnlyEngine extends Engine {
private final IndexCommit indexCommit;
private final Lock indexWriterLock;
private final DocsStats docsStats;
protected final RamAccountingSearcherFactory searcherFactory;

/**
* Creates a new ReadOnlyEngine. This ctor can also be used to open a read-only engine on top of an already opened
@@ -82,6 +83,7 @@ public final class ReadOnlyEngine extends Engine {
public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats, boolean obtainLock,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) {
super(config);
this.searcherFactory = new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService());
try {
Store store = config.getStore();
store.incRef();
@@ -96,14 +98,10 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats,
this.lastCommittedSegmentInfos = Lucene.readSegmentInfos(directory);
this.translogStats = translogStats == null ? new TranslogStats(0, 0, 0, 0, 0) : translogStats;
this.seqNoStats = seqNoStats == null ? buildSeqNoStats(lastCommittedSegmentInfos) : seqNoStats;
reader = ElasticsearchDirectoryReader.wrap(open(directory), config.getShardId());
if (config.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
reader = readerWrapperFunction.apply(reader);
this.indexCommit = reader.getIndexCommit();
this.searcherManager = new SearcherManager(reader,
new RamAccountingSearcherFactory(engineConfig.getCircuitBreakerService()));
this.indexCommit = Lucene.getIndexCommit(lastCommittedSegmentInfos, directory);
reader = open(indexCommit);
reader = wrapReader(reader, readerWrapperFunction);
searcherManager = new SearcherManager(reader, searcherFactory);
this.docsStats = docsStats(lastCommittedSegmentInfos);
this.indexWriterLock = indexWriterLock;
success = true;
@@ -117,8 +115,17 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats translogStats,
}
}

protected DirectoryReader open(final Directory directory) throws IOException {
return DirectoryReader.open(directory);
protected final DirectoryReader wrapReader(DirectoryReader reader,
Function<DirectoryReader, DirectoryReader> readerWrapperFunction) throws IOException {
reader = ElasticsearchDirectoryReader.wrap(reader, engineConfig.getShardId());
if (engineConfig.getIndexSettings().isSoftDeleteEnabled()) {
reader = new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETES_FIELD);
}
return readerWrapperFunction.apply(reader);
}

protected DirectoryReader open(IndexCommit commit) throws IOException {
return DirectoryReader.open(commit);
}

private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
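
With ReadOnlyEngine no longer final and the override point moved from open(Directory) to open(IndexCommit), a subclass can decide how (and when) the reader for the fixed commit is opened. A hypothetical sketch under those assumptions (the class name is illustrative; the real frozen engine is more involved):

public class LazyReadOnlyEngine extends ReadOnlyEngine {

    public LazyReadOnlyEngine(EngineConfig config) {
        // null stats are rebuilt from the last commit, as in the ctor shown above
        super(config, null, null, true, Function.identity());
    }

    @Override
    protected DirectoryReader open(IndexCommit commit) throws IOException {
        // A frozen-style engine could defer this call and wrap the result so that
        // the underlying reader can be released and reopened between search phases.
        return super.open(commit);
    }
}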
@@ -154,7 +154,7 @@ final class DefaultSearchContext extends SearchContext {
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
private final Map<Class<?>, Collector> queryCollectors = new HashMap<>();
private final QueryShardContext queryShardContext;
private FetchPhase fetchPhase;
private final FetchPhase fetchPhase;

DefaultSearchContext(long id, ShardSearchRequest request, SearchShardTarget shardTarget,
Engine.Searcher engineSearcher, ClusterService clusterService, IndexService indexService,
@@ -186,7 +186,7 @@ final class DefaultSearchContext extends SearchContext {

@Override
public void doClose() {
// clear and scope phase we have
// clear and scope phase we have
Releasables.close(searcher, engineSearcher);
}

@@ -231,4 +231,8 @@ public CollectionStatistics collectionStatistics(String field) throws IOException
public DirectoryReader getDirectoryReader() {
return engineSearcher.getDirectoryReader();
}

public Engine.Searcher getEngineSearcher() {
return engineSearcher;
}
}
@@ -5168,15 +5168,15 @@ public void testLuceneSnapshotRefreshesOnlyOnce() throws Exception {
null,
new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
public void beforeRefresh() {
refreshCounter.incrementAndGet();
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
public void afterRefresh(boolean didRefresh) {

}
}, null, () -> SequenceNumbers.NO_OPS_PERFORMED))) {
}, null, () -> SequenceNumbers.NO_OPS_PERFORMED, new NoneCircuitBreakerService()))) {
for (long seqNo = 0; seqNo <= maxSeqNo; seqNo++) {
final ParsedDocument doc = testParsedDocument("id_" + seqNo, null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@@ -88,6 +88,7 @@
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
@@ -573,13 +574,14 @@ public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath,

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener refreshListener, Sort indexSort, LongSupplier globalCheckpointSupplier) {
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier);
return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null, indexSort, globalCheckpointSupplier,
new NoneCircuitBreakerService());
}

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener externalRefreshListener,
ReferenceManager.RefreshListener internalRefreshListener,
Sort indexSort, LongSupplier globalCheckpointSupplier) {
Sort indexSort, LongSupplier globalCheckpointSupplier, CircuitBreakerService breakerService) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
Engine.EventListener listener = new Engine.EventListener() {
@@ -596,7 +598,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
new NoneCircuitBreakerService(),
breakerService,
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
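
A hedged example of the widened test helper: the seven-argument overload now delegates with a NoneCircuitBreakerService, and a test that wants real memory accounting can pass its own breaker service. The argument variables below are assumed to be in scope, as in the surrounding test class:

EngineConfig cfg = config(indexSettings, store, translogPath, newMergePolicy(),
    externalRefreshListener, internalRefreshListener, indexSort,
    globalCheckpointSupplier, new NoneCircuitBreakerService());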
(diffs for the remaining changed files were not loaded)
