Change order of recovery events with segrep.
  1. Pings the primary to fetch its TRANSLOG_UUID.
  2. Creates an empty store and local translog using the primary's UUID.
  3. Starts the engine.
  4. Wipes the local index (files will be replaced during copy).
  5. On getCheckpointMetadata, creates a retention lease, initiates tracking, and refreshes the primary.
  6. Copies segments and, before completing recovery, replays a LuceneChangesSnapshot on the replica from its in-memory infos (see the sketch below).
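
A minimal sketch of that ordering. All helper names below are illustrative assumptions; only recoverTranslogFromLuceneChangesSnapshot is real, added to IndexShard in this commit.

interface PrimarySource {
    String fetchTranslogUUID();                       // 1. ping the primary
    void getCheckpointMetadata();                     // 5. retention lease + tracking + refresh
    void copySegmentsTo(ReplicaShard replica);        // 6. segment copy
}

interface ReplicaShard {
    void createEmptyStoreAndTranslog(String uuid);    // 2. bootstrap with the primary's UUID
    void startEngine();                               // 3. start the engine
    void wipeLocalIndexFiles();                       // 4. files are replaced during copy
    void recoverTranslogFromLuceneChangesSnapshot();  // 6. replay ops into the local translog
}

void recoverReplica(ReplicaShard replica, PrimarySource primary) {
    replica.createEmptyStoreAndTranslog(primary.fetchTranslogUUID());
    replica.startEngine();
    replica.wipeLocalIndexFiles();
    primary.getCheckpointMetadata();
    primary.copySegmentsTo(replica);
    replica.recoverTranslogFromLuceneChangesSnapshot();
}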

Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jun 30, 2022
1 parent 3681ac7 commit 4222bee
Showing 17 changed files with 689 additions and 32 deletions.
SegmentReplicationIT.java (new file, 267 additions)
@@ -0,0 +1,267 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import com.carrotsearch.randomizedtesting.RandomizedTest;
import org.junit.Assert;
import org.opensearch.action.ActionListener;
import org.opensearch.action.admin.indices.segments.IndexShardSegments;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexModule;
import org.opensearch.index.engine.Segment;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationIT extends OpenSearchIntegTestCase {

    private static final String INDEX_NAME = "test-idx-1";
    private static final int SHARD_COUNT = 1;
    private static final int REPLICA_COUNT = 1;

    @Override
    public Settings indexSettings() {
        return Settings.builder()
            .put(super.indexSettings())
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, SHARD_COUNT)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, REPLICA_COUNT)
            .put(IndexModule.INDEX_QUERY_CACHE_ENABLED_SETTING.getKey(), false)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build();
    }

    @Override
    protected boolean addMockInternalEngine() {
        return false;
    }

    public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        final int initialDocCount = scaledRandomIntBetween(0, 200);
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(initialDocCount);
            waitForDocs(initialDocCount, indexer);
            refresh(INDEX_NAME);

            // wait a short amount of time to give replication a chance to complete.
            Thread.sleep(1000);
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

            final int additionalDocCount = scaledRandomIntBetween(0, 200);
            final int expectedHitCount = initialDocCount + additionalDocCount;
            indexer.start(additionalDocCount);
            waitForDocs(expectedHitCount, indexer);

            flushAndRefresh(INDEX_NAME);
            Thread.sleep(1000);
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

            ensureGreen(INDEX_NAME);
            assertSegmentStats(REPLICA_COUNT);
        }
    }

    public void testDeleteOperations() {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();

        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

        client().prepareIndex(INDEX_NAME)
            .setId("2")
            .setSource("fooo", "baar")
            .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
            .get();

        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

        // Now delete with blockUntilRefresh
        client().prepareDelete(INDEX_NAME, "1").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);
        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

        client().prepareDelete(INDEX_NAME, "2").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 0);
    }

    public void testReplicationAfterForceMerge() throws Exception {
        final String nodeA = internalCluster().startNode();
        final String nodeB = internalCluster().startNode();
        createIndex(INDEX_NAME);
        ensureGreen(INDEX_NAME);

        final int initialDocCount = scaledRandomIntBetween(0, 200);
        try (
            BackgroundIndexer indexer = new BackgroundIndexer(
                INDEX_NAME,
                "_doc",
                client(),
                -1,
                RandomizedTest.scaledRandomIntBetween(2, 5),
                false,
                random()
            )
        ) {
            indexer.start(initialDocCount);
            waitForDocs(initialDocCount, indexer);

            flush(INDEX_NAME);
            // wait a short amount of time to give replication a chance to complete.
            Thread.sleep(1000);
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);

            final int additionalDocCount = scaledRandomIntBetween(0, 200);
            final int expectedHitCount = initialDocCount + additionalDocCount;

            // Index a second set of docs so we can merge into one segment.
            indexer.start(additionalDocCount);
            waitForDocs(expectedHitCount, indexer);

            // Force a merge here so that the in-memory SegmentInfos does not reference old segments on disk.
            // This case tests that replicas preserve these files so the local store is not corrupt.
            client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
            Thread.sleep(1000);
            refresh(INDEX_NAME);
            Thread.sleep(1000);
            assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
            assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

            ensureGreen(INDEX_NAME);
            assertSegmentStats(REPLICA_COUNT);
        }
    }

    public void testReplicaSetupAfterPrimaryIndexesDocs() {
        final String nodeA = internalCluster().startNode();
        createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build());
        ensureGreen(INDEX_NAME);

        // Index a doc to create the first set of segments. _s1.si
        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();
        // Flush segments to disk and create a new commit point (Primary: segments_3, _s1.si)
        flushAndRefresh(INDEX_NAME);
        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 1);

        // Index to create another segment
        client().prepareIndex(INDEX_NAME).setId("2").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).get();

        // Force a merge here so that the in-memory SegmentInfos does not reference old segments on disk.
        // This case tests that we are still sending these older segments to replicas so the index on disk is not corrupt.
        client().admin().indices().prepareForceMerge(INDEX_NAME).setMaxNumSegments(1).setFlush(false).get();
        refresh(INDEX_NAME);

        final String nodeB = internalCluster().startNode();
        assertAcked(
            client().admin()
                .indices()
                .prepareUpdateSettings(INDEX_NAME)
                .setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1))
        );
        ensureGreen(INDEX_NAME);
        assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
        assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
        assertSegmentStats(REPLICA_COUNT);
    }

    private void assertSegmentStats(int numberOfReplicas) {
        client().admin().indices().segments(new IndicesSegmentsRequest(), new ActionListener<>() {
            @Override
            public void onResponse(IndicesSegmentResponse indicesSegmentResponse) {

                List<ShardSegments[]> segmentsByIndex = indicesSegmentResponse.getIndices()
                    .values()
                    .stream() // get list of IndexSegments
                    .flatMap(is -> is.getShards().values().stream()) // map to shard replication group
                    .map(IndexShardSegments::getShards) // get list of segments across the replication group
                    .collect(Collectors.toList());

                // There will be an entry in the list for each index.
                for (ShardSegments[] replicationGroupSegments : segmentsByIndex) {

                    // Separate the primary's and replicas' ShardSegments.
                    final Map<Boolean, List<ShardSegments>> segmentListMap = Arrays.stream(replicationGroupSegments)
                        .collect(Collectors.groupingBy(s -> s.getShardRouting().primary()));
                    final List<ShardSegments> primaryShardSegmentsList = segmentListMap.get(true);
                    final List<ShardSegments> replicaShardSegments = segmentListMap.get(false);

                    // JUnit expects the expected value first; the original call had the arguments reversed.
                    assertEquals("There should only be one primary in the replication group", 1, primaryShardSegmentsList.size());
                    final ShardSegments primaryShardSegments = primaryShardSegmentsList.stream().findFirst().get();

                    // Create a map of the primary's segments keyed by segment name, allowing us to compare the same segment found on
                    // replicas.
                    final Map<String, Segment> primarySegmentsMap = primaryShardSegments.getSegments()
                        .stream()
                        .collect(Collectors.toMap(Segment::getName, Function.identity()));
                    // For every replica, ensure that its segments are in the same state as on the primary.
                    // It is possible the primary has not cleaned up old segments that are not required on replicas, so we can't do a
                    // list comparison.
                    // This equality check includes search/committed properties on the Segment. Combined with docCount checks,
                    // this ensures the replica has correctly copied the latest segments and has all segments referenced by the latest
                    // commit point, even if they are not searchable.
                    assertEquals(
                        "There should be a ShardSegments entry for each replica in the replication group",
                        numberOfReplicas,
                        replicaShardSegments.size()
                    );

                    for (ShardSegments shardSegment : replicaShardSegments) {
                        for (Segment replicaSegment : shardSegment.getSegments()) {
                            final Segment primarySegment = primarySegmentsMap.get(replicaSegment.getName());
                            assertEquals("Replica's segment should be identical to the primary's version", replicaSegment, primarySegment);
                        }
                    }
                }
            }

            @Override
            public void onFailure(Exception e) {
                Assert.fail("Error fetching segment stats");
            }
        });
    }
}
FeatureFlags.java
@@ -35,6 +35,7 @@ public class FeatureFlags {
     * and false otherwise.
     */
    public static boolean isEnabled(String featureFlagName) {
-        return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
+        return true;
+        // return "true".equalsIgnoreCase(System.getProperty(featureFlagName));
    }
}
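
For context, callers gate experimental code paths on this helper roughly as below; the flag string is an illustrative assumption, not necessarily the exact constant. Note that the committed change short-circuits the system-property check, force-enabling every flag, with the real check left commented out above.

// Hypothetical call site; with the change above this branch is now always taken.
if (FeatureFlags.isEnabled("opensearch.experimental.feature.replication_type")) {
    // segment-replication-specific code path
}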
@@ -214,7 +214,29 @@ public Translog.Snapshot newChangesSnapshot(
        boolean requiredFullRange,
        boolean accurateCount
    ) throws IOException {
-        throw new UnsupportedOperationException("Not implemented");
+        ensureOpen();
+        Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
+        try {
+            LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(
+                searcher,
+                LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
+                fromSeqNo,
+                toSeqNo,
+                requiredFullRange,
+                accurateCount
+            );
+            searcher = null;
+            return snapshot;
+        } catch (Exception e) {
+            try {
+                maybeFailEngine("acquire changes snapshot", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
+            }
+            throw e;
+        } finally {
+            IOUtils.close(searcher);
+        }
}
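
Two notes on the block above. The searcher = null assignment is the usual ownership-transfer idiom: once the LuceneChangesSnapshot is constructed, it owns (and will eventually close) the searcher, so the local reference is cleared to keep the finally block's IOUtils.close(searcher) from releasing it on the success path; if construction throws, the reference is still set and the searcher is closed. A hedged usage sketch, mirroring how this commit consumes the snapshot elsewhere (variable names are illustrative):

// Iterate every operation the in-memory reader exposes from a starting seqNo.
try (Translog.Snapshot snapshot = engine.newChangesSnapshot("replication", fromSeqNo, Long.MAX_VALUE, false, true)) {
    Translog.Operation op;
    while ((op = snapshot.next()) != null) {
        // each operation can be replayed onto another engine or written to a translog
    }
}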

@Override
server/src/main/java/org/opensearch/index/shard/IndexShard.java (46 additions, 1 deletion)
@@ -125,6 +125,7 @@
import org.opensearch.index.mapper.DocumentMapper;
import org.opensearch.index.mapper.DocumentMapperForType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.MapperException;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.Mapping;
import org.opensearch.index.mapper.ParsedDocument;
@@ -164,6 +165,7 @@
import org.opensearch.indices.recovery.RecoveryListener;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.indices.recovery.RecoveryTarget;
import org.opensearch.indices.replication.SegmentReplicationTargetService;
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
import org.opensearch.repositories.RepositoriesService;
@@ -2062,6 +2064,41 @@ public void finalizeRecovery() {
        engine.config().setEnableGcDeletes(true);
    }

    /**
     * Segrep specific method.
     * This method will take a Lucene changes snapshot of the current in-memory SegmentInfos, starting
     * from the last commit point's checkpoint. It then replays those operations onto the shard so that they are
     * written into the translog.
     */
    public void recoverTranslogFromLuceneChangesSnapshot() {
        assert indexSettings.isSegRepEnabled() && shardRouting.primary() == false;
        long startingSeqNo;
        try {
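            // Derive the replay starting point from the local checkpoint recorded in the last commit's user data.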
            final MetadataSnapshot metadata = store.getMetadata();
            startingSeqNo = Long.parseLong(metadata.getCommitUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
        } catch (IOException e) {
            throw new OpenSearchException("fail", e);
        }
        try (
            final Translog.Snapshot snapshot = getEngine().newChangesSnapshot("replication", startingSeqNo, Long.MAX_VALUE, false, true)
        ) {
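            // Replay every operation from that checkpoint onward so each is re-written into the local translog.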
            Translog.Operation operation;
            while ((operation = snapshot.next()) != null) {
                Engine.Result result = applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
                if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
                    throw new MapperException("mapping updates are not allowed [" + operation + "]");
                }
                if (result.getFailure() != null) {
                    if (Assertions.ENABLED && result.getFailure() instanceof MapperException == false) {
                        throw new AssertionError("unexpected failure while replicating translog entry", result.getFailure());
                    }
                    ExceptionsHelper.reThrowIfNotNull(result.getFailure());
                }
            }
        } catch (Throwable e) {
            logger.error("Error creating snapshot", e);
        }
    }

    /**
     * Returns {@code true} if this shard can ignore a recovery attempt made to it (since it is already performing or has completed it)
     */
@@ -2981,6 +3018,7 @@ public void startRecovery(
        RecoveryState recoveryState,
        PeerRecoveryTargetService recoveryTargetService,
        RecoveryListener recoveryListener,
        SegmentReplicationTargetService segmentReplicationTargetService,
        RepositoriesService repositoriesService,
        Consumer<MappingMetadata> mappingUpdateConsumer,
        IndicesService indicesService
@@ -3010,8 +3048,15 @@
            case PEER:
                try {
                    markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
-                    recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
+                    if (indexSettings.isSegRepEnabled()) {
+                        // Start a "Recovery" using segment replication. This ensures the shard is tracked by the primary
+                        // and started with the latest set of segments.
+                        segmentReplicationTargetService.recoverShard(this, recoveryState.getSourceNode(), recoveryListener);
+                    } else {
+                        recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
+                    }
                } catch (Exception e) {
+                    logger.error("Fail", e);
                    failShard("corrupted preexisting index", e);
                    recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
                }
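
With segment replication enabled, replica startup is delegated to SegmentReplicationTargetService.recoverShard (added in this change), which performs the sequence described in the commit message: the shard is tracked by the primary and seeded with its latest segments, rather than going through the document-based peer-recovery path.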
server/src/main/java/org/opensearch/index/store/Store.java (1 addition, 1 deletion)
@@ -1003,7 +1003,7 @@ static LoadedMetadata loadMetadata(SegmentInfos segmentInfos, Directory director
                // version is written since 3.1+: we should have already hit IndexFormatTooOld.
                throw new IllegalArgumentException("expected valid version value: " + info.info.toString());
            }
-            if (version.onOrAfter(maxVersion)) {
+            if (maxVersion == null || version.onOrAfter(maxVersion)) {
                maxVersion = version;
            }
            for (String file : info.files()) {
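
The added null guard covers the case where maxVersion has not been seeded yet: previously, version.onOrAfter(maxVersion) would throw a NullPointerException on the first segment while maxVersion was still null. With the guard, the first segment's version seeds maxVersion, and later segments replace it only when they are on or after the current value.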
