Skip to content

Commit

Permalink
making waitForAssignment same
Browse files Browse the repository at this point in the history
Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
  • Loading branch information
gbbafna committed Sep 5, 2024
1 parent 9b58deb commit 60822d7
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public LocalStorePeerRecoverySourceHandler(
@Override
protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure) throws IOException {
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
waitForAssignment(retentionLeaseRef);
waitForAssignmentPropagate(retentionLeaseRef);
final Closeable retentionLock = shard.acquireHistoryRetentionLock();
resources.add(retentionLock);
final long startingSeqNo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.action.support.replication.ReplicationResponse;
Expand Down Expand Up @@ -83,12 +84,14 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.IntSupplier;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -195,24 +198,49 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
protected abstract void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener, Consumer<Exception> onFailure)
throws IOException;

protected void waitForAssignment(SetOnce<RetentionLease> retentionLeaseRef) {
RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
ShardRouting targetShardRouting = routingTable.getByAllocationId(request.targetAllocationId());
if (targetShardRouting == null) {
logger.debug(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}
assert targetShardRouting.initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting)));
}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);
}
/*
Waits for cluster state propagation of assignment of replica on the target node
*/
void waitForAssignmentPropagate(SetOnce<RetentionLease> retentionLeaseRef) {
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(1000), 5);
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
while (backoffDelayIterator.hasNext()) {
RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
if (targetShardRouting.get() == null) {
logger.info(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
Thread.sleep(backoffDelayIterator.next().millis());
}
if (targetShardRouting.get() != null) {
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was "
+ targetShardRouting;
retentionLeaseRef.set(
shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get()))
);
}

},
shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ",
shard,
cancellableThreads,
logger
);

if (targetShardRouting.get() != null) {
return;
}
}
if (targetShardRouting.get() != null) {
return;
}
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}

protected void finalizeStepAndCompleteFuture(
long startingSeqNo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,12 @@

import org.apache.lucene.index.IndexCommit;
import org.opensearch.action.StepListener;
import org.opensearch.action.bulk.BackoffPolicy;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.common.SetOnce;
import org.opensearch.common.concurrent.GatedCloseable;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.engine.RecoveryEngineException;
import org.opensearch.index.seqno.ReplicationTracker;
import org.opensearch.index.seqno.RetentionLease;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.shard.IndexShard;
Expand All @@ -28,8 +24,6 @@
import org.opensearch.transport.Transports;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -57,7 +51,7 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
// updated segments file on refresh, flushes, and merges. In recovery, here, only file-based recovery is performed
// and there is no translog replay done.
final SetOnce<RetentionLease> retentionLeaseRef = new SetOnce<>();
waitForAssignment(retentionLeaseRef);
waitForAssignmentPropagate(retentionLeaseRef);
final StepListener<SendFileResult> sendFileStep = new StepListener<>();
final StepListener<TimeValue> prepareEngineStep = new StepListener<>();
final StepListener<SendSnapshotResult> sendSnapshotStep = new StepListener<>();
Expand Down Expand Up @@ -112,40 +106,4 @@ protected void innerRecoveryToTarget(ActionListener<RecoveryResponse> listener,
finalizeStepAndCompleteFuture(startingSeqNo, sendSnapshotStep, sendFileStep, prepareEngineStep, onFailure);
}

protected void waitForAssignment(SetOnce<RetentionLease> retentionLeaseRef) {
BackoffPolicy EXPONENTIAL_BACKOFF_POLICY = BackoffPolicy.exponentialBackoff(
TimeValue.timeValueMillis(100),
3
);
AtomicReference<ShardRouting> targetShardRouting = new AtomicReference<>();
Iterator<TimeValue> backoffDelayIterator = EXPONENTIAL_BACKOFF_POLICY.iterator();
while (backoffDelayIterator.hasNext() ) {
RunUnderPrimaryPermit.run(() -> {
final IndexShardRoutingTable routingTable = shard.getReplicationGroup().getRoutingTable();
targetShardRouting.set(routingTable.getByAllocationId(request.targetAllocationId()));
if (targetShardRouting.get() == null) {
logger.info(
"delaying recovery of {} as it is not listed as assigned to target node {}",
request.shardId(),
request.targetNode()
);
Thread.sleep(backoffDelayIterator.next().millis());
}
if (targetShardRouting.get() != null) {
assert targetShardRouting.get().initializing() : "expected recovery target to be initializing but was " + targetShardRouting;
retentionLeaseRef.set(shard.getRetentionLeases().get(ReplicationTracker.getPeerRecoveryRetentionLeaseId(targetShardRouting.get())));
}

}, shardId + " validating recovery target [" + request.targetAllocationId() + "] registered ", shard, cancellableThreads, logger);

if (targetShardRouting.get() != null) {
return;
}
}
if (targetShardRouting.get() != null) {
return;
}
throw new DelayRecoveryException("source node does not have the shard listed in its state as allocated on the node");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.tests.store.BaseDirectoryWrapper;
import org.junit.After;
import org.junit.Before;
import org.opensearch.ExceptionsHelper;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
Expand Down Expand Up @@ -92,7 +90,6 @@
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexShardRelocatedException;
import org.opensearch.index.shard.IndexShardState;
import org.opensearch.index.shard.ReplicationGroup;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.index.translog.Translog;
Expand All @@ -106,6 +103,8 @@
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -757,7 +756,7 @@ public void testThrowExceptionOnNoTargetInRouting() throws IOException {
when(shard.seqNoStats()).thenReturn(mock(SeqNoStats.class));
when(shard.segmentStats(anyBoolean(), anyBoolean())).thenReturn(mock(SegmentsStats.class));
when(shard.isRelocatedPrimary()).thenReturn(false);
final ReplicationGroup replicationGroup = mock(ReplicationGroup.class);
final org.opensearch.index.shard.ReplicationGroup replicationGroup = mock(org.opensearch.index.shard.ReplicationGroup.class);
final IndexShardRoutingTable routingTable = mock(IndexShardRoutingTable.class);
when(routingTable.getByAllocationId(anyString())).thenReturn(null);
when(shard.getReplicationGroup()).thenReturn(replicationGroup);
Expand Down Expand Up @@ -842,7 +841,7 @@ void phase2(
handler.recoverToTarget(future);
future.actionGet();
});
verify(routingTable, times(1)).getByAllocationId(null);
verify(routingTable, times(5)).getByAllocationId(null);
assertFalse(phase1Called.get());
assertFalse(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,27 +118,6 @@ public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception {
}
}

public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
Store.MetadataSnapshot metadataSnapshot = randomBoolean()
? Store.MetadataSnapshot.EMPTY
: new Store.MetadataSnapshot(
Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()),
randomIntBetween(0, 100)
);
return new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()
);
}


public void testThrowExceptionOnNoTargetInRouting() throws IOException {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
final StartRecoveryRequest request = getStartRecoveryRequest();
Expand Down Expand Up @@ -231,9 +210,29 @@ void phase2(
handler.recoverToTarget(future);
future.actionGet();
});
verify(routingTable, times(3)).getByAllocationId(null);
verify(routingTable, times(5)).getByAllocationId(null);
assertFalse(phase1Called.get());
assertFalse(prepareTargetForTranslogCalled.get());
assertFalse(phase2Called.get());
}

public StartRecoveryRequest getStartRecoveryRequest() throws IOException {
Store.MetadataSnapshot metadataSnapshot = randomBoolean()
? Store.MetadataSnapshot.EMPTY
: new Store.MetadataSnapshot(
Collections.emptyMap(),
Collections.singletonMap(Engine.HISTORY_UUID_KEY, UUIDs.randomBase64UUID()),
randomIntBetween(0, 100)
);
return new StartRecoveryRequest(
shardId,
null,
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT),
metadataSnapshot,
randomBoolean(),
randomNonNegativeLong(),
randomBoolean() || metadataSnapshot.getHistoryUUID() == null ? SequenceNumbers.UNASSIGNED_SEQ_NO : randomNonNegativeLong()
);
}
}

0 comments on commit 60822d7

Please sign in to comment.