Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rolling Supervisor restarts at taskDuration #14396

Merged
merged 16 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ public KafkaSupervisorIOConfig(
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("configOverrides") KafkaConfigOverrides configOverrides,
@JsonProperty("idleConfig") IdleConfig idleConfig
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount
)
{
super(
Expand All @@ -82,7 +83,8 @@ public KafkaSupervisorIOConfig(
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
idleConfig
idleConfig,
stopTaskCount
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public void testSample()
null,
null,
null,
null,
null
),
null,
Expand Down Expand Up @@ -237,6 +238,7 @@ public void testSampleKafkaInputFormat()
null,
null,
null,
null,
null
),
null,
Expand Down Expand Up @@ -338,6 +340,7 @@ public void testWithInputRowParser() throws IOException
null,
null,
null,
null,
null
),
null,
Expand Down Expand Up @@ -520,6 +523,7 @@ public void testInvalidKafkaConfig()
null,
null,
null,
null,
null
),
null,
Expand Down Expand Up @@ -574,6 +578,7 @@ public void testGetInputSourceResources()
null,
null,
null,
null,
null
),
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException
null,
null,
null,
null,
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
Expand Down Expand Up @@ -348,7 +349,8 @@ public void testIdleConfigSerde() throws JsonProcessingException
null,
null,
null,
mapper.convertValue(idleConfig, IdleConfig.class)
mapper.convertValue(idleConfig, IdleConfig.class),
null
);
String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,8 @@ public SeekableStreamIndexTaskClient<Integer, Long> build(
null,
null,
null,
new IdleConfig(true, 1000L)
new IdleConfig(true, 1000L),
1
);

final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
Expand Down Expand Up @@ -4516,7 +4517,8 @@ private TestableKafkaSupervisor getTestableSupervisor(
earlyMessageRejectionPeriod,
null,
null,
idleConfig
idleConfig,
null
);

KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
Expand Down Expand Up @@ -4627,6 +4629,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent(
earlyMessageRejectionPeriod,
null,
null,
null,
null
);

Expand Down Expand Up @@ -4742,6 +4745,7 @@ private KafkaSupervisor getSupervisor(
earlyMessageRejectionPeriod,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ public KinesisSupervisorIOConfig(
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
new IdleConfig(null, null)
new IdleConfig(null, null),
null
);

this.endpoint = endpoint != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1574,6 +1574,7 @@ public void gracefulShutdownInternal() throws ExecutionException, InterruptedExc
}
}
}
earlyStopTime = DateTimes.EPOCH;

checkTaskDuration();
}
Expand Down Expand Up @@ -2934,33 +2935,43 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
final List<Integer> futureGroupIds = new ArrayList<>();

boolean stopTasksEarly = false;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");

earlyStopTime = null;
stopTasksEarly = true;
}

int stoppedTasks = 0;
for (Entry<Integer, TaskGroup> entry : activelyReadingTaskGroups.entrySet()) {
Integer groupId = entry.getKey();
TaskGroup group = entry.getValue();

// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}


boolean stopTasksEarly = false;
if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) {
log.info("Early stop requested - signalling tasks to complete");

earlyStopTime = null;
stopTasksEarly = true;
}


// if this task has run longer than the configured duration, signal all tasks in the group to persist
if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || stopTasksEarly) {
log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration());
if (stopTasksEarly) {
log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
} else {
// find the longest running task from this group
DateTime earliestTaskStart = DateTimes.nowUtc();
for (TaskData taskData : group.tasks.values()) {
if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) {
earliestTaskStart = taskData.startTime;
}
}

if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) {
// if this task has run longer than the configured duration
// as long as the pending task groups are less than the configured stop task count.
if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks
< ioConfig.getStopTaskCount()) {
log.info("Task group [%d] has run for [%s]. Stopping.", groupId, ioConfig.getTaskDuration());
futureGroupIds.add(groupId);
futures.add(checkpointTaskGroup(group, true));
stoppedTasks++;
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable private final AutoScalerConfig autoScalerConfig;
@Nullable private final IdleConfig idleConfig;

private final int stopTaskCount;

public SeekableStreamSupervisorIOConfig(
String stream,
@Nullable InputFormat inputFormat,
Expand All @@ -64,7 +66,8 @@ public SeekableStreamSupervisorIOConfig(
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
DateTime lateMessageRejectionStartDateTime,
@Nullable IdleConfig idleConfig
@Nullable IdleConfig idleConfig,
@Nullable Integer stopTaskCount
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
Expand All @@ -78,6 +81,8 @@ public SeekableStreamSupervisorIOConfig(
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount;
Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0");
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
Expand Down Expand Up @@ -199,4 +204,10 @@ public IdleConfig getIdleConfig()
{
return idleConfig;
}

@JsonProperty
public int getStopTaskCount()
{
return stopTaskCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,8 @@ private TestableSeekableStreamSupervisorIOConfig(
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime,
idleConfig
idleConfig,
null
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte
null,
null,
null,
null,
null
)
{
Expand Down Expand Up @@ -919,7 +920,8 @@ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled()
null,
null,
null,
new IdleConfig(true, null)
new IdleConfig(true, null),
null
)
{
};
Expand Down Expand Up @@ -1085,6 +1087,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal
null,
mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class),
null,
null,
null
)
{
Expand All @@ -1104,6 +1107,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal
null,
mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class),
null,
null,
null
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,8 @@ public void testIdleStateTransition() throws Exception
null,
null,
null,
new IdleConfig(true, 200L)
new IdleConfig(true, 200L),
null
)
{
}).anyTimes();
Expand Down Expand Up @@ -1088,6 +1089,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept
null,
null,
null,
null,
null
)
{
Expand Down Expand Up @@ -1148,6 +1150,7 @@ private static SeekableStreamSupervisorIOConfig getIOConfig()
null,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
null,
null,
null
)
{
Expand Down
Loading