add control and status endpoints to KafkaIndexTask #2730

Merged
merged 1 commit into from
Apr 21, 2016

Contributor

@dclim dclim commented Mar 25, 2016

Added the following HTTP endpoints to KafkaIndexTask:

  1. [GET /offsets/current] - Return which offsets we are currently processing
  2. [GET /offsets/end] - Return the offsets that the task will read until
  3. [POST /offsets/end] - Modify the ending offsets to allow the task to continue reading or stop early
  4. [POST /stop?publish=true/false] - Signal the task to stop early, either abruptly (publish=false) or gracefully (publish=true: stop reading and publish a segment with what has been read so far). In both cases the task returns a SUCCESS status
  5. [POST /pause?timeout=x] - Signal the task to pause reading, either indefinitely or with a provided timeout to resume
  6. [GET /status] - Return the task's state
  7. [POST /resume] - Signal the task to resume reading
  8. [GET /time/start] - Return the timestamp when the task began running

These endpoints are required for implementing functionality for #2656, specifically:
(1) exposes the offset currently being read and allows us to start the next tasks early
(4) allows us to stop redundant replicas when another has completed without generating a failed status
(all) allows us to stop tasks gracefully in a coordinated way across replicas (by pausing them and then setting the end offsets so that they all finish at the same place). This enables graceful, immediate schema rollover, as well as task lifetimes based on time rather than on the number of events
(all) in conjunction with a pauseAfterRead flag in KafkaIOConfig, provides a framework for leader/follower capabilities, where followers pause once they have finished reading their offsets until the leader assigns a new set of ending offsets
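
The coordinated stop described in the last two points could look roughly like the sketch below. It is an illustration under stated assumptions only: `ReplicaClient`, `InMemoryReplica` and `CoordinatedStop` are hypothetical names, not classes from this PR, each client method merely stands in for one of the HTTP endpoints listed above, and using the highest current offset per partition as the shared end offset is an assumed policy.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical client; each method stands in for one HTTP endpoint of a replica task.
interface ReplicaClient {
    void pause();                                 // POST /pause
    Map<Integer, Long> currentOffsets();          // GET /offsets/current
    void setEndOffsets(Map<Integer, Long> ends);  // POST /offsets/end
    void resume();                                // POST /resume
}

// In-memory fake so the sketch runs without a real task.
final class InMemoryReplica implements ReplicaClient {
    final Map<Integer, Long> current = new HashMap<>();
    final Map<Integer, Long> end = new HashMap<>();
    boolean paused = false;

    InMemoryReplica(Map<Integer, Long> current) {
        this.current.putAll(current);
    }

    public void pause() { paused = true; }
    public Map<Integer, Long> currentOffsets() { return new HashMap<>(current); }
    public void setEndOffsets(Map<Integer, Long> ends) { end.clear(); end.putAll(ends); }
    public void resume() { paused = false; }
}

final class CoordinatedStop {
    // Pause every replica, take the highest current offset per partition (assumed policy),
    // set that as the end offset on every replica, then resume them.
    static Map<Integer, Long> finishAtSameOffsets(List<? extends ReplicaClient> replicas) {
        Map<Integer, Long> target = new HashMap<>();
        for (ReplicaClient replica : replicas) {
            replica.pause();
            for (Map.Entry<Integer, Long> e : replica.currentOffsets().entrySet()) {
                target.merge(e.getKey(), e.getValue(), Long::max);
            }
        }
        for (ReplicaClient replica : replicas) {
            replica.setEndOffsets(target);
            replica.resume();
        }
        return target;
    }

    public static void main(String[] args) {
        Map<Integer, Long> a = new HashMap<>();
        a.put(0, 10L);
        Map<Integer, Long> b = new HashMap<>();
        b.put(0, 12L);
        List<InMemoryReplica> replicas =
            Arrays.asList(new InMemoryReplica(a), new InMemoryReplica(b));
        // Both replicas now share the same end offsets.
        System.out.println(finishAtSameOffsets(replicas));
    }
}
```

Pausing before reading the offsets matters in this sketch: a replica that kept consuming could pass the chosen end offset before it was set.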

.build();
}

for (Integer partition : offsets.keySet()) {
Contributor

nit: the following is preferred in Java when you want to iterate over both the keys and the values of a map

 for (Map.Entry<Integer, Long> entry : offsets.entrySet())
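
A minimal illustration of the suggestion, assuming `offsets` maps a partition number to an offset. The `OffsetIteration` class and its `describe` helper are hypothetical and not part of the PR.

```java
import java.util.Map;
import java.util.TreeMap;

final class OffsetIteration {
    // entrySet() yields each key together with its value,
    // so no extra offsets.get(partition) lookup is needed per key.
    static String describe(Map<Integer, Long> offsets) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
            sb.append("partition ").append(entry.getKey())
              .append(" -> offset ").append(entry.getValue())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(0, 100L);
        offsets.put(1, 250L);
        System.out.print(describe(offsets));
    }
}
```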

Contributor Author

sounds good, thanks!

@himanshug
Contributor

👍 after #2730 (comment)

@himanshug himanshug added this to the 0.9.1 milestone Mar 25, 2016
@@ -467,10 +515,25 @@ public boolean canRestore()
public void stopGracefully()
Contributor

This doesn't look like it does what a stopGracefully should do. Maybe we should rename that method to something like suspend (not in this PR though). But, what it's supposed to do is suspend a task quickly so the jvm can be shut down and then restarted. Specifically it shouldn't be trying to publish or anything like that – just persist to disk and then stop.

Contributor Author

I can swap the implementations of stopGracefully and stopAbruptly since the abrupt one does what you described.

@dclim
Contributor Author

dclim commented Mar 31, 2016

@gianm thanks for the comments, I added some follow-up questions/thoughts

@dclim
Contributor Author

dclim commented Mar 31, 2016

@gianm addressed your comments

private volatile boolean stopping = false;
private volatile boolean publishing = false;
private volatile DateTime startTime;
private volatile Status status;
Contributor

status should start at something – perhaps NOT_STARTED or some such

Contributor Author

sounds good

Contributor

How is status kept consistent? Are all writes going through a single thread, or are they protected by a lock? Would be good to have that in a comment.

Contributor Author

Yes, status is only modified by the task runner thread which calls run(). I added a comment.
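
The single-writer arrangement described here could be sketched as follows. This is an assumption-laden illustration rather than the PR's code: `TaskStatusHolder` is a hypothetical class, and only the `NOT_STARTED` value comes from the review; the other enum names are made up for the example.

```java
final class TaskStatusHolder {
    // Only NOT_STARTED comes from the review; the other names are assumed for illustration.
    enum Status { NOT_STARTED, READING, PUBLISHING }

    // Single-writer rule: only the thread that calls run() assigns this field.
    // volatile makes each write visible to the HTTP threads that read it.
    private volatile Status status = Status.NOT_STARTED;

    Status getStatus() {
        return status;
    }

    void run() {
        status = Status.READING;
        // ... consume records here ...
        status = Status.PUBLISHING;
    }

    public static void main(String[] args) throws InterruptedException {
        TaskStatusHolder task = new TaskStatusHolder();
        // A reader sees a real value even before run() has started.
        System.out.println(task.getStatus());
        Thread runner = new Thread(task::run);
        runner.start();
        runner.join();
        System.out.println(task.getStatus());
    }
}
```

With a single writer, `volatile` alone is enough for readers to see the latest value; a lock would only be needed if several threads updated the field based on its previous value.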

@gianm
Contributor

gianm commented Apr 19, 2016

👍 after #2730 (comment)

log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
assignPartitions(consumer, topic, assignment);
stillReading = !assignment.isEmpty();
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
Contributor

Why do we stop reading altogether once one partition is done and pauseAfterRead is set? Wouldn't this terminate the task earlier than we want?

Contributor Author

If pauseAfterRead is true, stillReading will remain true regardless of the state of assignment - this will ensure we continue to loop and will pause in possiblyPause() if all partitions have been read. If pauseAfterRead is false, the loop will end as soon as all the partitions have been read.
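
The condition explained above can be checked in isolation with a small truth table. The `ReadLoopCondition` class is a hypothetical stand-in for the loop in the task, with the two inputs passed as plain booleans.

```java
final class ReadLoopCondition {
    // Same boolean expression as the changed line: with pauseAfterRead the loop
    // keeps running even when no partitions remain assigned.
    static boolean stillReading(boolean pauseAfterRead, boolean assignmentEmpty) {
        return pauseAfterRead || !assignmentEmpty;
    }

    public static void main(String[] args) {
        boolean[] flags = {false, true};
        for (boolean pauseAfterRead : flags) {
            for (boolean assignmentEmpty : flags) {
                System.out.printf(
                    "pauseAfterRead=%b assignmentEmpty=%b -> stillReading=%b%n",
                    pauseAfterRead, assignmentEmpty,
                    stillReading(pauseAfterRead, assignmentEmpty));
            }
        }
    }
}
```

The only combination that ends the loop is pauseAfterRead=false with an empty assignment.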

Contributor

OK, so pauseAfterRead ensures that the task does not stop even after it has consumed all partitions up to the given offsets... and the supervisor would possibly make requests to stop and publish later.

@himanshug
Contributor

👍 besides #2730 (comment)

@dclim
Contributor Author

dclim commented Apr 21, 2016

Thanks!

@gianm gianm merged commit 7641f26 into apache:master Apr 21, 2016
@dclim dclim deleted the kafka-index-task-endpoints branch April 21, 2016 22:57
seoeun25 pushed a commit to seoeun25/incubator-druid that referenced this pull request Jan 10, 2020