From 91686ef051dbf4041f2f152fa6b190ecafc733ac Mon Sep 17 00:00:00 2001 From: Brice Laurencin Date: Wed, 19 Feb 2020 12:16:59 +0100 Subject: [PATCH] New feature: can pause specific jobs upon startup --- .../com/criteo/cuttle/timeseries/CuttleProject.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index e043222d8..846769410 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -31,6 +31,7 @@ class CuttleProject private[cuttle] (val name: String, * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. * @param logsRetention If specified, automatically clean the execution logs older than the given duration. * @param maxVersionsHistory If specified keep only the version information for the x latest versions. + * @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`. */ def start( platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms, @@ -40,7 +41,8 @@ class CuttleProject private[cuttle] (val name: String, paused: Boolean = false, stateRetention: Option[Duration] = None, logsRetention: Option[Duration] = None, - maxVersionsHistory: Option[Int] = None + maxVersionsHistory: Option[Int] = None, + jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty ): Unit = { val (routes, startScheduler) = build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention, maxVersionsHistory) @@ -66,6 +68,7 @@ class CuttleProject private[cuttle] (val name: String, * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. * @param logsRetention If specified, automatically clean the execution logs older than the given duration. * @param maxVersionsHistory If specified keep only the version information for the x latest versions. + * @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`. * * @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler */ @@ -76,7 +79,8 @@ class CuttleProject private[cuttle] (val name: String, paused: Boolean = false, stateRetention: Option[Duration] = None, logsRetention: Option[Duration] = None, - maxVersionsHistory: Option[Int] = None + maxVersionsHistory: Option[Int] = None, + jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty ): (Service, () => Unit) = { val xa = CuttleDatabase.connect(databaseConfig)(logger) val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy) @@ -86,6 +90,10 @@ class CuttleProject private[cuttle] (val name: String, if (paused) { logger.info("Pausing workflow") scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup")) + } else { + val toPause = jobs.all.filter(_ == jobsToBePausedOnStartup) + logger.info(s"Pausing ${toPause.map(j => s"${j.name} (${j.id})").mkString(" ")}") + scheduler.pauseJobs(toPause, executor, xa)(Auth.User("Startup")) } logger.info("Start workflow") scheduler.start(jobs, executor, xa, logger)