diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java index 78a61cbdc84ff..dab580c8b066f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java @@ -4,6 +4,7 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; @@ -53,6 +54,7 @@ import org.testng.annotations.Test; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -728,6 +730,7 @@ public void readAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.READ_ITEM), readItemCallback, @@ -746,6 +749,8 @@ public void readAfterCreation( public Object[][] testConfigs_writeAfterCreation() { final boolean nonIdempotentWriteRetriesEnabled = true; final boolean nonIdempotentWriteRetriesDisabled = false; + final String SECOND_REGION_NAME = writeableRegions.get(1).toLowerCase(Locale.ROOT); + final Duration NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES = null; Function createAnotherItemCallback = (params) -> { @@ -854,6 +859,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(3), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -870,6 +876,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -887,6 +894,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -903,6 +911,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -919,6 +928,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -936,6 +946,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -948,6 +959,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -960,6 +972,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -972,6 +985,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -984,6 +998,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -996,6 +1011,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1008,6 +1024,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1025,6 +1042,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1037,6 +1055,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1049,6 +1068,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1061,6 +1081,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1073,6 +1094,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1089,6 +1111,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1107,6 +1130,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1123,6 +1147,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1139,6 +1164,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1155,6 +1181,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1173,6 +1200,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1190,6 +1218,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1205,6 +1234,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1221,6 +1251,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1237,6 +1268,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1254,6 +1286,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1270,6 +1303,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1284,6 +1318,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(90), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1303,6 +1338,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1322,6 +1358,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1341,6 +1378,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1358,6 +1396,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1374,6 +1413,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1392,6 +1432,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1410,6 +1451,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1427,6 +1469,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1444,6 +1487,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1462,6 +1506,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1471,6 +1516,167 @@ public Object[][] testConfigs_writeAfterCreation() { validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover }, + // 404/1022 into local region only + // No Availability strategy exists. + // Expected to get successful response from cross regional retry - region switch is remote allowing the + // cross regional retry to finish within e2e timeout. + new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferred_NoAvailabilityStrategy_WithRetries", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIs201Created, + // no hedging even with availability strategy because nonIdempotentWrites are disabled + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + + Instant firstRequestStart = Instant.MAX; + Instant firstRequestStartInSecondRegion = Instant.MAX; + for (ClientSideRequestStatistics.StoreResponseStatistics currentStoreResponse : storeResponses) { + if (currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStart)) { + firstRequestStart = currentStoreResponse.getRequestStartTimeUTC(); + } + + if (currentStoreResponse.getRegionName().equals(SECOND_REGION_NAME) && + currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStartInSecondRegion)) { + + firstRequestStartInSecondRegion = currentStoreResponse.getRequestStartTimeUTC(); + } + } + + logger.info("FirstRequestStart: {}, FirstRequestInSecondReqionStart: {}", + firstRequestStart, + firstRequestStartInSecondRegion); + + assertThat(firstRequestStartInSecondRegion.isAfter(firstRequestStart)).isEqualTo(true); + assertThat( + firstRequestStartInSecondRegion + .minus( + Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred().minus(Duration.ofMillis(5))) + .isAfter(firstRequestStart)).isEqualTo(true); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover.accept(ctx); + }) + }, + + // 404/1022 into local region only + // No availability strategy exists. + // Expected to get successful response from cross regional retry - region switch is remote allowing the + // cross regional retry to finish within e2e timeout. + new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferredWithHighInRegionRetryTime_NoAvailabilityStrategy_WithRetries", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + Duration.ofMillis(600), + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIs201Created, + // no hedging even with availability strategy because nonIdempotentWrites are disabled + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + + Instant firstRequestStart = Instant.MAX; + Instant firstRequestStartInSecondRegion = Instant.MAX; + for (ClientSideRequestStatistics.StoreResponseStatistics currentStoreResponse : storeResponses) { + if (currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStart)) { + firstRequestStart = currentStoreResponse.getRequestStartTimeUTC(); + } + + if (currentStoreResponse.getRegionName().equals(SECOND_REGION_NAME) && + currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStartInSecondRegion)) { + + firstRequestStartInSecondRegion = currentStoreResponse.getRequestStartTimeUTC(); + } + } + + logger.info("FirstRequestStart: {}, FirstRequestInSecondReqionStart: {}", + firstRequestStart, + firstRequestStartInSecondRegion); + + assertThat(firstRequestStartInSecondRegion.isAfter(firstRequestStart)).isEqualTo(true); + assertThat( + firstRequestStartInSecondRegion + .minus(Duration.ofMillis(600-5)) + .isAfter(firstRequestStart)).isEqualTo(true); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover.accept(ctx); + }) + }, + + // 404/1022 into local region only + // No availability strategy exists. + // Expected to get 408 because min. in-region wait time is larger than e2e timeout. + new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferredWithTooHighInRegionRetryTime_NoAvailabilityStrategy_408", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + Duration.ofMillis(1100), + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIsOperationCancelled, + // no hedging even with availability strategy because nonIdempotentWrites are disabled + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + + // retry should not have been issued yet. With just single retry + // the back-off time will be expanded to the minInRegionRetryWaitTime + assertThat(storeResponses.length).isEqualTo(1); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegion.accept(ctx); + }) + }, + // 404/1022 into local region only // Availability strategy exists, hedging is enabled. Region switch is local - meaning the local retries // will take so long, that the cross-regional retry in the client retry policy is not applicable. @@ -1480,6 +1686,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1499,6 +1706,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteNonExistingItemCallback, @@ -1516,6 +1724,7 @@ public void writeAfterCreation( Duration endToEndTimeout, ThresholdBasedAvailabilityStrategy availabilityStrategy, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegion, Boolean nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType faultInjectionOperationType, Function actionAfterInitialCreation, @@ -1528,6 +1737,7 @@ public void writeAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + customMinRetryTimeInLocalRegion, nonIdempotentWriteRetriesEnabled, ArrayUtils.toArray(faultInjectionOperationType), actionAfterInitialCreation, @@ -2477,6 +2687,7 @@ public void queryAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.QUERY_ITEM), (params) -> queryExecution.apply(queryGenerator.apply(params), params), @@ -2951,6 +3162,7 @@ public void readManyAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray( FaultInjectionOperationType.QUERY_ITEM, @@ -3566,6 +3778,7 @@ public void readAllAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.QUERY_ITEM), readAllOperation, @@ -3794,6 +4007,7 @@ private void execute( Duration endToEndTimeout, ThresholdBasedAvailabilityStrategy availabilityStrategy, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegionForWrites, Boolean nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType[] faultInjectionOperationTypes, Function actionAfterInitialCreation, @@ -3809,7 +4023,11 @@ private void execute( logger.info("START {}", testCaseId); - CosmosAsyncClient clientWithPreferredRegions = buildCosmosClient(this.writeableRegions, regionSwitchHint, nonIdempotentWriteRetriesEnabled); + CosmosAsyncClient clientWithPreferredRegions = buildCosmosClient( + this.writeableRegions, + regionSwitchHint, + customMinRetryTimeInLocalRegionForWrites, + nonIdempotentWriteRetriesEnabled); try { if (clearContainerBeforeExecution) { @@ -3950,17 +4168,28 @@ private void execute( private static CosmosAsyncClient buildCosmosClient( List preferredRegions, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegionForWrites, Boolean nonIdempotentWriteRetriesEnabled) { CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig() .diagnosticsHandler(new CosmosDiagnosticsLogger()); + CosmosRegionSwitchHint effectiveRegionSwitchHint = regionSwitchHint != null + ? regionSwitchHint + : CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED; + SessionRetryOptionsBuilder retryOptionsBuilder = new SessionRetryOptionsBuilder() + .regionSwitchHint(effectiveRegionSwitchHint); + + if (customMinRetryTimeInLocalRegionForWrites != null) { + retryOptionsBuilder.minInRegionRetryTime(customMinRetryTimeInLocalRegionForWrites); + } + CosmosClientBuilder builder = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) .key(TestConfigurations.MASTER_KEY) .consistencyLevel(ConsistencyLevel.SESSION) .preferredRegions(preferredRegions) - .sessionRetryOptions(new SessionRetryOptions(regionSwitchHint)) + .sessionRetryOptions(retryOptionsBuilder.build()) .directMode() .multipleWriteRegionsEnabled(true) .clientTelemetryConfig(telemetryConfig); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java index c5c316c069a41..f8c685afb13bb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.throughputControl.TestItem; @@ -59,6 +60,10 @@ import static org.testng.AssertJUnit.fail; public class SessionRetryOptionsTests extends TestSuiteBase { + private final static ImplementationBridgeHelpers.CosmosSessionRetryOptionsHelper.CosmosSessionRetryOptionsAccessor + sessionRetryOptionsAccessor = ImplementationBridgeHelpers + .CosmosSessionRetryOptionsHelper + .getCosmosSessionRetryOptionsAccessor(); private CosmosAsyncClient cosmosAsyncClient; private CosmosAsyncContainer cosmosAsyncContainer; @@ -107,6 +112,79 @@ public Object[][] writeOperationContextProvider() { }; } + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_defaultValues() { + SessionRetryOptions optionsWithDefaultValues = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .build(); + + assertThat(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(optionsWithDefaultValues)) + .isEqualTo(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred()); + + assertThat(sessionRetryOptionsAccessor.getMinInRegionRetryTime(optionsWithDefaultValues)) + .isEqualTo(Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred()); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_customValues() { + SessionRetryOptions optionsWithDefaultValues = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minInRegionRetryTime(Duration.ofSeconds(1)) + .maxInRegionRetryCount(3) + .build(); + + assertThat(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(optionsWithDefaultValues)) + .isEqualTo(3); + + assertThat(sessionRetryOptionsAccessor.getMinInRegionRetryTime(optionsWithDefaultValues)) + .isEqualTo(Duration.ofSeconds(1)); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minimum_maxRetryCountEnforced() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .maxInRegionRetryCount(0); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minimum_minRetryTimeEnforced() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minInRegionRetryTime(Duration.ofMillis(99)); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minRetryTimeRequired() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minInRegionRetryTime(null); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + @Test(groups = {"multi-master"}, dataProvider = "nonWriteOperationContextProvider", timeOut = TIMEOUT) public void nonWriteOperation_WithReadSessionUnavailable_test( OperationType operationType, @@ -176,7 +254,7 @@ public void nonWriteOperation_WithReadSessionUnavailable_test( // Check if the SessionTokenMismatchRetryPolicy retries on the bad / lagging region // for sessionTokenMismatchRetryAttempts by tracking the badSessionTokenRule hit count if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { - assertThat(badSessionTokenRule.getHitCount()).isBetween((long) sessionTokenMismatchRetryAttempts, sessionTokenMismatchRetryAttempts * 4L); + assertThat(badSessionTokenRule.getHitCount()).isBetween((long) sessionTokenMismatchRetryAttempts, (1 + sessionTokenMismatchRetryAttempts) * 4L); } } finally { System.clearProperty("COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"); @@ -250,7 +328,9 @@ public void writeOperation_withReadSessionUnavailable_test( // Check if the SessionTokenMismatchRetryPolicy retries on the bad / lagging region // for sessionTokenMismatchRetryAttempts by tracking the badSessionTokenRule hit count if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { - assertThat(badSessionTokenRule.getHitCount()).isEqualTo(sessionTokenMismatchRetryAttempts); + // higher hit count is possible while in MinRetryWaitTimeWithinRegion + assertThat(badSessionTokenRule.getHitCount()).isGreaterThanOrEqualTo( + sessionTokenMismatchRetryAttempts); } } finally { System.clearProperty("COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 35783d719cd2d..de4b2f22ca8b2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -341,6 +341,24 @@ protected static void waitIfNeededForReplicasToCatchUp(CosmosClientBuilder clien public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, CosmosContainerRequestOptions options, int throughput) { database.createContainer(cosmosContainerProperties, ThroughputProperties.createManualThroughput(throughput), options).block(); + + // Creating a container is async - especially on multi-partition or multi-region accounts + CosmosAsyncClient client = ImplementationBridgeHelpers + .CosmosAsyncDatabaseHelper + .getCosmosAsyncDatabaseAccessor() + .getCosmosAsyncClient(database); + boolean isMultiRegional = ImplementationBridgeHelpers + .CosmosAsyncClientHelper + .getCosmosAsyncClientAccessor() + .getPreferredRegions(client).size() > 1; + if (throughput > 6000 || isMultiRegional) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return database.getContainer(cosmosContainerProperties.getId()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java index 5cb50d91244f4..979baa5b2aadb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java @@ -3,8 +3,11 @@ package com.azure.cosmos; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import java.time.Duration; + /** * {@link SessionRetryOptions} encapsulates hints which influence * internal retry policies which are applied when the effective consistency @@ -13,12 +16,19 @@ public final class SessionRetryOptions { private final CosmosRegionSwitchHint regionSwitchHint; + private final Duration minInRegionRetryTime; + + private final int maxInRegionRetryCount; /** * Instantiates {@link SessionRetryOptions} * */ - SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint) { + SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint, + Duration minInRegionRetryTime, + int maxInRegionRetryCount) { this.regionSwitchHint = regionSwitchHint; + this.minInRegionRetryTime = minInRegionRetryTime ; + this.maxInRegionRetryCount = maxInRegionRetryCount; } static void initialize() { @@ -29,6 +39,16 @@ static void initialize() { public CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions) { return sessionRetryOptions.regionSwitchHint; } + + @Override + public Duration getMinInRegionRetryTime(SessionRetryOptions sessionRetryOptions) { + return sessionRetryOptions.minInRegionRetryTime; + } + + @Override + public int getMaxInRegionRetryCount(SessionRetryOptions sessionRetryOptions) { + return sessionRetryOptions.maxInRegionRetryCount; + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java index 30cab1c9f0c5b..6b458aea7955e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java @@ -3,6 +3,11 @@ package com.azure.cosmos; +import com.azure.cosmos.implementation.Configs; + +import java.time.Duration; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** @@ -12,6 +17,9 @@ public final class SessionRetryOptionsBuilder { private CosmosRegionSwitchHint regionSwitchHint; + private Duration minInRegionRetryTime = Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred(); + + private int maxInRegionRetryCount = Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred(); /** * Sets the {@link CosmosRegionSwitchHint} which specifies for @@ -31,6 +39,31 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region return this; } + /** + * Sets the minimum retry time for 404/1002 retries within each region for read and write operations. The minimum + * value is 100ms - this minimum is enforced to provide a way for the local region to catch-up on replication lag. + * The default value is 500ms - as a recommendation ensure that this value is higher than the steady-state + * replication latency between the regions you chose. + * @param minRetryTime the min retry time to be used with-in each region + * @return This instance of {@link SessionRetryOptionsBuilder} + */ + public SessionRetryOptionsBuilder minInRegionRetryTime(Duration minRetryTime) { + this.minInRegionRetryTime = minRetryTime; + return this; + } + + /** + * Sets the maximum number of retries within each region for read and write operations. The minimum + * value is 1 - the backoff time for the last in-region retry will ensure that the total retry time within the + * region is at least the min. in-region retry time. + * @param maxInRegionRetryCount the max. number of retries with-in each region + * @return This instance of {@link SessionRetryOptionsBuilder} + */ + public SessionRetryOptionsBuilder maxInRegionRetryCount(int maxInRegionRetryCount) { + this.maxInRegionRetryCount = maxInRegionRetryCount; + return this; + } + /** * Builds an instance of {@link SessionRetryOptions} * @@ -38,6 +71,27 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region * */ public SessionRetryOptions build() { checkNotNull(regionSwitchHint, "regionSwitch hint cannot be null"); - return new SessionRetryOptions(regionSwitchHint); + + if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { + checkArgument( + minInRegionRetryTime != null, + "Argument 'minInRegionRetryTimeForWriteOperations' must not be null when 'regionSwitchHint' " + + "is 'REMOTE_REGION_PREFERRED'."); + + checkArgument( + minInRegionRetryTime + .compareTo(Duration.ofMillis(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS)) >= 0, + "Argument 'minInRegionRetryTime' must have at least a value of '" + + Duration.ofMillis(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS) + + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'."); + + checkArgument( + maxInRegionRetryCount >= Configs.MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED, + "Argument 'maxInRegionRetryCount' must have at least a value of '" + + Configs.MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED + + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'."); + } + + return new SessionRetryOptions(regionSwitchHint, minInRegionRetryTime, maxInRegionRetryCount); } } \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index d980aa0ffd593..9e1408314466e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -112,6 +112,12 @@ public class Configs { "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS"; private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 500; + public static final int MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 100; + + private static final String DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS_NAME = + "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_IN_REGION-RETRY_TIME_IN_MILLISECONDS"; + private static final int DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 500; + // Whether to process the response on a different thread private static final String SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE"; private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false; @@ -144,6 +150,8 @@ public class Configs { private static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"; private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + public Configs() { this.sslContext = sslContextInit(); } @@ -395,7 +403,20 @@ public static int getAggressiveWarmupConcurrency() { } public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() { - return getIntValue(System.getProperty(MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), - DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); + return + Math.max( + getIntValue( + System.getProperty(MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), + DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), + MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); + } + + public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() { + return + Duration.ofMillis(Math.max( + getIntValue( + System.getProperty(DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS_NAME), + DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS), + MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS)); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 82bb8c3b9e00d..cf84067129e11 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -1607,6 +1607,9 @@ public static void setCosmosSessionRetryOptionsAccessor(final CosmosSessionRetry public interface CosmosSessionRetryOptionsAccessor { CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions); + Duration getMinInRegionRetryTime(SessionRetryOptions sessionRetryOptions); + + int getMaxInRegionRetryCount(SessionRetryOptions sessionRetryOptions); } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index 8144c49c6c4d8..2ced438b3e11b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -16,29 +16,42 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { + private final static ImplementationBridgeHelpers.CosmosSessionRetryOptionsHelper.CosmosSessionRetryOptionsAccessor + sessionRetryOptionsAccessor = ImplementationBridgeHelpers + .CosmosSessionRetryOptionsHelper + .getCosmosSessionRetryOptionsAccessor(); private final static Logger LOGGER = LoggerFactory.getLogger(SessionTokenMismatchRetryPolicy.class); - private static final int BACKOFF_MULTIPLIER = 2; + private static final int BACKOFF_MULTIPLIER = 5; private final Duration maximumBackoff; private final TimeoutHelper waitTimeTimeoutHelper; private final AtomicInteger retryCount; private Duration currentBackoff; private RetryContext retryContext; private final AtomicInteger maxRetryAttemptsInCurrentRegion; - private final SessionRetryOptions sessionRetryOptions; + private final CosmosRegionSwitchHint regionSwitchHint; + + private final Duration minInRegionRetryTime; + + public SessionTokenMismatchRetryPolicy( + RetryContext retryContext, + SessionRetryOptions sessionRetryOptions) { - public SessionTokenMismatchRetryPolicy(RetryContext retryContext, int waitTimeInMilliseconds, SessionRetryOptions sessionRetryOptions) { this.waitTimeTimeoutHelper = new TimeoutHelper(Duration.ofMillis(Configs.getSessionTokenMismatchDefaultWaitTimeInMs())); this.maximumBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchMaximumBackoffTimeInMs()); this.retryCount = new AtomicInteger(); this.retryCount.set(0); this.currentBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchInitialBackoffTimeInMs()); - this.maxRetryAttemptsInCurrentRegion = new AtomicInteger(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred()); + if (sessionRetryOptions != null) { + this.maxRetryAttemptsInCurrentRegion = + new AtomicInteger(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(sessionRetryOptions)); + this.regionSwitchHint = sessionRetryOptionsAccessor.getRegionSwitchHint(sessionRetryOptions); + this.minInRegionRetryTime = sessionRetryOptionsAccessor.getMinInRegionRetryTime(sessionRetryOptions); + } else { + this.maxRetryAttemptsInCurrentRegion = null; + this.regionSwitchHint = CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED; + this.minInRegionRetryTime = null; + } this.retryContext = retryContext; - this.sessionRetryOptions = sessionRetryOptions; - } - - public SessionTokenMismatchRetryPolicy(RetryContext retryContext, SessionRetryOptions sessionRetryOptions) { - this(retryContext, Configs.getSessionTokenMismatchDefaultWaitTimeInMs(), sessionRetryOptions); } @Override @@ -77,7 +90,7 @@ public Mono shouldRetry(Exception e) { // to the write region then region switch using ClientRetryPolicy will route // the retry to the same write region again, therefore the DIFFERENT_REGION_PREFERRED // hint causes quicker switch to the same write region which is reasonable - if (!shouldRetryLocally(sessionRetryOptions, retryCount.get())) { + if (!shouldRetryLocally(regionSwitchHint, retryCount.get())) { LOGGER.debug("SessionTokenMismatchRetryPolicy not retrying because it a retry attempt for the current region and " + "fallback to a different region is preferred "); @@ -88,7 +101,8 @@ public Mono shouldRetry(Exception e) { Duration effectiveBackoff = Duration.ZERO; // Don't penalize first retry with delay - if (this.retryCount.getAndIncrement() > 0) { + int attempt = this.retryCount.getAndIncrement(); + if (attempt > 0) { // Get the backoff time by selecting the smallest value between the remaining time and // the current back off time @@ -102,6 +116,19 @@ public Mono shouldRetry(Exception e) { this.maximumBackoff); } + // For remote region preference ensure that the last retry is long enough (even when exceeding max backoff time) + // to consume the entire minRetryTimeInLocalRegion + if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED + && attempt >= (this.maxRetryAttemptsInCurrentRegion.get() - 1)) { + + Duration remainingMinRetryTimeInLocalRegion = + this.waitTimeTimeoutHelper.getRemainingTime(minInRegionRetryTime); + + if (remainingMinRetryTimeInLocalRegion.compareTo(effectiveBackoff) > 0) { + effectiveBackoff = remainingMinRetryTimeInLocalRegion; + } + } + LOGGER.debug( "SessionTokenMismatchRetryPolicy will retry. Retry count = {}. Backoff time = {} ms", this.retryCount, @@ -123,18 +150,8 @@ private static Duration getEffectiveBackoff(Duration backoff, Duration remaining return backoff; } - private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int sessionTokenMismatchRetryAttempts) { - - if (sessionRetryOptions == null) { - return true; - } - - CosmosRegionSwitchHint regionSwitchHint = ImplementationBridgeHelpers - .CosmosSessionRetryOptionsHelper - .getCosmosSessionRetryOptionsAccessor() - .getRegionSwitchHint(sessionRetryOptions); - - if (regionSwitchHint == null || regionSwitchHint == CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED) { + private boolean shouldRetryLocally(CosmosRegionSwitchHint regionSwitchHint, int sessionTokenMismatchRetryAttempts) { + if (regionSwitchHint != CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { return true; } @@ -143,7 +160,6 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int // another attempt on the same region // hence to curb the retry attempts on a region, // compare sessionTokenMismatchRetryAttempts with max retry attempts allowed on the region - 1 - return !(regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED - && sessionTokenMismatchRetryAttempts == (this.maxRetryAttemptsInCurrentRegion.get() - 1)); + return sessionTokenMismatchRetryAttempts <= (this.maxRetryAttemptsInCurrentRegion.get() - 1); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java index 64befa731e26b..c50dd6bcee00d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java @@ -235,7 +235,9 @@ public Mono readAsync(RxDocumentServiceRequest entity, if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) { return BackoffRetryUtility.executeRetry( () -> this.readSessionAsync(entity, desiredReadMode), - new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions)); + new SessionTokenMismatchRetryPolicy( + BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), + sessionRetryOptions)); } else { return this.readAnyAsync(entity, desiredReadMode); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 32149fe7bcd7e..045632516078a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -119,7 +119,9 @@ public Mono writeAsync( return BackoffRetryUtility .executeRetry( () -> this.writePrivateAsync(entity, timeout, forceRefresh), - new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions)) + new SessionTokenMismatchRetryPolicy( + BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), + sessionRetryOptions)) .doOnEach( arg -> { try { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java index f7f78e0585251..e1ac84d9a58c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java @@ -21,13 +21,21 @@ public TimeoutHelper(Duration timeOut) { } public boolean isElapsed() { + return this.isElapsed(this.timeOut); + } + + public boolean isElapsed(Duration timeoutToCompareTo) { Duration elapsed = Duration.ofMillis(Instant.now().toEpochMilli() - startTime.toEpochMilli()); - return elapsed.compareTo(this.timeOut) >= 0; + return elapsed.compareTo(timeoutToCompareTo) >= 0; } public Duration getRemainingTime() { + return this.getRemainingTime(this.timeOut); + } + + public Duration getRemainingTime(Duration timeoutToCompareTo) { Duration elapsed = Duration.ofMillis(Instant.now().toEpochMilli() - startTime.toEpochMilli()); - return this.timeOut.minus(elapsed); + return timeoutToCompareTo.minus(elapsed); } public void throwTimeoutIfElapsed() throws RequestTimeoutException {