Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding min. retry time per region for 404/1002 SessionTokenRetryPolicy when RegionSwitchHint is Remote #37143

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import com.azure.cosmos.implementation.ImplementationBridgeHelpers;

import java.time.Duration;

/**
* {@link SessionRetryOptions} encapsulates hints which influence
* internal retry policies which are applied when the effective consistency
Expand All @@ -13,12 +15,14 @@
public final class SessionRetryOptions {

private final CosmosRegionSwitchHint regionSwitchHint;
private final Duration minInRegionRetryTimeForWriteOperations;

/**
 * Instantiates {@link SessionRetryOptions}.
 * <p>
 * No validation happens here; both values are stored as passed in.
 *
 * @param regionSwitchHint the region switch hint to store
 * @param minInRegionRetryTimeForWriteOperations the minimum in-region retry time for write
 * operations to store
 */
SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint, Duration minInRegionRetryTimeForWriteOperations) {
    this.regionSwitchHint = regionSwitchHint;
    this.minInRegionRetryTimeForWriteOperations = minInRegionRetryTimeForWriteOperations;
}

static void initialize() {
Expand All @@ -29,6 +33,11 @@ static void initialize() {
public CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions) {
return sessionRetryOptions.regionSwitchHint;
}

// Accessor bridge: returns the private minInRegionRetryTimeForWriteOperations field of the
// given SessionRetryOptions instance unchanged (no null check is performed on the argument).
@Override
public Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions) {
return sessionRetryOptions.minInRegionRetryTimeForWriteOperations;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@

package com.azure.cosmos;

import com.azure.cosmos.implementation.Configs;

import java.time.Duration;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
Expand All @@ -12,6 +17,7 @@
public final class SessionRetryOptionsBuilder {

private CosmosRegionSwitchHint regionSwitchHint;
private Duration minInRegionRetryTimeForWriteOperations = Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES;

/**
* Sets the {@link CosmosRegionSwitchHint} which specifies for
Expand All @@ -31,13 +37,39 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region
return this;
}

/**
 * Sets the minimum retry time for 404/1002 retries within each region for write operations. The minimum value
 * is 100ms - this minimum is enforced to provide a way for the local region to catch up on replication lag.
 * <p>
 * This setter stores the value without validating it; the null and lower-bound checks are performed in
 * {@link #build()}, and only when the region switch hint is
 * {@link CosmosRegionSwitchHint#REMOTE_REGION_PREFERRED}. If this method is never called, the builder keeps
 * its initial value of {@code Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES}.
 *
 * @param minRetryTime the minimum retry time to be used within each region for write operations
 * @return This instance of {@link SessionRetryOptionsBuilder}
 */
public SessionRetryOptionsBuilder minRetryTimeInLocalRegionForWriteOperations(Duration minRetryTime) {
this.minInRegionRetryTimeForWriteOperations = minRetryTime;
return this;
}

/**
 * Builds an instance of {@link SessionRetryOptions}.
 * <p>
 * The region switch hint is mandatory. When it is
 * {@link CosmosRegionSwitchHint#REMOTE_REGION_PREFERRED}, the minimum in-region retry time for
 * write operations must be non-null and at least
 * {@code Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES}. For any other hint the configured
 * value is passed through without validation.
 *
 * @return An instance of {@link SessionRetryOptions}
 */
public SessionRetryOptions build() {
    checkNotNull(regionSwitchHint, "regionSwitch hint cannot be null");

    // The minimum in-region retry time is only validated for REMOTE_REGION_PREFERRED.
    if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) {
        checkArgument(
            minInRegionRetryTimeForWriteOperations != null,
            "Argument 'minInRegionRetryTimeForWriteOperations' must not be null when 'regionSwitchHint' "
                + "is 'REMOTE_REGION_PREFERRED'.");

        checkArgument(
            minInRegionRetryTimeForWriteOperations
                .compareTo(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES) >= 0,
            "Argument 'minInRegionRetryTimeForWriteOperations' must have at least a value of '"
                + Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES.toString()
                + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'.");
    }

    return new SessionRetryOptions(regionSwitchHint, minInRegionRetryTimeForWriteOperations);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ public class Configs {
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS";
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 500;

// Smallest accepted minimum in-region retry time for writes (100 ms);
// SessionRetryOptionsBuilder.build() rejects smaller values when the
// region switch hint is REMOTE_REGION_PREFERRED.
public static final Duration MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100);

// Initial value (100 ms) of the builder's minimum in-region retry time for writes
// when the caller does not set one explicitly.
public static final Duration DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100);

// Whether to process the response on a different thread
private static final String SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE";
private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,7 @@ public static void setCosmosSessionRetryOptionsAccessor(final CosmosSessionRetry

/**
 * Accessor contract for reading values out of a {@link SessionRetryOptions} instance from
 * implementation code.
 * NOTE(review): presumably implemented by the anonymous class registered in
 * SessionRetryOptions.initialize() - confirm.
 */
public interface CosmosSessionRetryOptionsAccessor {
/**
 * @param sessionRetryOptions the options instance to read from
 * @return the region switch hint held by the given options
 */
CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions);
/**
 * @param sessionRetryOptions the options instance to read from
 * @return the minimum in-region retry time for write operations held by the given options
 */
Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@

public class SessionTokenMismatchRetryPolicy implements IRetryPolicy {

private final static ImplementationBridgeHelpers.CosmosSessionRetryOptionsHelper.CosmosSessionRetryOptionsAccessor
sessionRetryOptionsAccessor = ImplementationBridgeHelpers
.CosmosSessionRetryOptionsHelper
.getCosmosSessionRetryOptionsAccessor();
private final static Logger LOGGER = LoggerFactory.getLogger(SessionTokenMismatchRetryPolicy.class);
private static final int BACKOFF_MULTIPLIER = 2;
private final Duration maximumBackoff;
Expand All @@ -25,8 +29,13 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy {
private RetryContext retryContext;
private final AtomicInteger maxRetryAttemptsInCurrentRegion;
private final SessionRetryOptions sessionRetryOptions;
private final boolean isWriteOperation;

public SessionTokenMismatchRetryPolicy(
RetryContext retryContext,
SessionRetryOptions sessionRetryOptions,
boolean isWriteOperation) {

public SessionTokenMismatchRetryPolicy(RetryContext retryContext, int waitTimeInMilliseconds, SessionRetryOptions sessionRetryOptions) {
this.waitTimeTimeoutHelper = new TimeoutHelper(Duration.ofMillis(Configs.getSessionTokenMismatchDefaultWaitTimeInMs()));
this.maximumBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchMaximumBackoffTimeInMs());
this.retryCount = new AtomicInteger();
Expand All @@ -35,10 +44,7 @@ public SessionTokenMismatchRetryPolicy(RetryContext retryContext, int waitTimeIn
this.maxRetryAttemptsInCurrentRegion = new AtomicInteger(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred());
this.retryContext = retryContext;
this.sessionRetryOptions = sessionRetryOptions;
}

public SessionTokenMismatchRetryPolicy(RetryContext retryContext, SessionRetryOptions sessionRetryOptions) {
this(retryContext, Configs.getSessionTokenMismatchDefaultWaitTimeInMs(), sessionRetryOptions);
this.isWriteOperation = isWriteOperation;
}

@Override
Expand Down Expand Up @@ -129,12 +135,18 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int
return true;
}

CosmosRegionSwitchHint regionSwitchHint = ImplementationBridgeHelpers
.CosmosSessionRetryOptionsHelper
.getCosmosSessionRetryOptionsAccessor()
CosmosRegionSwitchHint regionSwitchHint = sessionRetryOptionsAccessor
.getRegionSwitchHint(sessionRetryOptions);

if (regionSwitchHint == null || regionSwitchHint == CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED) {
if (regionSwitchHint != CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) {
return true;
}

// For write operations we need to retry at least MinInRegionRetryTimeForWriteOperations
// to allow the region to catch up on replication
if (this.isWriteOperation && !this.waitTimeTimeoutHelper.isElapsed(
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
sessionRetryOptionsAccessor.getMinInRegionRetryTimeForWriteOperations(sessionRetryOptions))) {

return true;
}

Expand All @@ -143,7 +155,6 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int
// another attempt on the same region
// hence to curb the retry attempts on a region,
// compare sessionTokenMismatchRetryAttempts with max retry attempts allowed on the region - 1
return !(regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED
&& sessionTokenMismatchRetryAttempts == (this.maxRetryAttemptsInCurrentRegion.get() - 1));
return sessionTokenMismatchRetryAttempts < (this.maxRetryAttemptsInCurrentRegion.get() - 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,10 @@ public Mono<StoreResponse> readAsync(RxDocumentServiceRequest entity,
if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) {
return BackoffRetryUtility.executeRetry(
() -> this.readSessionAsync(entity, desiredReadMode),
new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions));
new SessionTokenMismatchRetryPolicy(
BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics),
sessionRetryOptions,
false));
} else {
return this.readAnyAsync(entity, desiredReadMode);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ public Mono<StoreResponse> writeAsync(
return BackoffRetryUtility
.executeRetry(
() -> this.writePrivateAsync(entity, timeout, forceRefresh),
new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions))
new SessionTokenMismatchRetryPolicy(
BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics),
sessionRetryOptions,
true))
.doOnEach(
arg -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ public TimeoutHelper(Duration timeOut) {
}

/**
 * Checks whether the timeout this helper was created with has been reached.
 *
 * @return the result of {@link #isElapsed(Duration)} evaluated against this helper's own timeout
 */
public boolean isElapsed() {
    final Duration configuredTimeout = this.timeOut;
    return isElapsed(configuredTimeout);
}

/**
 * Checks whether at least the given amount of time has passed since {@code startTime}.
 * <p>
 * The elapsed time is computed from epoch milliseconds, so it has millisecond granularity.
 *
 * @param timeoutToCompareTo the duration the elapsed time is compared against
 * @return {@code true} if the elapsed time is greater than or equal to {@code timeoutToCompareTo}
 */
public boolean isElapsed(Duration timeoutToCompareTo) {
    Duration elapsed = Duration.ofMillis(Instant.now().toEpochMilli() - startTime.toEpochMilli());
    // Compare against the supplied duration, not the helper's own timeOut field.
    return elapsed.compareTo(timeoutToCompareTo) >= 0;
}

public Duration getRemainingTime() {
Expand Down
Loading