[Backport 2.x] Enable merge on refresh and merge on commit on Opensearch #2600

Merged: 1 commit (Mar 28, 2022)
@@ -187,6 +187,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.FINAL_PIPELINE,
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
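Registering both keys in IndexScopedSettings is what makes them valid, validated index-level settings. As a minimal sketch (not part of this PR), an index could opt in at creation time roughly like the following, assuming the `Settings`/`TimeValue` classes on this branch and an illustrative 30-second wait budget. Since both settings are also marked `Property.Dynamic`, the same keys can later be changed on a live index (see the sketch after the IndexSettings.java changes below).

```java
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.IndexSettings;

public final class MergeOnFlushSettingsExample {
    // Builds per-index settings that opt an index into merge-on-refresh/merge-on-commit.
    // The keys come from this PR; the 30-second budget is illustrative only.
    public static Settings mergeOnFlushEnabled() {
        return Settings.builder()
            .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
            .put(
                IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(),
                TimeValue.timeValueSeconds(30)
            )
            .build();
    }
}
```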
50 changes: 50 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -503,6 +503,27 @@ public final class IndexSettings {
Setting.Property.IndexScope
);

/**
* Expert: sets the amount of time to wait for merges (during {@link org.apache.lucene.index.IndexWriter#commit}
* or {@link org.apache.lucene.index.IndexWriter#getReader(boolean, boolean)}) returned by MergePolicy.findFullFlushMerges(...).
* If this time is reached, we proceed with the commit based on segments merged up to that point. The merges are not
* aborted, and will still run to completion independent of the commit or getReader call, like natural segment merges.
*/
public static final Setting<TimeValue> INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME = Setting.timeSetting(
"index.merge_on_flush.max_full_flush_merge_wait_time",
new TimeValue(10, TimeUnit.SECONDS),
new TimeValue(0, TimeUnit.MILLISECONDS),
Property.Dynamic,
Property.IndexScope
);

public static final Setting<Boolean> INDEX_MERGE_ON_FLUSH_ENABLED = Setting.boolSetting(
"index.merge_on_flush.enabled",
false,
Property.IndexScope,
Property.Dynamic
);

private final Index index;
private final Version version;
private final Logger logger;
@@ -584,6 +605,15 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private volatile int maxRegexLength;

/**
* The max amount of time to wait for merges during a full flush (refresh or commit)
*/
private volatile TimeValue maxFullFlushMergeWaitTime;
/**
* Is merge on flush enabled or not
*/
private volatile boolean mergeOnFlushEnabled;

/**
* Returns the default search fields for this index.
*/
Expand Down Expand Up @@ -696,6 +726,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
mappingTotalFieldsLimit = scopedSettings.get(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING);
mappingDepthLimit = scopedSettings.get(INDEX_MAPPING_DEPTH_LIMIT_SETTING);
mappingFieldNameLengthLimit = scopedSettings.get(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING);
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
scopedSettings.addSettingsUpdateConsumer(
@@ -765,6 +797,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING, this::setMappingTotalFieldsLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_DEPTH_LIMIT_SETTING, this::setMappingDepthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAPPING_FIELD_NAME_LENGTH_LIMIT_SETTING, this::setMappingFieldNameLengthLimit);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
@@ -1328,4 +1362,20 @@ public long getMappingFieldNameLengthLimit() {
private void setMappingFieldNameLengthLimit(long value) {
this.mappingFieldNameLengthLimit = value;
}

private void setMaxFullFlushMergeWaitTime(TimeValue timeValue) {
this.maxFullFlushMergeWaitTime = timeValue;
}

private void setMergeOnFlushEnabled(boolean enabled) {
this.mergeOnFlushEnabled = enabled;
}

public TimeValue getMaxFullFlushMergeWaitTime() {
return this.maxFullFlushMergeWaitTime;
}

public boolean isMergeOnFlushEnabled() {
return mergeOnFlushEnabled;
}
}
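Because the new fields are volatile and wired through `addSettingsUpdateConsumer`, the keys are dynamically updatable and the new values become visible on `IndexSettings` without recreating it. A hedged sketch of toggling them on an existing index, assuming a transport-style `client` and an index name that are placeholders rather than part of this PR:

```java
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;

public final class MergeOnFlushDynamicUpdateExample {
    // Updates the two merge-on-flush keys on a live index; the registered update
    // consumers copy the new values into the volatile IndexSettings fields.
    public static void enableMergeOnFlush(Client client, String indexName) {
        client.admin()
            .indices()
            .prepareUpdateSettings(indexName)
            .setSettings(
                Settings.builder()
                    .put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true)
                    .put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), "10s")
            )
            .get();
    }
}
```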
@@ -50,6 +50,7 @@
import org.apache.lucene.index.ShuffleForcedMergePolicy;
import org.apache.lucene.index.SoftDeletesRetentionMergePolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocIdSetIterator;
@@ -2425,6 +2426,21 @@ private IndexWriterConfig getIndexWriterConfig() {
// to enable it.
mergePolicy = new ShuffleForcedMergePolicy(mergePolicy);
}

if (config().getIndexSettings().isMergeOnFlushEnabled()) {
final long maxFullFlushMergeWaitMillis = config().getIndexSettings().getMaxFullFlushMergeWaitTime().millis();
if (maxFullFlushMergeWaitMillis > 0) {
iwc.setMaxFullFlushMergeWaitMillis(maxFullFlushMergeWaitMillis);
mergePolicy = new MergeOnFlushMergePolicy(mergePolicy);
} else {
logger.warn(
"The {} is enabled but {} is set to 0, merge on flush will not be activated",
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(),
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey()
);
}
}

iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy));
iwc.setSimilarity(engineConfig.getSimilarity());
iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());
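For context, here is a hedged sketch of the plain Lucene wiring this engine block maps the settings onto; it is standalone illustration, not OpenSearch code, and the wait budget passed in is arbitrary. Note that with a zero wait time the engine above skips the policy wrapper entirely and only logs the warning.

```java
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.sandbox.index.MergeOnFlushMergePolicy;

public final class MergeOnFlushLuceneExample {
    // MergeOnFlushMergePolicy (lucene-sandbox) implements findFullFlushMerges(...), and
    // setMaxFullFlushMergeWaitMillis bounds how long commit/getReader waits for those merges.
    public static IndexWriterConfig mergeOnFlushConfig(long waitMillis) {
        IndexWriterConfig iwc = new IndexWriterConfig();
        iwc.setMaxFullFlushMergeWaitMillis(waitMillis);
        iwc.setMergePolicy(new MergeOnFlushMergePolicy(new TieredMergePolicy()));
        return iwc;
    }
}
```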
@@ -494,6 +494,212 @@ public void testSegments() throws Exception {
}
}

public void testMergeSegmentsOnCommitIsDisabled() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(0))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
segments = engine.segments(randomBoolean());

assertThat(segments, hasSize(2));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(updates + deletes));
assertThat(segments.get(0).committed, equalTo(committed));

assertThat(segments.get(1).getNumDocs(), equalTo(updates + appends));
assertThat(segments.get(1).getDeletedDocs(), equalTo(deletes)); // delete tombstones
assertThat(segments.get(1).committed, equalTo(committed));
}
}

public void testMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
assertThat(engine.segments(false), empty());
int numDocsFirstSegment = randomIntBetween(5, 50);
Set<String> liveDocsFirstSegment = new HashSet<>();
for (int i = 0; i < numDocsFirstSegment; i++) {
String id = Integer.toString(i);
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
liveDocsFirstSegment.add(id);
}
engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(liveDocsFirstSegment.size()));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertFalse(segments.get(0).committed);
int deletes = 0;
int updates = 0;
int appends = 0;
int iterations = scaledRandomIntBetween(1, 50);
for (int i = 0; i < iterations && liveDocsFirstSegment.isEmpty() == false; i++) {
String idToUpdate = randomFrom(liveDocsFirstSegment);
liveDocsFirstSegment.remove(idToUpdate);
ParsedDocument doc = testParsedDocument(idToUpdate, null, testDocument(), B_1, null);
if (randomBoolean()) {
engine.delete(new Engine.Delete(doc.id(), newUid(doc), primaryTerm.get()));
deletes++;
} else {
engine.index(indexForDoc(doc));
updates++;
}
if (randomBoolean()) {
engine.index(indexForDoc(testParsedDocument(UUIDs.randomBase64UUID(), null, testDocument(), B_1, null)));
appends++;
}
}

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
segments = engine.segments(randomBoolean());

// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numDocsFirstSegment + appends - deletes));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).committed, equalTo(committed));
}
}

// this test writes documents to the engine while concurrently flushing/committing
public void testConcurrentMergeSegmentsOnCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);

final Settings.Builder settings = Settings.builder()
.put(defaultSettings.getSettings())
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME.getKey(), TimeValue.timeValueMillis(5000))
.put(IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED.getKey(), true);
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

try (
Store store = createStore();
InternalEngine engine = createEngine(
config(indexSettings, store, createTempDir(), NoMergePolicy.INSTANCE, null, null, globalCheckpoint::get)
)
) {
final int numIndexingThreads = scaledRandomIntBetween(3, 8);
final int numDocsPerThread = randomIntBetween(500, 1000);
final CyclicBarrier barrier = new CyclicBarrier(numIndexingThreads + 1);
final List<Thread> indexingThreads = new ArrayList<>();
final CountDownLatch doneLatch = new CountDownLatch(numIndexingThreads);
// create N indexing threads to index documents simultaneously
for (int threadNum = 0; threadNum < numIndexingThreads; threadNum++) {
final int threadIdx = threadNum;
Thread indexingThread = new Thread(() -> {
try {
barrier.await(); // wait for all threads to start at the same time
// each thread indexes numDocsPerThread docs
for (int i = 0; i < numDocsPerThread; i++) {
final String id = "thread" + threadIdx + "#" + i;
ParsedDocument doc = testParsedDocument(id, null, testDocument(), B_1, null);
engine.index(indexForDoc(doc));
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
doneLatch.countDown();
}

});
indexingThreads.add(indexingThread);
}

// start the indexing threads
for (Thread thread : indexingThreads) {
thread.start();
}
barrier.await(); // wait for indexing threads to all be ready to start
assertThat(doneLatch.await(10, TimeUnit.SECONDS), is(true));

boolean committed = randomBoolean();
if (committed) {
engine.flush();
}

engine.refresh("test");
List<Segment> segments = engine.segments(randomBoolean());

// All segments have to be merged into one
assertThat(segments, hasSize(1));
assertThat(segments.get(0).getNumDocs(), equalTo(numIndexingThreads * numDocsPerThread));
assertThat(segments.get(0).committed, equalTo(committed));
}
}

public void testCommitStats() throws IOException {
final AtomicLong maxSeqNo = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicLong localCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);