
Restorable indexing tasks #1881

Merged: 2 commits from the restartable-tasks branch into apache:master on Nov 24, 2015

Conversation

@gianm gianm commented Oct 28, 2015

Some changes that make it possible to restart tasks on the same hardware.

This is done by killing and respawning the JVMs rather than reconnecting to existing
JVMs, for a couple of reasons: it lets you restore tasks after server reboots too, and it
lets you upgrade all the software on a box at once by simply restarting everything.

The main changes are (a rough sketch of the new methods follows the list):

  1. Add "canRestore" and "stopGracefully" methods to Tasks; the former reports whether a
     task can stop gracefully, and the latter actually performs the graceful stop.
     RealtimeIndexTask is currently the only task that implements this.
  2. Add a "stop" method to TaskRunners that attempts an orderly shutdown.
     ThreadPoolTaskRunner: calls stopGracefully on restorable tasks and waits for them to exit.
     ForkingTaskRunner: closes the output stream to restorable tasks and waits for them to exit.
     RemoteTaskRunner: does nothing special, since we actually don't want to shut the tasks down.
  3. Add a "restore" method to TaskRunners that attempts to bootstrap tasks from the last run.
     Only ForkingTaskRunner does anything here; it maintains a "restore.json" file with
     a list of restorable tasks.
  4. Have the CliPeon's ExecutorLifecycle lock the task base directory to prevent a restored
     task and a zombie old task from stomping on each other.
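A rough, hedged sketch of the interface additions described above, with simplified signatures; the actual Druid Task and TaskRunner interfaces have more members and different return types:

```java
// Task.java (illustrative, simplified)
public interface Task
{
  // Whether this task can stop gracefully and be restored on a later run.
  boolean canRestore();

  // Persist in-memory state and exit cleanly without handing off segments,
  // so the task can resume where it left off after a restart.
  void stopGracefully();
}

// TaskRunner.java (illustrative, simplified)
public interface TaskRunner
{
  // Orderly shutdown: gracefully stop restorable tasks (ThreadPoolTaskRunner),
  // close the output stream to the forked peons and wait (ForkingTaskRunner),
  // or do nothing special (RemoteTaskRunner).
  void stop();

  // Bootstrap tasks recorded from the previous run, e.g. from the
  // ForkingTaskRunner's "restore.json" file.
  void restore();
}
```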

gianm commented Oct 28, 2015

jdk8 DruidCoordinatorTest.testCoordinatorRun:349 expected:<1> but was:<0>

@gianm gianm closed this Oct 28, 2015
@gianm gianm reopened this Oct 28, 2015
public void run()
{
  persistLatch.countDown();
  committer.run();
Contributor

Is the contract for persist that it will run in a non-daemon thread? Also, if the committer errors out, is there any special "oh crap" message that needs to fly?

Contributor Author

I believe it runs in a daemon thread, but that's okay because there's a non-daemon thread (the shutdown hook) which is waiting for it to finish.

If the committer errors out then that would get logged by the plumber.

Contributor

Do we need to swap persistLatch.countDown() and committer.run()? I believe persistLatch.await() should wait for the task to really finish.

Contributor Author

Yes, this is a good point. I was thinking that the persistLatch needs to count down even if committer.run() throws an exception, but we can address this with a try/catch.
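A minimal sketch of the reordering agreed on here (illustrative; the actual change may differ in details):

```java
// Run the committer before releasing the latch, and release the latch in a
// finally block so persistLatch.await() cannot hang if committer.run() throws.
public void run()
{
  try {
    committer.run();
  }
  finally {
    persistLatch.countDown();
  }
}
```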

  plumber.finishJob();
} else {
  log.info("Persisting pending data without handoff, in preparation for restart.");
  final Committer committer = committerSupplier.get();
Member

Here the firehose is already closed during shutdown. Is it safe to call commit on a closed firehose?

Member

It seems this might not work for KafkaEightFirehoseFactory either, since the underlying connector is already closed?

Contributor Author

@nishantmonu51 that's a good point; this does not really work very well with the Kafka firehose. But then again, RealtimeIndexTasks never did work well with Kafka…

Do you think it's worth it to rework things so that this works well for Kafka and the event receiver? I think to do that we would want the following behavior:

  • if you're using EventReceiverFirehose, stopGracefully causes the servlet to stop accepting new data; the task then drains the existing data and stops.
  • if you're using the Kafka firehose, stopGracefully causes the task to simply stop reading data, then persist/commit, then stop.

This could probably be accomplished somehow…

Member

I agree things don't work very well with the Kafka firehose at present. Also, until now we didn't have a concept of a graceful restart; if we are going to provide that functionality, I think we should also look into how to make it work with our current firehoses. And for others in the community who might have written their own custom firehoses, a call to commit after a firehose has been shut down may be unexpected and could result in weird errors.

Both of the behaviors for EventReceiverFirehose and KafkaFirehose seem good, and they point to needing a new firehose API: instead of completely shutting the firehose down, we would ask it to stop reading any further events, ingest whatever events are still sitting in buffers, persist and call commit on the firehose, then shut it down and release any resources it holds.
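A purely hypothetical sketch of the kind of firehose API extension being suggested here; none of these method names exist in the actual Firehose interface:

```java
// Hypothetical lifecycle for graceful restarts; names are illustrative only.
public interface GracefullyStoppableFirehose
{
  // Stop pulling new events from the source, but keep already-buffered
  // events readable so the task can drain them before persisting.
  void stopReading();

  // As in the existing Firehose: true while buffered events remain.
  boolean hasMore();

  // As in the existing Firehose: a commit runnable for the data read so far.
  Runnable commit();

  // Release any resources held by the firehose.
  void close();
}
```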

Contributor

I also think some people do use the Kafka firehose (with either partitioning or replication), and it works in those specific cases; with this change, that will break.

Contributor Author

@nishantmonu51 @himanshug I'm thinking of doing this by just having special behavior triggered by an instanceof check for EventReceiverFirehose. There doesn't seem to be a nicer way to do it with the current firehose interface. Basically, the EventReceiverFirehose would get closed and drained; all other firehoses we would simply stop reading immediately. For those, we would rely on commit() being an effective way to get back undrained data.
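A minimal sketch of the instanceof-based special case described above; the closeAndDrain method and the stopping flag are illustrative, not the actual RealtimeIndexTask code:

```java
// Illustrative only: branch graceful-stop behavior on the firehose type.
if (firehose instanceof EventReceiverFirehose) {
  // Stop the servlet from accepting new events, then let the main loop
  // drain whatever has already been received before finishing.
  ((EventReceiverFirehose) firehose).closeAndDrain();
} else {
  // For other firehoses (e.g. Kafka), stop reading immediately and rely on
  // commit() to make un-drained data readable again on the next run.
  stopping = true;
}
```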

@nishantmonu51

After these changes, what is the behavior of task logs? Do they get uploaded to S3 on each restart with a different file name, or do the new logs override the previous logs? Or do they get appended to the same file and uploaded to S3 only when the task finally completes or fails?

gianm commented Oct 29, 2015

@nishantmonu51 they get uploaded to S3 on each restart, with new uploads overriding previous uploads. The ForkingTaskRunner opens the log in "append" mode, so each upload will contain logs from all previous runs of the same task.

gianm commented Oct 29, 2015

@nishantmonu51 I do think that in the future we should make the uploading happen in chunks rather than all at once, but that would be a different PR.

  taskRestoreInfo = jsonMapper.readValue(restoreFile, TaskRestoreInfo.class);
}
catch (Exception e) {
  log.warn(e, "Failed to restore tasks from file[%s]. Skipping restore.", restoreFile);
Contributor

This should be an error: the existence of restoreFile should mean that there are valid tasks to restore, and for whatever reason reading the file failed.

Contributor Author

sounds good to me.

gianm commented Oct 29, 2015

@himanshug @nishantmonu51 @drcrallen pushed a new commit; the main change is in firehose-closing behavior. Also looking at adding some tests.

gianm commented Oct 29, 2015

Hmm, I think there's a further problem. If you don't close the Kafka firehose, and no new data is forthcoming, I believe it will block forever on hasMore. I don't think there's anything we can do about that right now, but it points to potentially wanting a timeout on hasMore, or a poll-style Firehose interface.
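A hypothetical sketch of what a poll-style alternative to the blocking hasMore() might look like (not an actual Druid interface):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical: return true if a row becomes available within the timeout,
// false otherwise, instead of blocking indefinitely like Firehose.hasMore().
public interface PollableFirehose
{
  boolean hasMore(long timeout, TimeUnit unit);
}
```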

gianm commented Oct 29, 2015

What that means is that if you do try to stopGracefully a realtime task reading from a dry Kafka firehose, it will likely time out and be killed after the gracefulShutdownTimeout (default 5 minutes).
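For reference, a hedged example of tuning this timeout, assuming it is exposed as the runtime property druid.indexer.task.gracefulShutdownTimeout (an ISO-8601 period defaulting to PT5M):

```properties
# middleManager/peon runtime.properties (property name assumed)
# Allow gracefully-stopping tasks up to ten minutes before they are killed.
druid.indexer.task.gracefulShutdownTimeout=PT10M
```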

@@ -62,6 +63,8 @@
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Contributor

unused?

Contributor Author

thanks, yes, removed.

@fjy fjy added this to the 0.9.0 milestone Oct 29, 2015
@gianm gianm mentioned this pull request Oct 30, 2015
    log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
  }
} else {
  throw new ISE("Already started!");
Contributor

Can this ever happen, or is it just to catch start() being called twice (or more)?

Contributor Author

Just to catch start() being called more than once, which it should not be.
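For context, a minimal sketch of the kind of directory-lock guard being discussed; the class and method names here are illustrative, not the actual ExecutorLifecycle code:

```java
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

// Illustrative: hold an exclusive lock on a file in the task base directory so
// a restored task and a zombie old task cannot both run against the same state.
public class TaskDirectoryLock
{
  private FileLock lock;

  public synchronized void start(File taskLockFile) throws Exception
  {
    if (lock != null) {
      throw new IllegalStateException("Already started!");
    }
    final FileChannel channel = new RandomAccessFile(taskLockFile, "rw").getChannel();
    lock = channel.lock(); // blocks until any lingering process releases the file
  }
}
```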

gianm commented Oct 30, 2015

@himanshug @nishantmonu51 @drcrallen @guobingkun pushed updates for the outstanding comments, plus some unit tests for restoring realtime tasks.

log.info("Starting graceful shutdown of task[%s].", task.getId());

try {
task.stopGracefully();
Contributor

Can we emit a metric about this?

Contributor

It would be nice to have a way to keep track of how many graceful stops we do.

Contributor Author

Sure
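A hedged sketch of emitting such a metric via Druid's ServiceMetricEvent builder; the metric name task/graceful/count is an assumption for illustration, not necessarily what was actually added:

```java
// Illustrative only: count each graceful stop via the service emitter.
// The metric name is an assumption, not the metric actually added in this PR.
emitter.emit(ServiceMetricEvent.builder().build("task/graceful/count", 1));
```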

@fjy fjy closed this Nov 23, 2015
@fjy fjy reopened this Nov 23, 2015
gianm commented Nov 23, 2015

@drcrallen @himanshug updated the branch with changes from code review

@fjy fjy closed this Nov 24, 2015
@fjy fjy reopened this Nov 24, 2015
fjy commented Nov 24, 2015

We've been using this internally for a while; it seems to work.

@himanshug

👍 with the changes too

@fjy fjy closed this Nov 24, 2015
@fjy fjy reopened this Nov 24, 2015
fjy added a commit that referenced this pull request Nov 24, 2015
@fjy fjy merged commit 8e83d80 into apache:master Nov 24, 2015
xvrl commented Nov 24, 2015

It would be great to add some integration tests for this feature.

@gianm gianm modified the milestones: 0.8.3, 0.9.0 Dec 1, 2015
@gianm gianm deleted the restartable-tasks branch February 25, 2016 16:50