
Commit 501dcb4

Some changes that make it possible to restart tasks on the same hardware.

This is done by killing and respawning the JVMs rather than reconnecting to existing
JVMs, for a couple of reasons. One is that it also lets you restore tasks after server
reboots, and another is that it lets you upgrade all the software on a box at once by
simply restarting everything.

The main changes are:

1) Add "canRestore" and "stopGracefully" methods to Tasks that say if a task can
   stop gracefully, and actually do a graceful stop. RealtimeIndexTask is the only
   one that currently implements this.

2) Add "stop" method to TaskRunners that attempts to do an orderly shutdown.
   ThreadPoolTaskRunner- call stopGracefully on restorable tasks, wait for exit
   ForkingTaskRunner- close output stream to restorable tasks, wait for exit
   RemoteTaskRunner- do nothing special, we actually don't want to shutdown

3) Add "restore" method to TaskRunners that attempts to bootstrap tasks from last run.
   Only ForkingTaskRunner does anything here. It maintains a "restore.json" file with
   a list of restorable tasks.

4) Have the CliPeon's ExecutorLifecycle lock the task base directory so that a restored
   task and a zombie old task cannot stomp on each other.
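
To make the stop/restore interplay of (2) and (3) concrete, here is a rough,
hypothetical sketch of a runner's "stop" method. Only Task.canRestore(),
Task.stopGracefully(), and TaskConfig.getGracefulShutdownTimeout() come from this
change; runningTasks, saveRestorableTaskIds, and awaitTaskExits are illustrative
names, not the actual runner code.

// Hypothetical sketch of an orderly TaskRunner shutdown. Restorable tasks are
// asked to stop gracefully and given gracefulShutdownTimeout to exit; their ids
// are persisted (e.g. to restore.json) so a later restore() can respawn them.
public void stop()
{
  final List<String> restorableTaskIds = new ArrayList<>();
  for (final Task task : runningTasks) {
    if (task.canRestore()) {
      restorableTaskIds.add(task.getId());
      task.stopGracefully(); // ask the task's run() to exit promptly
    }
  }
  saveRestorableTaskIds(restorableTaskIds); // hypothetical: would write restore.json
  awaitTaskExits(taskConfig.getGracefulShutdownTimeout()); // hypothetical helper
}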
gianm committed Nov 23, 2015
1 parent 3656909 commit 501dcb4
Showing 26 changed files with 1,598 additions and 227 deletions.
2 changes: 2 additions & 0 deletions docs/content/configuration/indexing-service.md
@@ -279,6 +279,8 @@ Additional peon configs include:
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|/tmp/druid-indexing|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|50000|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.3.0|
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|

If `druid.indexer.runner.separateIngestionEndpoint` is set to true then following configurations are available for the ingestion server at peon:

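For example, a middleManager's runtime.properties could set these new peon timeouts
explicitly (the values shown here are just the defaults from the table above):

druid.indexer.task.gracefulShutdownTimeout=PT5M
druid.indexer.task.directoryLockTimeout=PT10M
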
TaskToolboxFactory.java
@@ -109,7 +109,7 @@ public TaskToolboxFactory(

public TaskToolbox build(Task task)
{
final File taskWorkDir = new File(new File(config.getBaseTaskDir(), task.getId()), "work");
final File taskWorkDir = config.getTaskWorkDir(task.getId());

return new TaskToolbox(
config,
TaskConfig.java
@@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.joda.time.Period;

import java.io.File;
import java.util.List;
@@ -30,6 +31,10 @@ public class TaskConfig
"org.apache.hadoop:hadoop-client:2.3.0"
);

private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");

private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");

@JsonProperty
private final String baseDir;

@@ -45,13 +50,21 @@ public class TaskConfig
@JsonProperty
private final List<String> defaultHadoopCoordinates;

@JsonProperty
private final Period gracefulShutdownTimeout;

@JsonProperty
private final Period directoryLockTimeout;

@JsonCreator
public TaskConfig(
@JsonProperty("baseDir") String baseDir,
@JsonProperty("baseTaskDir") String baseTaskDir,
@JsonProperty("hadoopWorkingPath") String hadoopWorkingPath,
@JsonProperty("defaultRowFlushBoundary") Integer defaultRowFlushBoundary,
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates
@JsonProperty("defaultHadoopCoordinates") List<String> defaultHadoopCoordinates,
@JsonProperty("gracefulShutdownTimeout") Period gracefulShutdownTimeout,
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout
)
{
this.baseDir = baseDir == null ? "/tmp" : baseDir;
@@ -61,6 +74,12 @@ public TaskConfig(
this.defaultHadoopCoordinates = defaultHadoopCoordinates == null
? DEFAULT_DEFAULT_HADOOP_COORDINATES
: defaultHadoopCoordinates;
this.gracefulShutdownTimeout = gracefulShutdownTimeout == null
? DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT
: gracefulShutdownTimeout;
this.directoryLockTimeout = directoryLockTimeout == null
? DEFAULT_DIRECTORY_LOCK_TIMEOUT
: directoryLockTimeout;
}

@JsonProperty
@@ -75,6 +94,21 @@ public File getBaseTaskDir()
return baseTaskDir;
}

public File getTaskDir(String taskId)
{
return new File(baseTaskDir, taskId);
}

public File getTaskWorkDir(String taskId)
{
return new File(getTaskDir(taskId), "work");
}

public File getTaskLockFile(String taskId)
{
return new File(getTaskDir(taskId), "lock");
}

@JsonProperty
public String getHadoopWorkingPath()
{
@@ -93,6 +127,18 @@ public List<String> getDefaultHadoopCoordinates()
return defaultHadoopCoordinates;
}

@JsonProperty
public Period getGracefulShutdownTimeout()
{
return gracefulShutdownTimeout;
}

@JsonProperty
public Period getDirectoryLockTimeout()
{
return directoryLockTimeout;
}

private String defaultDir(String configParameter, final String defaultVal)
{
if (configParameter == null) {
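Change (4) builds on the new getTaskLockFile() helper above. As a minimal sketch, the
task directory locking could be done with java.nio file locks along these lines (the
actual ExecutorLifecycle changes are not shown in this excerpt, and lockTaskDirectory
is a hypothetical name):

// Hypothetical sketch: take an exclusive lock on the task's lock file so a
// restored task and a zombie old task cannot stomp on each other. Retries
// until directoryLockTimeout elapses.
private FileLock lockTaskDirectory(TaskConfig config, String taskId) throws Exception
{
  final File lockFile = config.getTaskLockFile(taskId);
  lockFile.getParentFile().mkdirs();
  final FileChannel channel = FileChannel.open(
      lockFile.toPath(),
      StandardOpenOption.CREATE,
      StandardOpenOption.WRITE
  );
  final long giveUpAt = System.currentTimeMillis()
                        + config.getDirectoryLockTimeout().toStandardDuration().getMillis();
  while (System.currentTimeMillis() < giveUpAt) {
    final FileLock lock = channel.tryLock(); // null while another process holds it
    if (lock != null) {
      return lock;
    }
    Thread.sleep(1000);
  }
  throw new IllegalStateException("Could not lock " + lockFile + " within directoryLockTimeout");
}
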
AbstractTask.java
@@ -128,6 +128,19 @@ public String getClasspathPrefix()
return null;
}

@Override
public boolean canRestore()
{
return false;
}

@Override
public void stopGracefully()
{
// Should not be called when canRestore = false.
throw new UnsupportedOperationException("Cannot stop gracefully");
}

@Override
public String toString()
{
RealtimeIndexTask.java
@@ -29,6 +29,7 @@
import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
@@ -48,6 +49,9 @@
import io.druid.segment.realtime.FireDepartment;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
@@ -62,6 +66,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class RealtimeIndexTask extends AbstractTask
{
@@ -104,6 +109,12 @@ private static String makeDatasource(FireDepartment fireDepartment)
@JsonIgnore
private volatile Plumber plumber = null;

@JsonIgnore
private volatile Firehose firehose = null;

@JsonIgnore
private volatile boolean stopped = false;

@JsonIgnore
private volatile QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate = null;

@@ -285,8 +296,6 @@ public String getVersion(final Interval interval)

this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, fireDepartment.getMetrics());

// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
Firehose firehose = null;
Supplier<Committer> committerSupplier = null;

try {
@@ -295,12 +304,14 @@ public String getVersion(final Interval interval)
// Set up metrics emission
toolbox.getMonitorScheduler().addMonitor(metricsMonitor);

// Set up firehose
firehose = spec.getIOConfig().getFirehoseFactory().connect(spec.getDataSchema().getParser());
// Delay firehose connection to avoid claiming input resources while the plumber is starting up.
final FirehoseFactory firehoseFactory = spec.getIOConfig().getFirehoseFactory();
final boolean firehoseDrainableByClosing = isFirehoseDrainableByClosing(firehoseFactory);
firehose = firehoseFactory.connect(spec.getDataSchema().getParser());
committerSupplier = Committers.supplierFromFirehose(firehose);

// Time to read data!
while (firehose.hasMore()) {
while ((!stopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;

try {
@@ -337,8 +348,38 @@ public String getVersion(final Interval interval)
finally {
if (normalExit) {
try {
plumber.persist(committerSupplier.get());
plumber.finishJob();
if (!stopped) {
// Hand off all pending data
log.info("Persisting and handing off pending data.");
plumber.persist(committerSupplier.get());
plumber.finishJob();
} else {
log.info("Persisting pending data without handoff, in preparation for restart.");
final Committer committer = committerSupplier.get();
final CountDownLatch persistLatch = new CountDownLatch(1);
plumber.persist(
new Committer()
{
@Override
public Object getMetadata()
{
return committer.getMetadata();
}

@Override
public void run()
{
try {
committer.run();
}
finally {
persistLatch.countDown();
}
}
}
);
persistLatch.await();
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to finish realtime task").emit();
@@ -352,15 +393,67 @@ public String getVersion(final Interval interval)
}
}

log.info("Job done!");
return TaskStatus.success(getId());
}

@Override
public boolean canRestore()
{
return true;
}

@Override
public void stopGracefully()
{
try {
synchronized (this) {
if (!stopped) {
stopped = true;
log.info("Gracefully stopping.");
if (isFirehoseDrainableByClosing(spec.getIOConfig().getFirehoseFactory())) {
firehose.close();
} else {
log.debug("Cannot drain firehose[%s] by closing, so skipping closing.", firehose);
}
}
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

/**
* Public for tests.
*/
@JsonIgnore
public Firehose getFirehose()
{
return firehose;
}

@JsonProperty("spec")
public FireDepartment getRealtimeIngestionSchema()
{
return spec;
}

/**
* Is a firehose from this factory drainable by closing it? If so, we should drain on stopGracefully rather than
* abruptly stopping.
* <p/>
* This is a hack to get around the fact that the Firehose and FirehoseFactory interfaces do not help us do this.
*/
private static boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
{
return firehoseFactory instanceof EventReceiverFirehoseFactory
|| (firehoseFactory instanceof TimedShutoffFirehoseFactory
&& isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).getDelegateFactory()))
|| (firehoseFactory instanceof ClippedFirehoseFactory
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
}

public static class TaskActionSegmentPublisher implements SegmentPublisher
{
final Task task;
Task.java
@@ -62,26 +62,30 @@ public interface Task
{
/**
* Returns ID of this task. Must be unique across all tasks ever created.
*
* @return task ID
*/
public String getId();

/**
* Returns group ID of this task. Tasks with the same group ID can share locks. If tasks do not need to share locks,
* a common convention is to set group ID equal to task ID.
*
* @return task group ID
*/
public String getGroupId();

/**
* Returns a {@link io.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
* worker requirements a task may require.
*
* @return {@link io.druid.indexing.common.task.TaskResource} for this task
*/
public TaskResource getTaskResource();

/**
* Returns a descriptive label for this task type. Used for metrics emission and logging.
*
* @return task type label
*/
public String getType();
@@ -90,7 +94,7 @@ public interface Task
* Get the nodeType for if/when this task publishes on zookeeper.
*
* @return the nodeType to use when publishing the server to zookeeper. null if the task doesn't expect to
* publish to zookeeper.
* publish to zookeeper.
*/
public String getNodeType();

@@ -102,7 +106,9 @@ public interface Task
/**
* Returns query runners for this task. If this task is not meant to answer queries over its datasource, this method
* should return null.
*
* @param <T> query result type
*
* @return query runners for this task
*/
public <T> QueryRunner<T> getQueryRunner(Query<T> query);
@@ -117,7 +123,7 @@ public interface Task
* Execute preflight actions for a task. This can be used to acquire locks, check preconditions, and so on. The
* actions must be idempotent, since this method may be executed multiple times. This typically runs on the
* coordinator. If this method throws an exception, the task should be considered a failure.
*
* <p/>
* This method must be idempotent, as it may be run multiple times per task.
*
* @param taskActionClient action client for this task (not the full toolbox)
@@ -128,6 +134,20 @@ public interface Task
*/
public boolean isReady(TaskActionClient taskActionClient) throws Exception;

/**
* Returns whether or not this task can restore its progress from its on-disk working directory. Restorable tasks
* may be started with a non-empty working directory. Tasks that exit uncleanly may still have a chance to attempt
* restores, meaning that restorable tasks should be able to deal with potentially partially written on-disk state.
*/
public boolean canRestore();

/**
* Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
* {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
* extreme prejudice.
*/
public void stopGracefully();

/**
* Execute a task. This typically runs on a worker as determined by a TaskRunner, and will be run while
* holding a lock on our dataSource and implicit lock interval (if any). If this method throws an exception, the task
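For contrast with the AbstractTask defaults above, a restorable task would follow a
pattern like this hypothetical sketch (restoreFromDiskIfPresent, hasMoreWork,
doSomeWork, and checkpointToDisk are illustrative helpers, not Druid APIs):

// Hypothetical sketch of a task honoring the canRestore()/stopGracefully()
// contract: run() exits promptly once stopGracefully() is called, and it
// tolerates partially written on-disk state when restarted.
private volatile boolean stopping = false;

@Override
public boolean canRestore()
{
  return true;
}

@Override
public void stopGracefully()
{
  stopping = true;
}

@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
  restoreFromDiskIfPresent(toolbox); // must tolerate partial state
  while (!stopping && hasMoreWork()) {
    doSomeWork();
    checkpointToDisk(toolbox); // enables a later restore()
  }
  return TaskStatus.success(getId());
}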
