diff --git a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala index e043222d8..250b9c9a3 100644 --- a/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala +++ b/timeseries/src/main/scala/com/criteo/cuttle/timeseries/CuttleProject.scala @@ -31,6 +31,7 @@ class CuttleProject private[cuttle] (val name: String, * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. * @param logsRetention If specified, automatically clean the execution logs older than the given duration. * @param maxVersionsHistory If specified keep only the version information for the x latest versions. + * @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`. */ def start( platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms, @@ -40,7 +41,8 @@ class CuttleProject private[cuttle] (val name: String, paused: Boolean = false, stateRetention: Option[Duration] = None, logsRetention: Option[Duration] = None, - maxVersionsHistory: Option[Int] = None + maxVersionsHistory: Option[Int] = None, + jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty ): Unit = { val (routes, startScheduler) = build(platforms, databaseConfig, retryStrategy, paused, stateRetention, logsRetention, maxVersionsHistory) @@ -66,6 +68,7 @@ class CuttleProject private[cuttle] (val name: String, * @param stateRetention If specified, automatically clean the timeseries state older than the given duration. * @param logsRetention If specified, automatically clean the execution logs older than the given duration. * @param maxVersionsHistory If specified keep only the version information for the x latest versions. + * @param jobsToBePausedOnStartup Automatically pause those jobs at startup. Ignored if `paused == true`. * * @return a tuple with cuttleRoutes (needed to start a server) and a function to start the scheduler */ @@ -76,7 +79,8 @@ class CuttleProject private[cuttle] (val name: String, paused: Boolean = false, stateRetention: Option[Duration] = None, logsRetention: Option[Duration] = None, - maxVersionsHistory: Option[Int] = None + maxVersionsHistory: Option[Int] = None, + jobsToBePausedOnStartup: Set[Job[TimeSeries]] = Set.empty ): (Service, () => Unit) = { val xa = CuttleDatabase.connect(databaseConfig)(logger) val executor = new Executor[TimeSeries](platforms, xa, logger, name, version, logsRetention)(retryStrategy) @@ -86,6 +90,12 @@ class CuttleProject private[cuttle] (val name: String, if (paused) { logger.info("Pausing workflow") scheduler.pauseJobs(jobs.all, executor, xa)(Auth.User("Startup")) + } else { + val toPause = jobs.all.intersect(jobsToBePausedOnStartup) + if (toPause.nonEmpty) { + logger.info(s"Pausing ${toPause.map(j => s"${j.name} (${j.id})").mkString(", ")}") + scheduler.pauseJobs(toPause, executor, xa)(Auth.User("Startup")) + } } logger.info("Start workflow") scheduler.start(jobs, executor, xa, logger)