diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0ae9aad9631e..d99f84c5746c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -4220,6 +4220,21 @@ protected void emitLag() return; } + // Offsets count as stale once more than offsetFetchPeriod has elapsed since sequenceLastUpdated (null means not stale); stale lag is still emitted unless some partition reports a negative lag + final long staleMillis = sequenceLastUpdated == null + ? 0 + : DateTimes.nowUtc().getMillis() + - (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis()); + if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) { + // Throttle the warning to reduce log noise: it is logged only when staleMillis divided by the ioConfig period is a multiple of 20. NOTE(review): this is time-based rather than a run counter, and a quotient of 0 also matches; confirm this is intended + if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) { + log.warn("Lag is negative and will not be emitted because topic offsets have become stale. " + + "This will not impact data processing. 
" + "Offsets may become stale because of connectivity issues."); + } + return; + } + LagStats lagStats = computeLags(partitionLags); Map metricTags = spec.getContextValue(DruidMetrics.TAGS); for (Map.Entry entry : partitionLags.entrySet()) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index b8d5a556eef6..a347541a4efb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -62,6 +62,7 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -1035,6 +1036,30 @@ public void testGetStats() ); } + @Test + public void testStaleOffsetsNegativeLagNotEmitted() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + + final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + TestEmittingTestSeekableStreamSupervisor.LAG, + // Partition 1 reports a negative lag, so no lag must be emitted once the offsets are stale + ImmutableMap.of("0", 10L, "1", -100L), + null + ); + supervisor.start(); + // Backdate sequenceLastUpdated so that emitLag treats the offsets as stale + supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE); + 
latch.await(); + + supervisor.emitLag(); + Assert.assertEquals(0, emitter.getEvents().size()); + } + private List filterMetrics(List events, List whitelist) { List result = events.stream()