-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add control and status endpoints to KafkaIndexTask #2730
Conversation
.build(); | ||
} | ||
|
||
for (Integer partition : offsets.keySet()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: following is more desired in java land if you wanted to iterate through both keys and values in a map
for (Map.Entry<Integer, Long> entry : offsets.entrySet())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, thanks!
👍 after #2730 (comment) |
@@ -467,10 +515,25 @@ public boolean canRestore() | |||
public void stopGracefully() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't look like it does what a stopGracefully
should do. Maybe we should rename that method to something like suspend
(not in this PR though). But, what it's supposed to do is suspend a task quickly so the jvm can be shut down and then restarted. Specifically it shouldn't be trying to publish or anything like that – just persist to disk and then stop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can swap the implementations of stopGracefully and stopAbruptly since the abrupt one does what you described.
@gianm thanks for the comments, I added some follow-up questions/thoughts |
@gianm addressed your comments |
private volatile boolean stopping = false; | ||
private volatile boolean publishing = false; | ||
private volatile DateTime startTime; | ||
private volatile Status status; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
status
should start at something – perhaps NOT_STARTED
or some such
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is status
kept consistent? Are all writes going through a single thread, or are they protected by a lock? Would be good to have that in a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, status
is only modified by the task runner thread which calls run(). I added a comment.
👍 after #2730 (comment) |
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); | ||
assignPartitions(consumer, topic, assignment); | ||
stillReading = !assignment.isEmpty(); | ||
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why we stop reading altogether after one partition was done and pauseAfterRead was set? wouldn't this terminate the task earlier than we wanted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If pauseAfterRead
is true, stillReading
will remain true regardless of the state of assignment
- this will ensure we continue to loop and will pause in possiblyPause()
if all partitions have been read. If pauseAfterRead
is false, the loop will end as soon as all the partitions have been read.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, so pauseAfterRead ensures that task does not stop even after completion of consuming all partitions upto given offsets... and possibly supervisor would make requests to stop and publish later.
👍 besides #2730 (comment) |
Thanks! |
Added the following HTTP endpoints to KafkaIndexTask:
These endpoints are required for implementing functionality for #2656, specifically:
(1) exposes the offset currently being read and allows us to start the next tasks early
(4) allows us to stop redundant replicas when another has completed without generating a failed status
(all) allows us to stop tasks gracefully in a coordinated way across replicas (by pausing and then setting the end offsets so that they all finish at the same place) - this enables graceful immediate schema rollover as well as time-based task lifetimes instead of number of event based
(all) in conjunction with a pauseAfterRead flag in KafkaIOConfig, provides a framework to enable leader/follower capabilities, where followers will pause when they are done reading their offsets until the leader assigns a new set of ending offsets