From 4517d78dd707ef4aa0922c76f070f499194328fe Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Thu, 12 Oct 2023 00:05:58 +0000 Subject: [PATCH 1/8] Adding min. retry time per region for 404/1002 SessionTokenRetryPolicy when RegionSwitchHint is Remote --- ...njectionWithAvailabilityStrategyTests.java | 230 +++++++++++++++++- .../com/azure/cosmos/SessionRetryOptions.java | 11 +- .../cosmos/SessionRetryOptionsBuilder.java | 34 ++- .../azure/cosmos/implementation/Configs.java | 4 + .../ImplementationBridgeHelpers.java | 1 + .../SessionTokenMismatchRetryPolicy.java | 33 ++- .../directconnectivity/ConsistencyReader.java | 5 +- .../directconnectivity/ConsistencyWriter.java | 5 +- .../directconnectivity/TimeoutHelper.java | 6 +- 9 files changed, 311 insertions(+), 18 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java index 78a61cbdc84ff..c51611b06390b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java @@ -4,6 +4,7 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.DatabaseAccount; import com.azure.cosmos.implementation.DatabaseAccountLocation; import com.azure.cosmos.implementation.GlobalEndpointManager; @@ -53,6 +54,7 @@ import org.testng.annotations.Test; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -728,6 +730,7 @@ public void readAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, 
notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.READ_ITEM), readItemCallback, @@ -746,6 +749,8 @@ public void readAfterCreation( public Object[][] testConfigs_writeAfterCreation() { final boolean nonIdempotentWriteRetriesEnabled = true; final boolean nonIdempotentWriteRetriesDisabled = false; + final String SECOND_REGION_NAME = writeableRegions.get(1).toLowerCase(Locale.ROOT); + final Duration NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES = null; Function createAnotherItemCallback = (params) -> { @@ -854,6 +859,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(3), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -870,6 +876,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -887,6 +894,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -903,6 +911,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -919,6 +928,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, 
FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -936,6 +946,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -948,6 +959,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -960,6 +972,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -972,6 +985,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -984,6 +998,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -996,6 +1011,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1008,6 +1024,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + 
NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1025,6 +1042,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1037,6 +1055,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1049,6 +1068,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1061,6 +1081,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1073,6 +1094,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1089,6 +1111,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1107,6 +1130,7 @@ public Object[][] testConfigs_writeAfterCreation() { 
Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1123,6 +1147,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.DELETE_ITEM, deleteItemCallback, @@ -1139,6 +1164,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1155,6 +1181,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.PATCH_ITEM, patchItemCallback, @@ -1173,6 +1200,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1190,6 +1218,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1205,6 +1234,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ 
-1221,6 +1251,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1237,6 +1268,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1254,6 +1286,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertExistingItemCallback, @@ -1270,6 +1303,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1284,6 +1318,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(90), noAvailabilityStrategy, noRegionSwitchHint, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.UPSERT_ITEM, upsertAnotherItemCallback, @@ -1303,6 +1338,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1322,6 +1358,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), defaultAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + 
NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1341,6 +1378,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1358,6 +1396,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1374,6 +1413,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1392,6 +1432,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1410,6 +1451,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesDisabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1427,6 +1469,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + 
NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1444,6 +1487,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.REPLACE_ITEM, replaceItemCallback, @@ -1462,6 +1506,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), reluctantThresholdAvailabilityStrategy, CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1471,6 +1516,164 @@ public Object[][] testConfigs_writeAfterCreation() { validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover }, + // 404/1002 into local region only + // No availability strategy exists. + // Expected to get successful response from cross regional retry - region switch is remote allowing the + // cross regional retry to finish within e2e timeout. 
+ new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferred_NoAvailabilityStrategy_WithRetries", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIs201Created, + // no hedging because no availability strategy is configured - success comes from the cross regional retry + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + + Instant firstRequestStart = Instant.MAX; + Instant firstRequestStartInSecondRegion = Instant.MAX; + for (ClientSideRequestStatistics.StoreResponseStatistics currentStoreResponse : storeResponses) { + if (currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStart)) { + firstRequestStart = currentStoreResponse.getRequestStartTimeUTC(); + } + + if (currentStoreResponse.getRegionName().equals(SECOND_REGION_NAME) && + currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStartInSecondRegion)) { + + firstRequestStartInSecondRegion = 
currentStoreResponse.getRequestStartTimeUTC(); + } + } + + logger.info("FirstRequestStart: {}, FirstRequestInSecondReqionStart: {}", + firstRequestStart, + firstRequestStartInSecondRegion); + + assertThat(firstRequestStartInSecondRegion.isAfter(firstRequestStart)).isEqualTo(true); + assertThat( + firstRequestStartInSecondRegion + .minus( + Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES.minus(Duration.ofMillis(5))) + .isAfter(firstRequestStart)).isEqualTo(true); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover.accept(ctx); + }) + }, + + // 404/1002 into local region only + // No availability strategy exists. + // Expected to get successful response from cross regional retry - region switch is remote allowing the + // cross regional retry to finish within e2e timeout. + new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferredWithHighInRegionRetryTime_NoAvailabilityStrategy_WithRetries", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + Duration.ofMillis(600), + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIs201Created, + // no hedging because no availability strategy is configured - success comes from the cross regional retry + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + 
ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + + Instant firstRequestStart = Instant.MAX; + Instant firstRequestStartInSecondRegion = Instant.MAX; + for (ClientSideRequestStatistics.StoreResponseStatistics currentStoreResponse : storeResponses) { + if (currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStart)) { + firstRequestStart = currentStoreResponse.getRequestStartTimeUTC(); + } + + if (currentStoreResponse.getRegionName().equals(SECOND_REGION_NAME) && + currentStoreResponse.getRequestStartTimeUTC().isBefore(firstRequestStartInSecondRegion)) { + + firstRequestStartInSecondRegion = currentStoreResponse.getRequestStartTimeUTC(); + } + } + + logger.info("FirstRequestStart: {}, FirstRequestInSecondReqionStart: {}", + firstRequestStart, + firstRequestStartInSecondRegion); + + assertThat(firstRequestStartInSecondRegion.isAfter(firstRequestStart)).isEqualTo(true); + assertThat( + firstRequestStartInSecondRegion + .minus(Duration.ofMillis(600-5)) + .isAfter(firstRequestStart)).isEqualTo(true); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover.accept(ctx); + }) + }, + + // 404/1002 into local region only + // No availability strategy exists. + // Expected to get 408 because min. in-region wait time is larger than e2e timeout. 
+ new Object[] { + "Create_404-1002_FirstRegionOnly_RemotePreferredWithTooHighInRegionRetryTime_NoAvailabilityStrategy_408", + Duration.ofSeconds(1), + noAvailabilityStrategy, + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED, + Duration.ofMillis(1100), + nonIdempotentWriteRetriesEnabled, + FaultInjectionOperationType.CREATE_ITEM, + createAnotherItemCallback, + injectReadSessionNotAvailableIntoFirstRegionOnly, + validateStatusCodeIsOperationCancelled, + // no hedging because no availability strategy is configured + (Consumer)(ctx -> { + assertThat(ctx).isNotNull(); + assertThat(ctx.getDiagnostics()).isNotNull(); + CosmosDiagnostics[] diagnostics = ctx.getDiagnostics().toArray(new CosmosDiagnostics[0]); + assertThat(diagnostics).isNotNull(); + assertThat(diagnostics.length).isEqualTo(1); + assertThat(diagnostics[0].getClientSideRequestStatistics()).isNotNull(); + ClientSideRequestStatistics[] clientStats = + diagnostics[0].getClientSideRequestStatistics().toArray(new ClientSideRequestStatistics[0]); + assertThat(clientStats.length).isEqualTo(1); + assertThat(clientStats[0].getResponseStatisticsList()).isNotNull(); + ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = + clientStats[0].getResponseStatisticsList().toArray( + new ClientSideRequestStatistics.StoreResponseStatistics[0]); + assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegion.accept(ctx); + }) + }, + // 404/1022 into local region only // Availability strategy exists, hedging is enabled. Region switch is local - meaning the local retries // will take so long, that the cross-regional retry in the client retry policy is not applicable. 
@@ -1480,6 +1683,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.CREATE_ITEM, createAnotherItemCallback, @@ -1499,6 +1703,7 @@ public Object[][] testConfigs_writeAfterCreation() { Duration.ofSeconds(1), eagerThresholdAvailabilityStrategy, CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED, + NO_CUSTOM_MIN_RETRY_TIME_IN_REGION_FOR_WRITES, nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType.DELETE_ITEM, deleteNonExistingItemCallback, @@ -1516,6 +1721,7 @@ public void writeAfterCreation( Duration endToEndTimeout, ThresholdBasedAvailabilityStrategy availabilityStrategy, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegionForWrites, Boolean nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType faultInjectionOperationType, Function actionAfterInitialCreation, @@ -1528,6 +1734,7 @@ public void writeAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + customMinRetryTimeInLocalRegionForWrites, nonIdempotentWriteRetriesEnabled, ArrayUtils.toArray(faultInjectionOperationType), actionAfterInitialCreation, @@ -2477,6 +2684,7 @@ public void queryAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.QUERY_ITEM), (params) -> queryExecution.apply(queryGenerator.apply(params), params), @@ -2951,6 +3159,7 @@ public void readManyAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray( FaultInjectionOperationType.QUERY_ITEM, @@ -3566,6 +3775,7 @@ public void readAllAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, + null, 
notSpecifiedWhetherIdempotentWriteRetriesAreEnabled, ArrayUtils.toArray(FaultInjectionOperationType.QUERY_ITEM), readAllOperation, @@ -3794,6 +4004,7 @@ private void execute( Duration endToEndTimeout, ThresholdBasedAvailabilityStrategy availabilityStrategy, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegionForWrites, Boolean nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType[] faultInjectionOperationTypes, Function actionAfterInitialCreation, @@ -3809,7 +4020,11 @@ private void execute( logger.info("START {}", testCaseId); - CosmosAsyncClient clientWithPreferredRegions = buildCosmosClient(this.writeableRegions, regionSwitchHint, nonIdempotentWriteRetriesEnabled); + CosmosAsyncClient clientWithPreferredRegions = buildCosmosClient( + this.writeableRegions, + regionSwitchHint, + customMinRetryTimeInLocalRegionForWrites, + nonIdempotentWriteRetriesEnabled); try { if (clearContainerBeforeExecution) { @@ -3950,17 +4165,28 @@ private void execute( private static CosmosAsyncClient buildCosmosClient( List preferredRegions, CosmosRegionSwitchHint regionSwitchHint, + Duration customMinRetryTimeInLocalRegionForWrites, Boolean nonIdempotentWriteRetriesEnabled) { CosmosClientTelemetryConfig telemetryConfig = new CosmosClientTelemetryConfig() .diagnosticsHandler(new CosmosDiagnosticsLogger()); + CosmosRegionSwitchHint effectiveRegionSwitchHint = regionSwitchHint != null + ? 
regionSwitchHint + : CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED; + SessionRetryOptionsBuilder retryOptionsBuilder = new SessionRetryOptionsBuilder() + .regionSwitchHint(effectiveRegionSwitchHint); + + if (customMinRetryTimeInLocalRegionForWrites != null) { + retryOptionsBuilder.minRetryTimeInLocalRegionForWriteOperations(customMinRetryTimeInLocalRegionForWrites); + } + CosmosClientBuilder builder = new CosmosClientBuilder() .endpoint(TestConfigurations.HOST) .key(TestConfigurations.MASTER_KEY) .consistencyLevel(ConsistencyLevel.SESSION) .preferredRegions(preferredRegions) - .sessionRetryOptions(new SessionRetryOptions(regionSwitchHint)) + .sessionRetryOptions(retryOptionsBuilder.build()) .directMode() .multipleWriteRegionsEnabled(true) .clientTelemetryConfig(telemetryConfig); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java index 5cb50d91244f4..bef397e475cfc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java @@ -5,6 +5,8 @@ import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import java.time.Duration; + /** * {@link SessionRetryOptions} encapsulates hints which influence * internal retry policies which are applied when the effective consistency @@ -13,12 +15,14 @@ public final class SessionRetryOptions { private final CosmosRegionSwitchHint regionSwitchHint; + private final Duration minInRegionRetryTimeForWriteOperations; /** * Instantiates {@link SessionRetryOptions} * */ - SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint) { + SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint, Duration minInRegionRetryTimeForWriteOperations) { this.regionSwitchHint = regionSwitchHint; + this.minInRegionRetryTimeForWriteOperations = minInRegionRetryTimeForWriteOperations; } static void 
initialize() { @@ -29,6 +33,11 @@ static void initialize() { public CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions) { return sessionRetryOptions.regionSwitchHint; } + + @Override + public Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions) { + return sessionRetryOptions.minInRegionRetryTimeForWriteOperations; + } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java index 30cab1c9f0c5b..2e15a6c90357e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java @@ -3,6 +3,11 @@ package com.azure.cosmos; +import com.azure.cosmos.implementation.Configs; + +import java.time.Duration; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkArgument; import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; /** @@ -12,6 +17,7 @@ public final class SessionRetryOptionsBuilder { private CosmosRegionSwitchHint regionSwitchHint; + private Duration minInRegionRetryTimeForWriteOperations = Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES; /** * Sets the {@link CosmosRegionSwitchHint} which specifies for @@ -31,6 +37,17 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region return this; } + /** + * Sets the minimum retry time for 404/1002 retries within each region for write operations. The minimum value + * is 100ms (validated in {@link #build()} when the hint is REMOTE_REGION_PREFERRED) so that the local region can catch up on replication lag. 
+ * @param minRetryTime the min retry time to be used within each region for write operations + * @return This instance of {@link SessionRetryOptionsBuilder} + */ + public SessionRetryOptionsBuilder minRetryTimeInLocalRegionForWriteOperations(Duration minRetryTime) { + this.minInRegionRetryTimeForWriteOperations = minRetryTime; + return this; + } + /** * Builds an instance of {@link SessionRetryOptions} * @@ -38,6 +55,21 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region * */ public SessionRetryOptions build() { checkNotNull(regionSwitchHint, "regionSwitch hint cannot be null"); - return new SessionRetryOptions(regionSwitchHint); + + if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { + checkArgument( + minInRegionRetryTimeForWriteOperations != null, + "Argument 'minInRegionRetryTimeForWriteOperations' must not be null when 'regionSwitchHint' " + + "is 'REMOTE_REGION_PREFERRED'."); + + checkArgument( + minInRegionRetryTimeForWriteOperations + .compareTo(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES) >= 0, + "Argument 'minInRegionRetryTimeForWriteOperations' must have at least a value of '" + + Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES.toString() + + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'."); + } + + return new SessionRetryOptions(regionSwitchHint, minInRegionRetryTimeForWriteOperations); } } \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index d980aa0ffd593..0cfbda39cd95f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -112,6 +112,10 @@ public class Configs { "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS"; private static final int 
DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 500; + public static final Duration MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100); + + public static final Duration DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100); + // Whether to process the response on a different thread private static final String SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE"; private static final boolean DEFAULT_SWITCH_OFF_IO_THREAD_FOR_RESPONSE = false; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 82bb8c3b9e00d..4186954be9481 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -1607,6 +1607,7 @@ public static void setCosmosSessionRetryOptionsAccessor(final CosmosSessionRetry public interface CosmosSessionRetryOptionsAccessor { CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions); + Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions); } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index 8144c49c6c4d8..1a5372c99a443 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -16,6 +16,10 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { + private final static 
ImplementationBridgeHelpers.CosmosSessionRetryOptionsHelper.CosmosSessionRetryOptionsAccessor + sessionRetryOptionsAccessor = ImplementationBridgeHelpers + .CosmosSessionRetryOptionsHelper + .getCosmosSessionRetryOptionsAccessor(); private final static Logger LOGGER = LoggerFactory.getLogger(SessionTokenMismatchRetryPolicy.class); private static final int BACKOFF_MULTIPLIER = 2; private final Duration maximumBackoff; @@ -25,8 +29,13 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { private RetryContext retryContext; private final AtomicInteger maxRetryAttemptsInCurrentRegion; private final SessionRetryOptions sessionRetryOptions; + private final boolean isWriteOperation; + + public SessionTokenMismatchRetryPolicy( + RetryContext retryContext, + SessionRetryOptions sessionRetryOptions, + boolean isWriteOperation) { - public SessionTokenMismatchRetryPolicy(RetryContext retryContext, int waitTimeInMilliseconds, SessionRetryOptions sessionRetryOptions) { this.waitTimeTimeoutHelper = new TimeoutHelper(Duration.ofMillis(Configs.getSessionTokenMismatchDefaultWaitTimeInMs())); this.maximumBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchMaximumBackoffTimeInMs()); this.retryCount = new AtomicInteger(); @@ -35,10 +44,7 @@ public SessionTokenMismatchRetryPolicy(RetryContext retryContext, int waitTimeIn this.maxRetryAttemptsInCurrentRegion = new AtomicInteger(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred()); this.retryContext = retryContext; this.sessionRetryOptions = sessionRetryOptions; - } - - public SessionTokenMismatchRetryPolicy(RetryContext retryContext, SessionRetryOptions sessionRetryOptions) { - this(retryContext, Configs.getSessionTokenMismatchDefaultWaitTimeInMs(), sessionRetryOptions); + this.isWriteOperation = isWriteOperation; } @Override @@ -129,12 +135,18 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int return true; } - CosmosRegionSwitchHint regionSwitchHint = 
ImplementationBridgeHelpers - .CosmosSessionRetryOptionsHelper - .getCosmosSessionRetryOptionsAccessor() + CosmosRegionSwitchHint regionSwitchHint = sessionRetryOptionsAccessor .getRegionSwitchHint(sessionRetryOptions); - if (regionSwitchHint == null || regionSwitchHint == CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED) { + if (regionSwitchHint != CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { + return true; + } + + // For write operations we need to retry at least MinInRegionRetryTimeForWriteOperations + // to allow the region to catch up on replication + if (this.isWriteOperation && !this.waitTimeTimeoutHelper.isElapsed( + sessionRetryOptionsAccessor.getMinInRegionRetryTimeForWriteOperations(sessionRetryOptions))) { + return true; } @@ -143,7 +155,6 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int // another attempt on the same region // hence to curb the retry attempts on a region, // compare sessionTokenMismatchRetryAttempts with max retry attempts allowed on the region - 1 - return !(regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED - && sessionTokenMismatchRetryAttempts == (this.maxRetryAttemptsInCurrentRegion.get() - 1)); + return sessionTokenMismatchRetryAttempts < (this.maxRetryAttemptsInCurrentRegion.get() - 1); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java index 64befa731e26b..60eb635f63c05 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java @@ -235,7 +235,10 @@ public Mono readAsync(RxDocumentServiceRequest entity, if (targetConsistencyLevel.v == ConsistencyLevel.SESSION) { return BackoffRetryUtility.executeRetry( () -> 
this.readSessionAsync(entity, desiredReadMode), - new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions)); + new SessionTokenMismatchRetryPolicy( + BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), + sessionRetryOptions, + false)); } else { return this.readAnyAsync(entity, desiredReadMode); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 32149fe7bcd7e..ec29fdbf74538 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -119,7 +119,10 @@ public Mono writeAsync( return BackoffRetryUtility .executeRetry( () -> this.writePrivateAsync(entity, timeout, forceRefresh), - new SessionTokenMismatchRetryPolicy(BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), sessionRetryOptions)) + new SessionTokenMismatchRetryPolicy( + BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), + sessionRetryOptions, + true)) .doOnEach( arg -> { try { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java index f7f78e0585251..c6f4e096beed8 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java @@ -21,8 +21,12 @@ public TimeoutHelper(Duration timeOut) { } public boolean isElapsed() { + return this.isElapsed(this.timeOut); + } + 
+ public boolean isElapsed(Duration timeoutToCompareTo) { Duration elapsed = Duration.ofMillis(Instant.now().toEpochMilli() - startTime.toEpochMilli()); - return elapsed.compareTo(this.timeOut) >= 0; + return elapsed.compareTo(timeoutToCompareTo) >= 0; } public Duration getRemainingTime() { From cccc4400fcf4751e8d209441300de654c6cadbe7 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Fri, 13 Oct 2023 13:36:25 +0000 Subject: [PATCH 2/8] Update SessionRetryOptionsTests.java --- .../azure/cosmos/faultinjection/SessionRetryOptionsTests.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java index c5c316c069a41..9617d92a8481e 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java @@ -250,7 +250,9 @@ public void writeOperation_withReadSessionUnavailable_test( // Check if the SessionTokenMismatchRetryPolicy retries on the bad / lagging region // for sessionTokenMismatchRetryAttempts by tracking the badSessionTokenRule hit count if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { - assertThat(badSessionTokenRule.getHitCount()).isEqualTo(sessionTokenMismatchRetryAttempts); + // higher hit count is possible while in MinRetryWaitTimeWithinRegion + assertThat(badSessionTokenRule.getHitCount()).isGreaterThanOrEqualTo( + sessionTokenMismatchRetryAttempts); } } finally { System.clearProperty("COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"); From d5dfddc95628debc329018035d4fd674900736c8 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 17 Oct 2023 21:55:09 +0000 Subject: [PATCH 3/8] Iterating on session retry changes --- 
...njectionWithAvailabilityStrategyTests.java | 13 +-- .../SessionRetryOptionsTests.java | 80 ++++++++++++++++++- .../com/azure/cosmos/SessionRetryOptions.java | 21 +++-- .../cosmos/SessionRetryOptionsBuilder.java | 46 ++++++++--- .../azure/cosmos/implementation/Configs.java | 25 +++++- .../ImplementationBridgeHelpers.java | 4 +- .../SessionTokenMismatchRetryPolicy.java | 37 +++++---- .../directconnectivity/ConsistencyReader.java | 3 +- .../directconnectivity/ConsistencyWriter.java | 3 +- .../directconnectivity/TimeoutHelper.java | 6 +- 10 files changed, 189 insertions(+), 49 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java index c51611b06390b..b4a1d347d99f3 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java @@ -1571,7 +1571,7 @@ public Object[][] testConfigs_writeAfterCreation() { assertThat( firstRequestStartInSecondRegion .minus( - Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES.minus(Duration.ofMillis(5))) + Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred().minus(Duration.ofMillis(5))) .isAfter(firstRequestStart)).isEqualTo(true); validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegionButWithRegionalFailover.accept(ctx); @@ -1668,7 +1668,10 @@ public Object[][] testConfigs_writeAfterCreation() { ClientSideRequestStatistics.StoreResponseStatistics[] storeResponses = clientStats[0].getResponseStatisticsList().toArray( new ClientSideRequestStatistics.StoreResponseStatistics[0]); - assertThat(storeResponses.length).isGreaterThanOrEqualTo(2); + + // retry should not have been issued yet. 
With just single retry + // the back-off time will be expanded to the minInRegionRetryWaitTime + assertThat(storeResponses.length).isEqualTo(1); validateDiagnosticsContextHasDiagnosticsForOnlyFirstRegion.accept(ctx); }) @@ -1721,7 +1724,7 @@ public void writeAfterCreation( Duration endToEndTimeout, ThresholdBasedAvailabilityStrategy availabilityStrategy, CosmosRegionSwitchHint regionSwitchHint, - Duration customMinRetryTimeInLocalRegionForWrites, + Duration customMinRetryTimeInLocalRegion, Boolean nonIdempotentWriteRetriesEnabled, FaultInjectionOperationType faultInjectionOperationType, Function actionAfterInitialCreation, @@ -1734,7 +1737,7 @@ public void writeAfterCreation( endToEndTimeout, availabilityStrategy, regionSwitchHint, - customMinRetryTimeInLocalRegionForWrites, + customMinRetryTimeInLocalRegion, nonIdempotentWriteRetriesEnabled, ArrayUtils.toArray(faultInjectionOperationType), actionAfterInitialCreation, @@ -4178,7 +4181,7 @@ private static CosmosAsyncClient buildCosmosClient( .regionSwitchHint(effectiveRegionSwitchHint); if (customMinRetryTimeInLocalRegionForWrites != null) { - retryOptionsBuilder.minRetryTimeInLocalRegionForWriteOperations(customMinRetryTimeInLocalRegionForWrites); + retryOptionsBuilder.minRetryTimeInLocalRegion(customMinRetryTimeInLocalRegionForWrites); } CosmosClientBuilder builder = new CosmosClientBuilder() diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java index 9617d92a8481e..5ac61f3527d9a 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java @@ -16,6 +16,7 @@ import com.azure.cosmos.implementation.DatabaseAccountLocation; import 
com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.throughputControl.TestItem; @@ -59,6 +60,10 @@ import static org.testng.AssertJUnit.fail; public class SessionRetryOptionsTests extends TestSuiteBase { + private final static ImplementationBridgeHelpers.CosmosSessionRetryOptionsHelper.CosmosSessionRetryOptionsAccessor + sessionRetryOptionsAccessor = ImplementationBridgeHelpers + .CosmosSessionRetryOptionsHelper + .getCosmosSessionRetryOptionsAccessor(); private CosmosAsyncClient cosmosAsyncClient; private CosmosAsyncContainer cosmosAsyncContainer; @@ -107,6 +112,79 @@ public Object[][] writeOperationContextProvider() { }; } + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_defaultValues() { + SessionRetryOptions optionsWithDefaultValues = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .build(); + + assertThat(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(optionsWithDefaultValues)) + .isEqualTo(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred()); + + assertThat(sessionRetryOptionsAccessor.getMinInRegionRetryTime(optionsWithDefaultValues)) + .isEqualTo(Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred()); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_customValues() { + SessionRetryOptions optionsWithDefaultValues = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minRetryTimeInLocalRegion(Duration.ofSeconds(1)) + .maxInRegionRetryCount(3) + .build(); + + assertThat(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(optionsWithDefaultValues)) + .isEqualTo(3); + + 
assertThat(sessionRetryOptionsAccessor.getMinInRegionRetryTime(optionsWithDefaultValues)) + .isEqualTo(Duration.ofSeconds(1)); + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minimum_maxRetryCountEnforced() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .maxInRegionRetryCount(0); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minimum_minRetryTimeEnforced() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minRetryTimeInLocalRegion(Duration.ofMillis(99)); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + + @Test(groups = {"unit"}, timeOut = TIMEOUT) + public void SessionRetryOptionsBuilder_minRetryTimeRequired() { + SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() + .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) + .minRetryTimeInLocalRegion(null); + + try { + builder.build(); + + fail("Building the session retry options should have failed"); + } catch (IllegalArgumentException illegalArgumentException) { + logger.info("Expected IllegalArgumentException", illegalArgumentException); + } + } + @Test(groups = {"multi-master"}, dataProvider = "nonWriteOperationContextProvider", timeOut = TIMEOUT) public void nonWriteOperation_WithReadSessionUnavailable_test( OperationType operationType, @@ -176,7 +254,7 @@ public void 
nonWriteOperation_WithReadSessionUnavailable_test( // Check if the SessionTokenMismatchRetryPolicy retries on the bad / lagging region // for sessionTokenMismatchRetryAttempts by tracking the badSessionTokenRule hit count if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { - assertThat(badSessionTokenRule.getHitCount()).isBetween((long) sessionTokenMismatchRetryAttempts, sessionTokenMismatchRetryAttempts * 4L); + assertThat(badSessionTokenRule.getHitCount()).isBetween((long) sessionTokenMismatchRetryAttempts, (1 + sessionTokenMismatchRetryAttempts) * 4L); } } finally { System.clearProperty("COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java index bef397e475cfc..979baa5b2aadb 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptions.java @@ -3,6 +3,7 @@ package com.azure.cosmos; +import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import java.time.Duration; @@ -15,14 +16,19 @@ public final class SessionRetryOptions { private final CosmosRegionSwitchHint regionSwitchHint; - private final Duration minInRegionRetryTimeForWriteOperations; + private final Duration minInRegionRetryTime; + + private final int maxInRegionRetryCount; /** * Instantiates {@link SessionRetryOptions} * */ - SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint, Duration minInRegionRetryTimeForWriteOperations) { + SessionRetryOptions(CosmosRegionSwitchHint regionSwitchHint, + Duration minInRegionRetryTime, + int maxInRegionRetryCount) { this.regionSwitchHint = regionSwitchHint; - this.minInRegionRetryTimeForWriteOperations = minInRegionRetryTimeForWriteOperations; + this.minInRegionRetryTime = 
minInRegionRetryTime ; + this.maxInRegionRetryCount = maxInRegionRetryCount; } static void initialize() { @@ -35,8 +41,13 @@ public CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRet } @Override - public Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions) { - return sessionRetryOptions.minInRegionRetryTimeForWriteOperations; + public Duration getMinInRegionRetryTime(SessionRetryOptions sessionRetryOptions) { + return sessionRetryOptions.minInRegionRetryTime; + } + + @Override + public int getMaxInRegionRetryCount(SessionRetryOptions sessionRetryOptions) { + return sessionRetryOptions.maxInRegionRetryCount; } }); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java index 2e15a6c90357e..bef9e48679811 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java @@ -17,7 +17,9 @@ public final class SessionRetryOptionsBuilder { private CosmosRegionSwitchHint regionSwitchHint; - private Duration minInRegionRetryTimeForWriteOperations = Configs.DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES; + private Duration minInRegionRetryTime = Configs.getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred(); + + private int maxInRegionRetryCount = Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred(); /** * Sets the {@link CosmosRegionSwitchHint} which specifies for @@ -38,13 +40,27 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region } /** - * Sets the minimum retry time for 404/1002 retries within each region for write operations. The minimum value - * is 100ms - this minimum is enforced to provide a way for the local region to catch-up on replication lag. 
- * @param minRetryTime the min retry time to be used with-in each region for write operations + * Sets the minimum retry time for 404/1002 retries within each region for read and write operations. The minimum + * value is 100ms - this minimum is enforced to provide a way for the local region to catch-up on replication lag. + * The default value is 500ms - as a recommendation ensure that this value is higher than the steady-state + * replication latency between the regions you chose. + * @param minRetryTime the min retry time to be used with-in each region + * @return This instance of {@link SessionRetryOptionsBuilder} + */ + public SessionRetryOptionsBuilder minRetryTimeInLocalRegion(Duration minRetryTime) { + this.minInRegionRetryTime = minRetryTime; + return this; + } + + /** + * Sets the maximum number of retries within each region for read and write operations. The minimum + * value is 1 - the backoff time for the last in-region retry will ensure that the total retry time within the + * region is at least {@link this.minRetryTimeInLocalRegion} + * @param maxInRegionRetryCount the max. 
number of retries with-in each region * @return This instance of {@link SessionRetryOptionsBuilder} */ - public SessionRetryOptionsBuilder minRetryTimeInLocalRegionForWriteOperations(Duration minRetryTime) { - this.minInRegionRetryTimeForWriteOperations = minRetryTime; + public SessionRetryOptionsBuilder maxInRegionRetryCount(int maxInRegionRetryCount) { + this.maxInRegionRetryCount = maxInRegionRetryCount; return this; } @@ -58,18 +74,24 @@ public SessionRetryOptions build() { if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { checkArgument( - minInRegionRetryTimeForWriteOperations != null, + minInRegionRetryTime != null, "Argument 'minInRegionRetryTimeForWriteOperations' must not be null when 'regionSwitchHint' " + "is 'REMOTE_REGION_PREFERRED'."); checkArgument( - minInRegionRetryTimeForWriteOperations - .compareTo(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES) >= 0, - "Argument 'minInRegionRetryTimeForWriteOperations' must have at least a value of '" - + Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES.toString() + minInRegionRetryTime + .compareTo(Duration.ofMillis(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS)) >= 0, + "Argument 'minInRegionRetryTime' must have at least a value of '" + + Duration.ofMillis(Configs.MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS) + + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'."); + + checkArgument( + maxInRegionRetryCount >= Configs.MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED, + "Argument 'maxInRegionRetryCount' must have at least a value of '" + + Configs.MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED + "' when 'regionSwitchHint' is 'REMOTE_REGION_PREFERRED'."); } - return new SessionRetryOptions(regionSwitchHint, minInRegionRetryTimeForWriteOperations); + return new SessionRetryOptions(regionSwitchHint, minInRegionRetryTime, maxInRegionRetryCount); } } \ No newline at end of file diff --git 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java index 0cfbda39cd95f..9e1408314466e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Configs.java @@ -112,9 +112,11 @@ public class Configs { "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS"; private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 500; - public static final Duration MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100); + public static final int MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 100; - public static final Duration DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES = Duration.ofMillis(100); + private static final String DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS_NAME = + "COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_IN_REGION-RETRY_TIME_IN_MILLISECONDS"; + private static final int DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 500; // Whether to process the response on a different thread private static final String SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE"; @@ -148,6 +150,8 @@ public class Configs { private static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED"; private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1; + public Configs() { this.sslContext = sslContextInit(); } @@ -399,7 +403,20 @@ public static int getAggressiveWarmupConcurrency() { } public static int getMaxRetriesInLocalRegionWhenRemoteRegionPreferred() { - return getIntValue(System.getProperty(MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), - 
DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); + return + Math.max( + getIntValue( + System.getProperty(MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), + DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED), + MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED); + } + + public static Duration getMinRetryTimeInLocalRegionWhenRemoteRegionPreferred() { + return + Duration.ofMillis(Math.max( + getIntValue( + System.getProperty(DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS_NAME), + DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS), + MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS)); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java index 4186954be9481..cf84067129e11 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ImplementationBridgeHelpers.java @@ -1607,7 +1607,9 @@ public static void setCosmosSessionRetryOptionsAccessor(final CosmosSessionRetry public interface CosmosSessionRetryOptionsAccessor { CosmosRegionSwitchHint getRegionSwitchHint(SessionRetryOptions sessionRetryOptions); - Duration getMinInRegionRetryTimeForWriteOperations(SessionRetryOptions sessionRetryOptions); + Duration getMinInRegionRetryTime(SessionRetryOptions sessionRetryOptions); + + int getMaxInRegionRetryCount(SessionRetryOptions sessionRetryOptions); } } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index 1a5372c99a443..2d37713747f6f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java 
+++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -21,7 +21,7 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { .CosmosSessionRetryOptionsHelper .getCosmosSessionRetryOptionsAccessor(); private final static Logger LOGGER = LoggerFactory.getLogger(SessionTokenMismatchRetryPolicy.class); - private static final int BACKOFF_MULTIPLIER = 2; + private static final int BACKOFF_MULTIPLIER = 5; private final Duration maximumBackoff; private final TimeoutHelper waitTimeTimeoutHelper; private final AtomicInteger retryCount; @@ -29,22 +29,20 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { private RetryContext retryContext; private final AtomicInteger maxRetryAttemptsInCurrentRegion; private final SessionRetryOptions sessionRetryOptions; - private final boolean isWriteOperation; public SessionTokenMismatchRetryPolicy( RetryContext retryContext, - SessionRetryOptions sessionRetryOptions, - boolean isWriteOperation) { + SessionRetryOptions sessionRetryOptions) { this.waitTimeTimeoutHelper = new TimeoutHelper(Duration.ofMillis(Configs.getSessionTokenMismatchDefaultWaitTimeInMs())); this.maximumBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchMaximumBackoffTimeInMs()); this.retryCount = new AtomicInteger(); this.retryCount.set(0); this.currentBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchInitialBackoffTimeInMs()); - this.maxRetryAttemptsInCurrentRegion = new AtomicInteger(Configs.getMaxRetriesInLocalRegionWhenRemoteRegionPreferred()); + this.maxRetryAttemptsInCurrentRegion = + new AtomicInteger(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(sessionRetryOptions)); this.retryContext = retryContext; this.sessionRetryOptions = sessionRetryOptions; - this.isWriteOperation = isWriteOperation; } @Override @@ -94,7 +92,8 @@ public Mono shouldRetry(Exception e) { Duration effectiveBackoff = Duration.ZERO; // Don't penalize first retry with delay - 
if (this.retryCount.getAndIncrement() > 0) { + int attempt = this.retryCount.getAndIncrement(); + if (attempt > 0) { // Get the backoff time by selecting the smallest value between the remaining time and // the current back off time @@ -108,11 +107,25 @@ public Mono shouldRetry(Exception e) { this.maximumBackoff); } + if (sessionRetryOptionsAccessor.getRegionSwitchHint(sessionRetryOptions) == + CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED + && attempt >= (this.maxRetryAttemptsInCurrentRegion.get() - 1)) { + + Duration remainingMinRetryTimeInLocalRegion = this.waitTimeTimeoutHelper.getRemainingTime( + sessionRetryOptionsAccessor.getMinInRegionRetryTime(this.sessionRetryOptions) + ); + + if (remainingMinRetryTimeInLocalRegion.compareTo(effectiveBackoff) > 0) { + effectiveBackoff = remainingMinRetryTimeInLocalRegion; + } + } + LOGGER.debug( "SessionTokenMismatchRetryPolicy will retry. Retry count = {}. Backoff time = {} ms", this.retryCount, effectiveBackoff.toMillis()); + return Mono.just(ShouldRetryResult.retryAfter(effectiveBackoff)); } @@ -142,19 +155,11 @@ private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int return true; } - // For write operations we need to retry at least MinInRegionRetryTimeForWriteOperations - // to allow the region to catch up on replication - if (this.isWriteOperation && !this.waitTimeTimeoutHelper.isElapsed( - sessionRetryOptionsAccessor.getMinInRegionRetryTimeForWriteOperations(sessionRetryOptions))) { - - return true; - } - // SessionTokenMismatchRetryPolicy is invoked after 1 attempt on a region // sessionTokenMismatchRetryAttempts increments only after shouldRetry triggers // another attempt on the same region // hence to curb the retry attempts on a region, // compare sessionTokenMismatchRetryAttempts with max retry attempts allowed on the region - 1 - return sessionTokenMismatchRetryAttempts < (this.maxRetryAttemptsInCurrentRegion.get() - 1); + return sessionTokenMismatchRetryAttempts <= 
(this.maxRetryAttemptsInCurrentRegion.get() - 1); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java index 60eb635f63c05..c50dd6bcee00d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyReader.java @@ -237,8 +237,7 @@ public Mono readAsync(RxDocumentServiceRequest entity, () -> this.readSessionAsync(entity, desiredReadMode), new SessionTokenMismatchRetryPolicy( BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), - sessionRetryOptions, - false)); + sessionRetryOptions)); } else { return this.readAnyAsync(entity, desiredReadMode); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index ec29fdbf74538..045632516078a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -121,8 +121,7 @@ public Mono writeAsync( () -> this.writePrivateAsync(entity, timeout, forceRefresh), new SessionTokenMismatchRetryPolicy( BridgeInternal.getRetryContext(entity.requestContext.cosmosDiagnostics), - sessionRetryOptions, - true)) + sessionRetryOptions)) .doOnEach( arg -> { try { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java index 
c6f4e096beed8..e1ac84d9a58c4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/TimeoutHelper.java @@ -30,8 +30,12 @@ public boolean isElapsed(Duration timeoutToCompareTo) { } public Duration getRemainingTime() { + return this.getRemainingTime(this.timeOut); + } + + public Duration getRemainingTime(Duration timeoutToCompareTo) { Duration elapsed = Duration.ofMillis(Instant.now().toEpochMilli() - startTime.toEpochMilli()); - return this.timeOut.minus(elapsed); + return timeoutToCompareTo.minus(elapsed); } public void throwTimeoutIfElapsed() throws RequestTimeoutException { From 6e2ba00fa3012d7e36286c6d70aa50743ee7417e Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 17 Oct 2023 21:57:59 +0000 Subject: [PATCH 4/8] Renaming property --- .../cosmos/FaultInjectionWithAvailabilityStrategyTests.java | 2 +- .../cosmos/faultinjection/SessionRetryOptionsTests.java | 6 +++--- .../java/com/azure/cosmos/SessionRetryOptionsBuilder.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java index b4a1d347d99f3..dab580c8b066f 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/FaultInjectionWithAvailabilityStrategyTests.java @@ -4181,7 +4181,7 @@ private static CosmosAsyncClient buildCosmosClient( .regionSwitchHint(effectiveRegionSwitchHint); if (customMinRetryTimeInLocalRegionForWrites != null) { - retryOptionsBuilder.minRetryTimeInLocalRegion(customMinRetryTimeInLocalRegionForWrites); + 
retryOptionsBuilder.minInRegionRetryTime(customMinRetryTimeInLocalRegionForWrites); } CosmosClientBuilder builder = new CosmosClientBuilder() diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java index 5ac61f3527d9a..f8c685afb13bb 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/SessionRetryOptionsTests.java @@ -129,7 +129,7 @@ public void SessionRetryOptionsBuilder_defaultValues() { public void SessionRetryOptionsBuilder_customValues() { SessionRetryOptions optionsWithDefaultValues = new SessionRetryOptionsBuilder() .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) - .minRetryTimeInLocalRegion(Duration.ofSeconds(1)) + .minInRegionRetryTime(Duration.ofSeconds(1)) .maxInRegionRetryCount(3) .build(); @@ -159,7 +159,7 @@ public void SessionRetryOptionsBuilder_minimum_maxRetryCountEnforced() { public void SessionRetryOptionsBuilder_minimum_minRetryTimeEnforced() { SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) - .minRetryTimeInLocalRegion(Duration.ofMillis(99)); + .minInRegionRetryTime(Duration.ofMillis(99)); try { builder.build(); @@ -174,7 +174,7 @@ public void SessionRetryOptionsBuilder_minimum_minRetryTimeEnforced() { public void SessionRetryOptionsBuilder_minRetryTimeRequired() { SessionRetryOptionsBuilder builder = new SessionRetryOptionsBuilder() .regionSwitchHint(CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) - .minRetryTimeInLocalRegion(null); + .minInRegionRetryTime(null); try { builder.build(); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java 
b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java index bef9e48679811..89fbb65b2c8a5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java @@ -47,7 +47,7 @@ public SessionRetryOptionsBuilder regionSwitchHint(CosmosRegionSwitchHint region * @param minRetryTime the min retry time to be used with-in each region * @return This instance of {@link SessionRetryOptionsBuilder} */ - public SessionRetryOptionsBuilder minRetryTimeInLocalRegion(Duration minRetryTime) { + public SessionRetryOptionsBuilder minInRegionRetryTime(Duration minRetryTime) { this.minInRegionRetryTime = minRetryTime; return this; } @@ -55,7 +55,7 @@ public SessionRetryOptionsBuilder minRetryTimeInLocalRegion(Duration minRetryTim /** * Sets the maximum number of retries within each region for read and write operations. The minimum * value is 1 - the backoff time for the last in-region retry will ensure that the total retry time within the - * region is at least {@link this.minRetryTimeInLocalRegion} + * region is at least {@link this.minInRegionRetryTime} * @param maxInRegionRetryCount the max. 
number of retries with-in each region * @return This instance of {@link SessionRetryOptionsBuilder} */ From 4d455f18664396d74f805d6fdcd76aeb1a575c4f Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Tue, 17 Oct 2023 22:00:26 +0000 Subject: [PATCH 5/8] Update SessionTokenMismatchRetryPolicy.java --- .../cosmos/implementation/SessionTokenMismatchRetryPolicy.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index 2d37713747f6f..f8aa7f9610925 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -107,6 +107,8 @@ public Mono shouldRetry(Exception e) { this.maximumBackoff); } + // For remote region preference ensure that the last retry is long enough (even when exceeding max backoff time) + // to consume the entire minRetryTimeInLocalRegion if (sessionRetryOptionsAccessor.getRegionSwitchHint(sessionRetryOptions) == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED && attempt >= (this.maxRetryAttemptsInCurrentRegion.get() - 1)) { From e20f3df713986ae139991224c25e1f836455f0de Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 18 Oct 2023 01:11:55 +0000 Subject: [PATCH 6/8] Updating JavaDoc --- .../main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java | 2 +- .../cosmos/implementation/SessionTokenMismatchRetryPolicy.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java index 89fbb65b2c8a5..6b458aea7955e 100644 --- 
a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/SessionRetryOptionsBuilder.java @@ -55,7 +55,7 @@ public SessionRetryOptionsBuilder minInRegionRetryTime(Duration minRetryTime) { /** * Sets the maximum number of retries within each region for read and write operations. The minimum * value is 1 - the backoff time for the last in-region retry will ensure that the total retry time within the - * region is at least {@link this.minInRegionRetryTime} + * region is at least the min. in-region retry time. * @param maxInRegionRetryCount the max. number of retries with-in each region * @return This instance of {@link SessionRetryOptionsBuilder} */ diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index f8aa7f9610925..41e1529b0637f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -127,7 +127,6 @@ public Mono shouldRetry(Exception e) { this.retryCount, effectiveBackoff.toMillis()); - return Mono.just(ShouldRetryResult.retryAfter(effectiveBackoff)); } From 39066cbc54cabd9205e638117361c70d686740d1 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 18 Oct 2023 01:48:40 +0000 Subject: [PATCH 7/8] Update SessionTokenMismatchRetryPolicy.java --- .../SessionTokenMismatchRetryPolicy.java | 37 +++++++++---------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java index 
41e1529b0637f..2ced438b3e11b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/SessionTokenMismatchRetryPolicy.java @@ -28,7 +28,9 @@ public class SessionTokenMismatchRetryPolicy implements IRetryPolicy { private Duration currentBackoff; private RetryContext retryContext; private final AtomicInteger maxRetryAttemptsInCurrentRegion; - private final SessionRetryOptions sessionRetryOptions; + private final CosmosRegionSwitchHint regionSwitchHint; + + private final Duration minInRegionRetryTime; public SessionTokenMismatchRetryPolicy( RetryContext retryContext, @@ -39,10 +41,17 @@ public SessionTokenMismatchRetryPolicy( this.retryCount = new AtomicInteger(); this.retryCount.set(0); this.currentBackoff = Duration.ofMillis(Configs.getSessionTokenMismatchInitialBackoffTimeInMs()); - this.maxRetryAttemptsInCurrentRegion = - new AtomicInteger(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(sessionRetryOptions)); + if (sessionRetryOptions != null) { + this.maxRetryAttemptsInCurrentRegion = + new AtomicInteger(sessionRetryOptionsAccessor.getMaxInRegionRetryCount(sessionRetryOptions)); + this.regionSwitchHint = sessionRetryOptionsAccessor.getRegionSwitchHint(sessionRetryOptions); + this.minInRegionRetryTime = sessionRetryOptionsAccessor.getMinInRegionRetryTime(sessionRetryOptions); + } else { + this.maxRetryAttemptsInCurrentRegion = null; + this.regionSwitchHint = CosmosRegionSwitchHint.LOCAL_REGION_PREFERRED; + this.minInRegionRetryTime = null; + } this.retryContext = retryContext; - this.sessionRetryOptions = sessionRetryOptions; } @Override @@ -81,7 +90,7 @@ public Mono shouldRetry(Exception e) { // to the write region then region switch using ClientRetryPolicy will route // the retry to the same write region again, therefore the DIFFERENT_REGION_PREFERRED // hint causes quicker switch to the same write region which is 
reasonable - if (!shouldRetryLocally(sessionRetryOptions, retryCount.get())) { + if (!shouldRetryLocally(regionSwitchHint, retryCount.get())) { LOGGER.debug("SessionTokenMismatchRetryPolicy not retrying because it a retry attempt for the current region and " + "fallback to a different region is preferred "); @@ -109,13 +118,11 @@ public Mono shouldRetry(Exception e) { // For remote region preference ensure that the last retry is long enough (even when exceeding max backoff time) // to consume the entire minRetryTimeInLocalRegion - if (sessionRetryOptionsAccessor.getRegionSwitchHint(sessionRetryOptions) == - CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED + if (regionSwitchHint == CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED && attempt >= (this.maxRetryAttemptsInCurrentRegion.get() - 1)) { - Duration remainingMinRetryTimeInLocalRegion = this.waitTimeTimeoutHelper.getRemainingTime( - sessionRetryOptionsAccessor.getMinInRegionRetryTime(this.sessionRetryOptions) - ); + Duration remainingMinRetryTimeInLocalRegion = + this.waitTimeTimeoutHelper.getRemainingTime(minInRegionRetryTime); if (remainingMinRetryTimeInLocalRegion.compareTo(effectiveBackoff) > 0) { effectiveBackoff = remainingMinRetryTimeInLocalRegion; @@ -143,15 +150,7 @@ private static Duration getEffectiveBackoff(Duration backoff, Duration remaining return backoff; } - private boolean shouldRetryLocally(SessionRetryOptions sessionRetryOptions, int sessionTokenMismatchRetryAttempts) { - - if (sessionRetryOptions == null) { - return true; - } - - CosmosRegionSwitchHint regionSwitchHint = sessionRetryOptionsAccessor - .getRegionSwitchHint(sessionRetryOptions); - + private boolean shouldRetryLocally(CosmosRegionSwitchHint regionSwitchHint, int sessionTokenMismatchRetryAttempts) { if (regionSwitchHint != CosmosRegionSwitchHint.REMOTE_REGION_PREFERRED) { return true; } From 5f2aaa5240a4053c25c338cd5358ac7116c33a18 Mon Sep 17 00:00:00 2001 From: Fabian Meiswinkel Date: Wed, 18 Oct 2023 16:45:51 +0000 Subject: [PATCH 
8/8] Update TestSuiteBase.java --- .../com/azure/cosmos/rx/TestSuiteBase.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index 35783d719cd2d..de4b2f22ca8b2 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -341,6 +341,24 @@ protected static void waitIfNeededForReplicasToCatchUp(CosmosClientBuilder clien public static CosmosAsyncContainer createCollection(CosmosAsyncDatabase database, CosmosContainerProperties cosmosContainerProperties, CosmosContainerRequestOptions options, int throughput) { database.createContainer(cosmosContainerProperties, ThroughputProperties.createManualThroughput(throughput), options).block(); + + // Creating a container is async - especially on multi-partition or multi-region accounts + CosmosAsyncClient client = ImplementationBridgeHelpers + .CosmosAsyncDatabaseHelper + .getCosmosAsyncDatabaseAccessor() + .getCosmosAsyncClient(database); + boolean isMultiRegional = ImplementationBridgeHelpers + .CosmosAsyncClientHelper + .getCosmosAsyncClientAccessor() + .getPreferredRegions(client).size() > 1; + if (throughput > 6000 || isMultiRegional) { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return database.getContainer(cosmosContainerProperties.getId()); }