Skip to content

Commit

Permalink
Do not emit negative lag because of stale offsets (#14292)
Browse files Browse the repository at this point in the history
The latest topic offsets are polled frequently and used to determine the lag based on the current offsets. However, when the offsets are stale (which commonly happens because of connection issues), we may see a negative lag.

This PR prevents emission of metrics when the offsets are stale and at least one of the partitions has a negative lag.
  • Loading branch information
AmatyaAvadhanula authored Jul 5, 2023
1 parent cc159f4 commit 609833c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4220,6 +4220,21 @@ protected void emitLag()
return;
}

// Try emitting lag even with stale metrics provided that none of the partitions has negative lag
final long staleMillis = sequenceLastUpdated == null
? 0
: DateTimes.nowUtc().getMillis()
- (tuningConfig.getOffsetFetchPeriod().getMillis() + sequenceLastUpdated.getMillis());
if (staleMillis > 0 && partitionLags.values().stream().anyMatch(x -> x < 0)) {
// Log at most once every twenty supervisor runs to reduce noise in the logs
if ((staleMillis / getIoConfig().getPeriod().getMillis()) % 20 == 0) {
log.warn("Lag is negative and will not be emitted because topic offsets have become stale. "
+ "This will not impact data processing. "
+ "Offsets may become stale because of connectivity issues.");
}
return;
}

LagStats lagStats = computeLags(partitionLags);
Map<String, Object> metricTags = spec.getContextValue(DruidMetrics.TAGS);
for (Map.Entry<PartitionIdType, Long> entry : partitionLags.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand Down Expand Up @@ -1035,6 +1036,30 @@ public void testGetStats()
);
}

/**
 * Checks that no events reach the emitter when the supervisor's offsets are stale and one of
 * the partitions reports a negative record lag.
 */
@Test
public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
{
  expectEmitterSupervisor(false);

  final CountDownLatch signal = new CountDownLatch(1);

  // Partition "1" has a negative record lag, so the record lag must not be emitted.
  final TestEmittingTestSeekableStreamSupervisor staleSupervisor =
      new TestEmittingTestSeekableStreamSupervisor(
          signal,
          TestEmittingTestSeekableStreamSupervisor.LAG,
          ImmutableMap.of("0", 10L, "1", -100L),
          null
      );

  staleSupervisor.start();
  // Backdate the last update time so that the offsets are treated as stale.
  staleSupervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE);

  signal.await();

  // An explicit emission attempt must not produce any event either.
  staleSupervisor.emitLag();
  final int emittedEventCount = emitter.getEvents().size();
  Assert.assertEquals(0, emittedEventCount);
}

private List<Event> filterMetrics(List<Event> events, List<String> whitelist)
{
List<Event> result = events.stream()
Expand Down

0 comments on commit 609833c

Please sign in to comment.