Skip to content

Commit

Permalink
fix: re-use existing connection info during force refresh (#1441)
Browse files Browse the repository at this point in the history
Previously, when the connector initiated a force refresh, it discarded the
existing connection info and blocked until the refresh operation
completed. Because the cause of a connection failure may be transient, we
now keep the current connection info around and refresh in the background.
If the current connection info is in fact invalid, callers will see
additional failures until the refresh completes (usually
< 1s).

Co-authored-by: Jonathan Hess <hessjc@google.com>
  • Loading branch information
enocom and hessjcg committed Sep 8, 2023
1 parent c70059a commit 769de5e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class CloudSqlInstance {
private final CloudSqlInstanceName instanceName;
private final ListenableFuture<KeyPair> keyPair;
private final Object instanceDataGuard = new Object();
// Limit forced refreshes to 1 every minute.

@SuppressWarnings("UnstableApiUsage")
private final RateLimiter forcedRenewRateLimiter;

private final RefreshCalculator refreshCalculator = new RefreshCalculator();
Expand Down Expand Up @@ -83,7 +84,7 @@ class CloudSqlInstance {
CredentialFactory tokenSourceFactory,
ListeningScheduledExecutorService executor,
ListenableFuture<KeyPair> keyPair,
RateLimiter forcedRenewRateLimiter) {
@SuppressWarnings("UnstableApiUsage") RateLimiter forcedRenewRateLimiter) {
this.instanceName = new CloudSqlInstanceName(connectionName);
this.instanceDataSupplier = instanceDataSupplier;
this.authType = authType;
Expand Down Expand Up @@ -173,8 +174,7 @@ void forceRefresh() {
"[%s] Force Refresh: the next refresh operation was cancelled."
+ " Scheduling new refresh operation immediately.",
instanceName));
currentInstanceData = executor.submit(this::performRefresh);
nextInstanceData = currentInstanceData;
nextInstanceData = executor.submit(this::performRefresh);
}
}

Expand All @@ -187,6 +187,7 @@ private InstanceData performRefresh() throws InterruptedException, ExecutionExce
logger.fine(
String.format("[%s] Refresh Operation: Acquiring rate limiter permit.", instanceName));
// To avoid unreasonable SQL Admin API usage, use a rate limit to throttle our usage.
//noinspection UnstableApiUsage
forcedRenewRateLimiter.acquire();
logger.fine(
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@

public class CloudSqlInstanceConcurrencyTest {

public static final int DEFAULT_WAIT = 200;
private static final Logger logger =
Logger.getLogger(CloudSqlInstanceConcurrencyTest.class.getName());
public static final int FORCE_REFRESH_COUNT = 10;

private static class TestCredentialFactory implements CredentialFactory, HttpRequestInitializer {

Expand Down Expand Up @@ -86,24 +88,26 @@ public void testThatForceRefreshBalksWhenARefreshIsInProgress() throws Exception
// Test that there was 1 successful attempt from when the CloudSqlInstance was instantiated.
assertThat(supplier.successCounter.get()).isEqualTo(1);

// Now, run through 20 cycles where we call forceRefresh() multiple times and make sure that
// it only runs one successful refresh per cycle 20 times. This will prove that forceRefresh()
// will balk when an operation is in progress, and that forceRefresh() will retry after a failed
// attempt to get InstanceData.
for (int i = 1; i <= 20; i++) {
// Now, run through a number of cycles where we call forceRefresh() multiple times and make sure
// that it only runs one successful refresh per cycle 10 times. This will prove that
// forceRefresh() will balk when an operation is in progress, and that forceRefresh() will retry
// after a failed attempt to get InstanceData.
for (int i = 1; i <= FORCE_REFRESH_COUNT; i++) {
// Assert the expected number of successful refresh operations
assertThat(supplier.successCounter.get()).isEqualTo(i);

// Call forceRefresh 3 times in rapid succession. This should only kick off 1 refresh
// cycle.
instance.forceRefresh();
// force Java to run a different thread now. That gives the refrsh task an opportunity to
// force Java to run a different thread now. That gives the refresh task an opportunity to
// start.
Thread.yield();
instance.forceRefresh();
instance.forceRefresh();
Thread.yield();

Thread.sleep(DEFAULT_WAIT); // Wait for the refresh to occur

// This will loop forever if CloudSqlInstance does not successfully retry after a failed
// forceRefresh() attempt.
while (true) {
Expand All @@ -115,25 +119,32 @@ public void testThatForceRefreshBalksWhenARefreshIsInProgress() throws Exception
executor.submit(instance::getSslData),
executor.submit(instance::getSslData));

// Wait for all to finish.
// This should return immediately
allData2.get();

// If they all succeeded, then continue with the test.
break;

} catch (ExecutionException e) {
// We expect some of these to throw an exception indicating that the refresh cycle
// got a failed attempt. When they throw an exception,
// sleep and try again. This shows that the refresh cycle is working.
Thread.sleep(100);
//noinspection BusyWait
Thread.sleep(DEFAULT_WAIT);
}
}

// Wait for the actual background refresh to complete before checking the success counter.
Thread.sleep(DEFAULT_WAIT);

// Assert the expected number of successful refresh operations at the end of the loop
// is one more than the beginning of the loop.
// is only one more than the beginning of the loop.
// This means that only one additional refresh operation was run.
assertThat(supplier.successCounter.get()).isEqualTo(i + 1);

Thread.sleep(100);
Thread.sleep(DEFAULT_WAIT);
}

// Refresh count should equal initial refresh plus FORCE_REFRESH_COUNT times refreshing
assertThat(supplier.successCounter.get()).isEqualTo(FORCE_REFRESH_COUNT + 1);
}

@Test(timeout = 20000) // 20 seconds timeout in case of deadlock
Expand Down Expand Up @@ -164,7 +175,7 @@ public void testForceRefreshDoesNotCauseADeadlockOrBrokenRefreshLoop() throws Ex

assertThat(supplier.counter.get()).isEqualTo(instanceCount);

// Now that everything is initialized, make the network flakey
// Now that everything is initialized, make the network flaky
supplier.flaky = true;

// Start a thread for each instance that will force refresh and get InstanceData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package com.google.cloud.sql.core;

import static com.google.common.truth.Truth.*;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;

import com.google.cloud.sql.AuthType;
import com.google.common.collect.ImmutableMap;
Expand All @@ -37,17 +38,17 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class CloudSqlInstanceTest {

private ListeningScheduledExecutorService executorService;
private ListenableFuture<KeyPair> keyPairFuture;
@SuppressWarnings("UnstableApiUsage")
public static final RateLimiter TEST_RATE_LIMITER =
RateLimiter.create(1000 /* permits per second */);

private final StubCredentialFactory stubCredentialFactory =
new StubCredentialFactory("my-token", System.currentTimeMillis() + 3600L);
private ListeningScheduledExecutorService executorService;
private ListenableFuture<KeyPair> keyPairFuture;

@Before
public void setup() throws Exception {
Expand Down Expand Up @@ -77,7 +78,7 @@ public void testCloudSqlInstanceDataRetrievedSuccessfully() throws Exception {
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
TEST_RATE_LIMITER);

SslData gotSslData = instance.getSslData();
assertThat(gotSslData).isSameInstanceAs(instanceDataSupplier.response.getSslData());
Expand Down Expand Up @@ -111,41 +112,50 @@ public void testInstanceFailsOnConnectionError() throws Exception {
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
TEST_RATE_LIMITER);

RuntimeException ex = Assert.assertThrows(RuntimeException.class, instance::getSslData);
assertThat(ex).hasMessageThat().contains("always fails");
}

@Test
public void testCloudSqlInstanceForcesRefresh() throws Exception {
SslData sslData = new SslData(null, null, null);
InstanceData data =
new InstanceData(null, sslData, Date.from(Instant.now().plus(1, ChronoUnit.HOURS)));
public void testCloudSqlInstanceForcesRefresh() throws InterruptedException {
AtomicInteger refreshCount = new AtomicInteger();

InstanceDataSupplier instanceDataSupplier =
(instanceName, accessTokenSupplier, authType, executor, keyPair) -> {
Thread.sleep(100);
refreshCount.incrementAndGet();
return data;
};

CloudSqlInstance instance =
new CloudSqlInstance(
"project:region:instance",
instanceDataSupplier,
(instanceName, accessTokenSupplier, authType, executor, keyPair) -> {
refreshCount.incrementAndGet();
return new InstanceData(
null,
new SslData(null, null, null),
Date.from(Instant.now().plus(1, ChronoUnit.HOURS)));
},
AuthType.PASSWORD,
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
TEST_RATE_LIMITER);

instance.getSslData();
assertThat(refreshCount.get()).isEqualTo(1);

SslData gotSslData = instance.getSslData();
assertThat(gotSslData).isSameInstanceAs(sslData);
instance.forceRefresh();

instance.getSslData();
assertThat(refreshCount.get()).isEqualTo(2);
// refresh count hasn't changed because we re-use the existing connection info
assertThat(refreshCount.get()).isEqualTo(1);

for (int i = 0; i < 10; i++) {
instance.getSslData();
if (refreshCount.get() > 1) {
return;
}
Thread.sleep(100);
}

fail(String.format("refresh count should be 2, got = %d", refreshCount.get()));
}

@Test
Expand Down Expand Up @@ -179,7 +189,7 @@ public void testGetPreferredIpTypes() throws Exception {
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
TEST_RATE_LIMITER);

assertThat(instance.getPreferredIp(Arrays.asList("PUBLIC", "PRIVATE"))).isEqualTo("10.1.2.3");
assertThat(instance.getPreferredIp(Collections.singletonList("PUBLIC"))).isEqualTo("10.1.2.3");
Expand Down Expand Up @@ -217,7 +227,7 @@ public void testGetPreferredIpTypesThrowsException() throws Exception {
stubCredentialFactory,
executorService,
keyPairFuture,
RateLimiter.create(1.0 / 30.0));
TEST_RATE_LIMITER);
Assert.assertThrows(
IllegalArgumentException.class,
() -> instance.getPreferredIp(Collections.singletonList("PRIVATE")));
Expand All @@ -228,7 +238,6 @@ private ListeningScheduledExecutorService newTestExecutor() {
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(2);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);

//noinspection UnstableApiUsage
return MoreExecutors.listeningDecorator(
MoreExecutors.getExitingScheduledExecutorService(executor));
}
Expand Down

0 comments on commit 769de5e

Please sign in to comment.