diff --git a/core/src/main/java/io/grpc/internal/KeepAliveManager.java b/core/src/main/java/io/grpc/internal/KeepAliveManager.java index a5eb086b353..7311ba27d68 100644 --- a/core/src/main/java/io/grpc/internal/KeepAliveManager.java +++ b/core/src/main/java/io/grpc/internal/KeepAliveManager.java @@ -17,6 +17,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; @@ -61,6 +62,7 @@ public void run() { private final Runnable sendPing = new LogExceptionRunnable(new Runnable() { @Override public void run() { + pingFuture = null; boolean shouldSendPing = false; synchronized (KeepAliveManager.this) { if (state == State.PING_SCHEDULED) { @@ -170,8 +172,8 @@ public synchronized void onDataReceived() { } // schedule a new ping state = State.PING_SCHEDULED; - pingFuture = - scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS); + checkState(pingFuture == null, "There should be no outstanding pingFuture"); + pingFuture = scheduler.schedule(sendPing, keepAliveTimeInNanos, TimeUnit.NANOSECONDS); } } @@ -183,10 +185,12 @@ public synchronized void onTransportActive() { // When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to // quickly check whether the connection is still working. state = State.PING_SCHEDULED; - pingFuture = scheduler.schedule( - sendPing, - nextKeepaliveTime - ticker.read(), - TimeUnit.NANOSECONDS); + if (pingFuture == null) { + pingFuture = scheduler.schedule( + sendPing, + nextKeepaliveTime - ticker.read(), + TimeUnit.NANOSECONDS); + } } else if (state == State.IDLE_AND_PING_SENT) { state = State.PING_SENT; } // Other states are possible when keepAliveDuringTransportIdle == true @@ -218,6 +222,7 @@ public synchronized void onTransportTermination() { } if (pingFuture != null) { pingFuture.cancel(false); + pingFuture = null; } } } diff --git a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java index fb38967b680..5487064e41c 100644 --- a/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java +++ b/core/src/test/java/io/grpc/internal/KeepAliveManagerTest.java @@ -105,6 +105,10 @@ public void sendKeepAlivePings() { @Test public void keepAlivePingDelayedByIncomingData() { + ScheduledFuture future = mock(ScheduledFuture.class); + doReturn(future) + .when(scheduler).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + // Transport becomes active. We should schedule keepalive pings. keepAliveManager.onTransportActive(); ArgumentCaptor sendPingCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -218,6 +222,10 @@ public void keepAlivePingTimesOut() { @Test public void transportGoesIdle() { + ScheduledFuture pingFuture = mock(ScheduledFuture.class); + doReturn(pingFuture) + .when(scheduler).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + // Transport becomes active. We should schedule keepalive pings. keepAliveManager.onTransportActive(); ArgumentCaptor sendPingCaptor = ArgumentCaptor.forClass(Runnable.class); @@ -229,10 +237,14 @@ public void transportGoesIdle() { keepAliveManager.onTransportIdle(); sendPing.run(); // Ping was not sent. - verify(transport, times(0)).ping(isA(ClientTransport.PingCallback.class), - isA(Executor.class)); + verify(transport, times(0)).ping(isA(ClientTransport.PingCallback.class), isA(Executor.class)); // No new ping got scheduled. verify(scheduler, times(1)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + + // But when transport goes back to active + keepAliveManager.onTransportActive(); + // Then we do schedule another ping + verify(scheduler, times(2)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); } @Test @@ -294,6 +306,26 @@ public void transportGoesIdleAfterPingSent() { verify(scheduler, times(3)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); } + @Test + public void transportGoesIdleBeforePingSent() { + // Transport becomes active. We should schedule keepalive pings. + ScheduledFuture pingFuture = mock(ScheduledFuture.class); + doReturn(pingFuture) + .when(scheduler).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + keepAliveManager.onTransportActive(); + verify(scheduler, times(1)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + + // Data is received, and we go to ping delayed + keepAliveManager.onDataReceived(); + + // Transport becomes idle while the 1st ping is still scheduled + keepAliveManager.onTransportIdle(); + + // Transport becomes active again, we don't need to reschedule another ping + keepAliveManager.onTransportActive(); + verify(scheduler, times(1)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class)); + } + @Test public void transportShutsdownAfterPingScheduled() { ScheduledFuture pingFuture = mock(ScheduledFuture.class);