From 1ed8a40fb131594fddb65f44beb8a2ef2eaa876c Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 13:41:18 +0100 Subject: [PATCH 01/12] wip: finer control over which worker does what --- app/models/job/Worker.scala | 8 ++++++-- conf/application.conf | 2 +- tools/postgres/schema.sql | 4 +++- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 1c240d7cecb..021043c6ca7 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -23,7 +23,9 @@ import scala.concurrent.duration._ case class Worker(_id: ObjectId, _dataStore: String, key: String, - maxParallelJobs: Int, + maxParallelHighPriorityJobs: Int, + maxParallelLowPriorityJobs: Int, + supportedJobCommands: Set[String], lastHeartBeat: Long = 0, created: Instant = Instant.now, isDeleted: Boolean = false) @@ -42,7 +44,9 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) ObjectId(r._Id), r._Datastore, r.key, - r.maxparalleljobs, + r.maxparallelhighpriorityjobs, + r.maxparallellowpriorityjobs, + parseArrayLiteral(r.supportedjobcommands).toSet, r.lastheartbeat.getTime, Instant.fromSql(r.created), r.isdeleted diff --git a/conf/application.conf b/conf/application.conf index b1cca532f4b..e9e35e1e7e0 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -138,7 +138,7 @@ features { taskReopenAllowedInSeconds = 30 allowDeleteDatasets = true # to enable jobs for local development, use "yarn enable-jobs" to also activate it in the database - jobsEnabled = false + jobsEnabled = true voxelyticsEnabled = false # For new users, the dashboard will show a banner which encourages the user to check out the following dataset. # If isWkorgInstance == true, `/createExplorative/hybrid/true` is appended to the URL so that a new tracing is opened. 
diff --git a/tools/postgres/schema.sql b/tools/postgres/schema.sql index 1b3af23ba51..adea58c34c2 100644 --- a/tools/postgres/schema.sql +++ b/tools/postgres/schema.sql @@ -445,7 +445,9 @@ CREATE TABLE webknossos.workers( _id CHAR(24) PRIMARY KEY, _dataStore VARCHAR(256) NOT NULL, key VARCHAR(1024) NOT NULL UNIQUE, - maxParallelJobs INT NOT NULL DEFAULT 1, + maxParallelHighPriorityJobs INT NOT NULL DEFAULT 1, + maxParallelLowPriorityJobs INT NOT NULL DEFAULT 1, + supportedJobCommands VARCHAR(256)[] NOT NULL, lastHeartBeat TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01T00:00:00Z', created TIMESTAMPTZ NOT NULL DEFAULT NOW(), isDeleted BOOLEAN NOT NULL DEFAULT false From 3705391a6c2653935dd56253a41460d636e8dff6 Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 13:50:37 +0100 Subject: [PATCH 02/12] compile --- app/controllers/WKRemoteWorkerController.scala | 4 ++-- app/models/job/JobCommand.scala | 2 ++ app/models/job/Worker.scala | 4 +++- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/app/controllers/WKRemoteWorkerController.scala b/app/controllers/WKRemoteWorkerController.scala index ceb0d35f066..6c296130b36 100644 --- a/app/controllers/WKRemoteWorkerController.scala +++ b/app/controllers/WKRemoteWorkerController.scala @@ -33,8 +33,8 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, private def reserveNextJobs(worker: Worker): Fox[Unit] = for { unfinishedCount <- jobDAO.countUnfinishedByWorker(worker._id) - pendingCount <- jobDAO.countUnassignedPendingForDataStore(worker._dataStore) - _ <- if (unfinishedCount >= worker.maxParallelJobs || pendingCount == 0) Fox.successful(()) + pendingCount <- jobDAO.countUnassignedPendingForDataStore(worker._dataStore) // TODO + _ <- if (unfinishedCount >= worker.maxParallelLowPriorityJobs || pendingCount == 0) Fox.successful(()) else { jobDAO.reserveNextJob(worker).flatMap { _ => reserveNextJobs(worker) diff --git a/app/models/job/JobCommand.scala 
b/app/models/job/JobCommand.scala index dcfe1c594c3..e49db905912 100644 --- a/app/models/job/JobCommand.scala +++ b/app/models/job/JobCommand.scala @@ -7,4 +7,6 @@ object JobCommand extends ExtendedEnumeration { val compute_mesh_file, convert_to_wkw, export_tiff, find_largest_segment_id, globalize_floodfills, infer_nuclei, infer_neurons, materialize_volume_annotation, render_animation = Value + + val highPriorityJobs: Seq[Value] = Seq(convert_to_wkw, export_tiff) } diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 021043c6ca7..20600cc0ad8 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -81,7 +81,9 @@ class WorkerService @Inject()(conf: WkConf, dataStoreDAO: DataStoreDAO, workerDA def publicWrites(worker: Worker): JsObject = Json.obj( "id" -> worker._id.id, - "maxParallelJobs" -> worker.maxParallelJobs, + "maxParallelHighPriorityJobs" -> worker.maxParallelHighPriorityJobs, + "maxParallelLowPriorityJobs" -> worker.maxParallelLowPriorityJobs, + "supportedJobCommands" -> worker.supportedJobCommands, "created" -> worker.created, "lastHeartBeat" -> worker.lastHeartBeat, "lastHeartBeatIsRecent" -> lastHeartBeatIsRecent(worker) From 1936f239213e16c98a6ce9fda6b7f3fdf3b66358 Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 14:17:46 +0100 Subject: [PATCH 03/12] implement priority + supported job commands logic --- .../WKRemoteWorkerController.scala | 27 +++++++++++-- app/models/job/Job.scala | 38 ++++++++++--------- app/models/job/JobCommand.scala | 3 +- app/models/job/Worker.scala | 12 ++++-- 4 files changed, 54 insertions(+), 26 deletions(-) diff --git a/app/controllers/WKRemoteWorkerController.scala b/app/controllers/WKRemoteWorkerController.scala index 6c296130b36..6ecfe1cb492 100644 --- a/app/controllers/WKRemoteWorkerController.scala +++ b/app/controllers/WKRemoteWorkerController.scala @@ -2,6 +2,8 @@ package controllers import com.scalableminds.util.accesscontext.GlobalAccessContext import 
com.scalableminds.util.tools.Fox +import models.job.JobCommand.JobCommand + import javax.inject.Inject import models.job._ import play.api.libs.json.Json @@ -32,16 +34,33 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, private def reserveNextJobs(worker: Worker): Fox[Unit] = for { - unfinishedCount <- jobDAO.countUnfinishedByWorker(worker._id) - pendingCount <- jobDAO.countUnassignedPendingForDataStore(worker._dataStore) // TODO - _ <- if (unfinishedCount >= worker.maxParallelLowPriorityJobs || pendingCount == 0) Fox.successful(()) + unfinishedHighPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.highPriorityJobs) + unfinishedLowPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.lowPriorityJobs) + pendingHighPriorityCount <- jobDAO.countUnassignedPendingForDataStore( + worker._dataStore, + JobCommand.highPriorityJobs.intersect(worker.supportedJobCommands)) + pendingLowPriorityCount <- jobDAO.countUnassignedPendingForDataStore( + worker._dataStore, + JobCommand.lowPriorityJobs.intersect(worker.supportedJobCommands)) + mayAssignHighPriorityJob = unfinishedHighPriorityCount < worker.maxParallelHighPriorityJobs && pendingHighPriorityCount > 0 + mayAssignLowPriorityJob = unfinishedLowPriorityCount < worker.maxParallelLowPriorityJobs && pendingLowPriorityCount > 0 + currentlyAssignableJobCommands = assignableJobCommands(mayAssignHighPriorityJob, mayAssignLowPriorityJob) + _ <- if ((unfinishedHighPriorityCount >= worker.maxParallelHighPriorityJobs && unfinishedLowPriorityCount >= worker.maxParallelLowPriorityJobs) || (pendingLowPriorityCount == 0 && pendingHighPriorityCount == 0)) + Fox.successful(()) else { - jobDAO.reserveNextJob(worker).flatMap { _ => + jobDAO.reserveNextJob(worker, currentlyAssignableJobCommands).flatMap { _ => reserveNextJobs(worker) } } } yield () + private def assignableJobCommands(mayAssignHighPriorityJob: Boolean, + mayAssignLowPriorityJob: Boolean): Set[JobCommand] = { 
+ val lowPriorityOrEmpty = if (mayAssignLowPriorityJob) JobCommand.lowPriorityJobs else Set() + val highPriorityOrEmpty = if (mayAssignHighPriorityJob) JobCommand.highPriorityJobs else Set() + lowPriorityOrEmpty ++ highPriorityOrEmpty + } + def updateJobStatus(key: String, id: String): Action[JobStatus] = Action.async(validateJson[JobStatus]) { implicit request => for { diff --git a/app/models/job/Job.scala b/app/models/job/Job.scala index bdf6591accb..8180ebe194a 100644 --- a/app/models/job/Job.scala +++ b/app/models/job/Job.scala @@ -142,22 +142,24 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) parsed <- parseFirst(r, jobId) } yield parsed - def countUnassignedPendingForDataStore(dataStoreName: String): Fox[Int] = + def countUnassignedPendingForDataStore(dataStoreName: String, jobCommands: Set[JobCommand]): Fox[Int] = for { r <- run(q"""select count(_id) from $existingCollectionName where state = ${JobState.PENDING} + AND command IN ${SqlToken.tupleFromList(jobCommands)} and manualState is null and _dataStore = $dataStoreName and _worker is null""".as[Int]) head <- r.headOption } yield head - def countUnfinishedByWorker(workerId: ObjectId): Fox[Int] = + def countUnfinishedByWorker(workerId: ObjectId, jobCommands: Set[JobCommand]): Fox[Int] = for { r <- run(q"""SELECT COUNT(_id) FROM $existingCollectionName WHERE _worker = $workerId - AND state in ${SqlToken.tupleFromValues(JobState.PENDING, JobState.STARTED)} + AND state IN ${SqlToken.tupleFromValues(JobState.PENDING, JobState.STARTED)} + AND command IN ${SqlToken.tupleFromList(jobCommands)} AND manualState IS NULL""".as[Int]) head <- r.headOption } yield head @@ -219,25 +221,27 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) where _id = $jobId""".asUpdate) } yield () - def reserveNextJob(worker: Worker): Fox[Unit] = { + def reserveNextJob(worker: Worker, jobCommands: Set[JobCommand]): Fox[Unit] = { val query = q""" - with subquery as ( - select _id - 
from $existingCollectionName - where + WITH subquery AS ( + SELECT _id + FROM $existingCollectionName + WHERE state = ${JobState.PENDING} - and _dataStore = ${worker._dataStore} - and manualState is NULL - and _worker is NULL - order by created - limit 1 + AND _dataStore = ${worker._dataStore} + AND manualState IS NULL + AND _worker IS NULL + AND command IN ${SqlToken.tupleFromList(jobCommands)} + ORDER BY created + LIMIT 1 ) - update webknossos.jobs_ j - set _worker = ${worker._id} - from subquery - where j._id = subquery._id + UPDATE webknossos.jobs_ j + SET _worker = ${worker._id} + FROM subquery + WHERE j._id = subquery._id """.asUpdate for { + _ <- Fox.successful(logger.info("reserve next job")) _ <- run( query.withTransactionIsolation(Serializable), retryCount = 50, diff --git a/app/models/job/JobCommand.scala b/app/models/job/JobCommand.scala index e49db905912..278fc4f9a40 100644 --- a/app/models/job/JobCommand.scala +++ b/app/models/job/JobCommand.scala @@ -8,5 +8,6 @@ object JobCommand extends ExtendedEnumeration { val compute_mesh_file, convert_to_wkw, export_tiff, find_largest_segment_id, globalize_floodfills, infer_nuclei, infer_neurons, materialize_volume_annotation, render_animation = Value - val highPriorityJobs: Seq[Value] = Seq(convert_to_wkw, export_tiff) + val highPriorityJobs: Set[Value] = Set(convert_to_wkw, export_tiff) + val lowPriorityJobs: Set[Value] = values.diff(highPriorityJobs) } diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 20600cc0ad8..ed626fc7cbe 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -9,6 +9,7 @@ import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler import com.scalableminds.webknossos.schema.Tables._ import com.typesafe.scalalogging.LazyLogging import models.dataset.DataStoreDAO +import models.job.JobCommand.JobCommand import play.api.inject.ApplicationLifecycle import play.api.libs.json.{JsObject, Json} import slick.lifted.Rep @@ -25,7 +26,7 @@ 
case class Worker(_id: ObjectId, key: String, maxParallelHighPriorityJobs: Int, maxParallelLowPriorityJobs: Int, - supportedJobCommands: Set[String], + supportedJobCommands: Set[JobCommand], lastHeartBeat: Long = 0, created: Instant = Instant.now, isDeleted: Boolean = false) @@ -39,19 +40,22 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) protected def isDeletedColumn(x: Workers): Rep[Boolean] = x.isdeleted protected def parse(r: WorkersRow): Fox[Worker] = - Fox.successful( + for { + supportedJobCommands <- Fox.serialCombined(parseArrayLiteral(r.supportedjobcommands)) { s => + JobCommand.fromString(s).toFox + } + } yield Worker( ObjectId(r._Id), r._Datastore, r.key, r.maxparallelhighpriorityjobs, r.maxparallellowpriorityjobs, - parseArrayLiteral(r.supportedjobcommands).toSet, + supportedJobCommands.toSet, r.lastheartbeat.getTime, Instant.fromSql(r.created), r.isdeleted ) - ) def findOneByKey(key: String): Fox[Worker] = for { From e271af7c5518fc8c4e2928985d6ed8a6bd73944e Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 15:18:17 +0100 Subject: [PATCH 04/12] evolution --- app/models/job/Job.scala | 47 +++++++++++-------- app/models/job/Worker.scala | 2 +- conf/evolutions/110-worker-config.sql | 22 +++++++++ .../reversions/110-worker-config.sql | 19 ++++++++ tools/postgres/schema.sql | 4 +- 5 files changed, 72 insertions(+), 22 deletions(-) create mode 100644 conf/evolutions/110-worker-config.sql create mode 100644 conf/evolutions/reversions/110-worker-config.sql diff --git a/app/models/job/Job.scala b/app/models/job/Job.scala index 8180ebe194a..8bbe0fa0f6e 100644 --- a/app/models/job/Job.scala +++ b/app/models/job/Job.scala @@ -143,26 +143,32 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) } yield parsed def countUnassignedPendingForDataStore(dataStoreName: String, jobCommands: Set[JobCommand]): Fox[Int] = - for { - r <- run(q"""select count(_id) from $existingCollectionName + if 
(jobCommands.isEmpty) Fox.successful(0) + else { + for { + r <- run(q"""select count(_id) from $existingCollectionName where state = ${JobState.PENDING} AND command IN ${SqlToken.tupleFromList(jobCommands)} and manualState is null and _dataStore = $dataStoreName and _worker is null""".as[Int]) - head <- r.headOption - } yield head + head <- r.headOption + } yield head + } def countUnfinishedByWorker(workerId: ObjectId, jobCommands: Set[JobCommand]): Fox[Int] = - for { - r <- run(q"""SELECT COUNT(_id) + if (jobCommands.isEmpty) Fox.successful(0) + else { + for { + r <- run(q"""SELECT COUNT(_id) FROM $existingCollectionName WHERE _worker = $workerId AND state IN ${SqlToken.tupleFromValues(JobState.PENDING, JobState.STARTED)} AND command IN ${SqlToken.tupleFromList(jobCommands)} AND manualState IS NULL""".as[Int]) - head <- r.headOption - } yield head + head <- r.headOption + } yield head + } def findAllUnfinishedByWorker(workerId: ObjectId): Fox[List[Job]] = for { @@ -221,8 +227,11 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) where _id = $jobId""".asUpdate) } yield () - def reserveNextJob(worker: Worker, jobCommands: Set[JobCommand]): Fox[Unit] = { - val query = q""" + def reserveNextJob(worker: Worker, jobCommands: Set[JobCommand]): Fox[Unit] = + if (jobCommands.isEmpty) Fox.successful() + else { + val query = + q""" WITH subquery AS ( SELECT _id FROM $existingCollectionName @@ -240,15 +249,15 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) FROM subquery WHERE j._id = subquery._id """.asUpdate - for { - _ <- Fox.successful(logger.info("reserve next job")) - _ <- run( - query.withTransactionIsolation(Serializable), - retryCount = 50, - retryIfErrorContains = List(transactionSerializationError) - ) - } yield () - } + for { + _ <- Fox.successful(logger.info("reserve next job")) + _ <- run( + query.withTransactionIsolation(Serializable), + retryCount = 50, + retryIfErrorContains = 
List(transactionSerializationError) + ) + } yield () + } def countByState: Fox[Map[String, Int]] = for { diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index ed626fc7cbe..09a4a313c4a 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -42,7 +42,7 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) protected def parse(r: WorkersRow): Fox[Worker] = for { supportedJobCommands <- Fox.serialCombined(parseArrayLiteral(r.supportedjobcommands)) { s => - JobCommand.fromString(s).toFox + JobCommand.fromString(s).toFox ?~> f"$s is not a valid job command" } } yield Worker( diff --git a/conf/evolutions/110-worker-config.sql b/conf/evolutions/110-worker-config.sql new file mode 100644 index 00000000000..560c936dc83 --- /dev/null +++ b/conf/evolutions/110-worker-config.sql @@ -0,0 +1,22 @@ +START TRANSACTION; + +do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 109, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql; + +DROP VIEW webknossos.workers_; + +ALTER TABLE webknossos.workers ADD COLUMN maxParallelHighPriorityJobs INT NOT NULL DEFAULT 1; +ALTER TABLE webknossos.workers ADD COLUMN maxParallelLowPriorityJobs INT NOT NULL DEFAULT 1; +ALTER TABLE webknossos.workers ADD COLUMN supportedJobCommands VARCHAR(256)[] NOT NULL DEFAULT array[]::varchar(256)[]; + +UPDATE webknossos.workers set maxParallelHighPriorityJobs = maxParallelJobs; +UPDATE webknossos.workers set maxParallelLowPriorityJobs = maxParallelJobs; + +ALTER TABLE webknossos.workers DROP COLUMN maxParallelJobs; + +UPDATE + +CREATE VIEW webknossos.workers_ as SELECT * FROM webknossos.workers WHERE NOT isDeleted; + +UPDATE webknossos.releaseInformation SET schemaVersion = 110; + +COMMIT TRANSACTION; diff --git a/conf/evolutions/reversions/110-worker-config.sql b/conf/evolutions/reversions/110-worker-config.sql new file mode 100644 index 00000000000..85b46f48e8c --- /dev/null +++ 
b/conf/evolutions/reversions/110-worker-config.sql @@ -0,0 +1,19 @@ +START TRANSACTION; + +do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 110, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql; + +DROP VIEW webknossos.workers_; + +ALTER TABLE webknossos.workers ADD COLUMN maxParallelJobs INT NOT NULL DEFAULT 1; + +UPDATE webknossos.workers set maxParallelJobs = maxParallelHighPriorityJobs; + +ALTER TABLE webknossos.workers DROP COLUMN maxParallelHighPriorityJobs; +ALTER TABLE webknossos.workers DROP COLUMN maxParallelLowPriorityJobs; +ALTER TABLE webknossos.workers DROP COLUMN supportedJobCommands; + +CREATE VIEW webknossos.workers_ as SELECT * FROM webknossos.workers WHERE NOT isDeleted; + +UPDATE webknossos.releaseInformation SET schemaVersion = 109; + +COMMIT TRANSACTION; diff --git a/tools/postgres/schema.sql b/tools/postgres/schema.sql index adea58c34c2..4afa2f60206 100644 --- a/tools/postgres/schema.sql +++ b/tools/postgres/schema.sql @@ -20,7 +20,7 @@ CREATE TABLE webknossos.releaseInformation ( schemaVersion BIGINT NOT NULL ); -INSERT INTO webknossos.releaseInformation(schemaVersion) values(109); +INSERT INTO webknossos.releaseInformation(schemaVersion) values(110); COMMIT TRANSACTION; @@ -447,7 +447,7 @@ CREATE TABLE webknossos.workers( key VARCHAR(1024) NOT NULL UNIQUE, maxParallelHighPriorityJobs INT NOT NULL DEFAULT 1, maxParallelLowPriorityJobs INT NOT NULL DEFAULT 1, - supportedJobCommands VARCHAR(256)[] NOT NULL, + supportedJobCommands VARCHAR(256)[] NOT NULL DEFAULT array[]::varchar(256)[], lastHeartBeat TIMESTAMPTZ NOT NULL DEFAULT '2000-01-01T00:00:00Z', created TIMESTAMPTZ NOT NULL DEFAULT NOW(), isDeleted BOOLEAN NOT NULL DEFAULT false From c6cf6b47a51b8383aaaa35c91bd8e417150e877c Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 15:24:49 +0100 Subject: [PATCH 05/12] prepare isWorkerAvailableFor route --- app/controllers/JobsController.scala | 5 +++++ conf/webknossos.latest.routes | 1 + 2
files changed, 6 insertions(+) diff --git a/app/controllers/JobsController.scala b/app/controllers/JobsController.scala index efa2c1aa3de..d616e2e9b77 100644 --- a/app/controllers/JobsController.scala +++ b/app/controllers/JobsController.scala @@ -109,6 +109,11 @@ class JobsController @Inject()( } yield Ok(js) } + def isWorkerAvailableFor(command: String, dataStoreName: String) = sil.UserAwareAction.async { implicit request => + // TODO + Fox.successful(Ok) + } + // Note that the dataset has to be registered by reserveUpload via the datastore first. def runConvertToWkwJob(organizationName: String, dataSetName: String, scale: String): Action[AnyContent] = sil.SecuredAction.async { implicit request => diff --git a/conf/webknossos.latest.routes b/conf/webknossos.latest.routes index 4e71750b19a..66c4975e1aa 100644 --- a/conf/webknossos.latest.routes +++ b/conf/webknossos.latest.routes @@ -273,6 +273,7 @@ POST /jobs/run/inferNeurons/:organizationName/:dataSetName POST /jobs/run/materializeVolumeAnnotation/:organizationName/:dataSetName controllers.JobsController.runMaterializeVolumeAnnotationJob(organizationName: String, dataSetName: String, fallbackLayerName: String, annotationId: String, annotationType: String, newDatasetName: String, outputSegmentationLayerName: String, mergeSegments: Boolean, volumeLayerName: Option[String]) POST /jobs/run/findLargestSegmentId/:organizationName/:dataSetName controllers.JobsController.runFindLargestSegmentIdJob(organizationName: String, dataSetName: String, layerName: String) POST /jobs/run/renderAnimation/:organizationName/:dataSetName controllers.JobsController.runRenderAnimationJob(organizationName: String, dataSetName: String) +GET /jobs/isWorkerAvailableFor/:command controllers.JobsController.isWorkerAvailableFor(command: String, dataStoreName: String) GET /jobs/:id controllers.JobsController.get(id: String) PATCH /jobs/:id/cancel controllers.JobsController.cancel(id: String) POST /jobs/:id/status 
controllers.WKRemoteWorkerController.updateJobStatus(key: String, id: String) From 85a43aff6037c198a70e52c7c3407e1cb2f46403 Mon Sep 17 00:00:00 2001 From: Florian M Date: Tue, 28 Nov 2023 15:25:26 +0100 Subject: [PATCH 06/12] fix syntax error --- conf/evolutions/110-worker-config.sql | 2 -- 1 file changed, 2 deletions(-) diff --git a/conf/evolutions/110-worker-config.sql b/conf/evolutions/110-worker-config.sql index 560c936dc83..2ef3cfefdd5 100644 --- a/conf/evolutions/110-worker-config.sql +++ b/conf/evolutions/110-worker-config.sql @@ -13,8 +13,6 @@ UPDATE webknossos.workers set maxParallelLowPriorityJobs = maxParallelJobs; ALTER TABLE webknossos.workers DROP COLUMN maxParallelJobs; -UPDATE - CREATE VIEW webknossos.workers_ as SELECT * FROM webknossos.workers WHERE NOT isDeleted; UPDATE webknossos.releaseInformation SET schemaVersion = 110; From c2c307726d15863b18c1b5cdbf62644d1434acb6 Mon Sep 17 00:00:00 2001 From: Florian M Date: Thu, 30 Nov 2023 17:39:26 +0100 Subject: [PATCH 07/12] recursion breaker, refactoring --- app/controllers/WKRemoteWorkerController.scala | 9 +++++---- app/models/job/Job.scala | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/app/controllers/WKRemoteWorkerController.scala b/app/controllers/WKRemoteWorkerController.scala index 6ecfe1cb492..e09facdb466 100644 --- a/app/controllers/WKRemoteWorkerController.scala +++ b/app/controllers/WKRemoteWorkerController.scala @@ -21,7 +21,7 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, for { worker <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound" _ = workerDAO.updateHeartBeat(worker._id) - _ <- reserveNextJobs(worker) + _ <- reserveNextJobs(worker, pendingIterationCount = 10) assignedUnfinishedJobs: List[Job] <- jobDAO.findAllUnfinishedByWorker(worker._id) jobsToCancel: List[Job] <- jobDAO.findAllCancellingByWorker(worker._id) // make sure that the jobs to run have not already just been cancelled @@ -32,7 +32,7 @@ class 
WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, } yield Ok(Json.obj("to_run" -> assignedUnfinishedJs, "to_cancel" -> toCancelJs)) } - private def reserveNextJobs(worker: Worker): Fox[Unit] = + private def reserveNextJobs(worker: Worker, pendingIterationCount: Int): Fox[Unit] = for { unfinishedHighPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.highPriorityJobs) unfinishedLowPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.lowPriorityJobs) @@ -45,11 +45,12 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService, mayAssignHighPriorityJob = unfinishedHighPriorityCount < worker.maxParallelHighPriorityJobs && pendingHighPriorityCount > 0 mayAssignLowPriorityJob = unfinishedLowPriorityCount < worker.maxParallelLowPriorityJobs && pendingLowPriorityCount > 0 currentlyAssignableJobCommands = assignableJobCommands(mayAssignHighPriorityJob, mayAssignLowPriorityJob) - _ <- if ((unfinishedHighPriorityCount >= worker.maxParallelHighPriorityJobs && unfinishedLowPriorityCount >= worker.maxParallelLowPriorityJobs) || (pendingLowPriorityCount == 0 && pendingHighPriorityCount == 0)) + .intersect(worker.supportedJobCommands) + _ <- if ((!mayAssignHighPriorityJob && !mayAssignLowPriorityJob) || pendingIterationCount == 0) Fox.successful(()) else { jobDAO.reserveNextJob(worker, currentlyAssignableJobCommands).flatMap { _ => - reserveNextJobs(worker) + reserveNextJobs(worker, pendingIterationCount - 1) } } } yield () diff --git a/app/models/job/Job.scala b/app/models/job/Job.scala index 8bbe0fa0f6e..8543becc33b 100644 --- a/app/models/job/Job.scala +++ b/app/models/job/Job.scala @@ -228,7 +228,7 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) } yield () def reserveNextJob(worker: Worker, jobCommands: Set[JobCommand]): Fox[Unit] = - if (jobCommands.isEmpty) Fox.successful() + if (jobCommands.isEmpty) Fox.successful(()) else { val query = q""" From 
c14493372610da40872988b6bd52fb4090409cae Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 4 Dec 2023 10:23:13 +0100 Subject: [PATCH 08/12] add jobsSupportedByAvailableWorkers field to dataset json --- app/controllers/Application.scala | 3 ++- app/controllers/JobsController.scala | 5 ----- app/models/dataset/DatasetService.scala | 11 +++++------ app/models/job/Job.scala | 18 +++++++++++------- app/models/job/JobCommand.scala | 6 ++++++ app/models/job/Worker.scala | 6 +++--- conf/application.conf | 2 +- conf/webknossos.latest.routes | 1 - tools/postgres/dbtool.js | 2 +- 9 files changed, 29 insertions(+), 25 deletions(-) diff --git a/app/controllers/Application.scala b/app/controllers/Application.scala index 316526ce3a7..63b74e2ada7 100755 --- a/app/controllers/Application.scala +++ b/app/controllers/Application.scala @@ -9,6 +9,7 @@ import mail.{DefaultMails, Send} import models.analytics.{AnalyticsService, FrontendAnalyticsEvent} import models.organization.OrganizationDAO import models.user.{MultiUserDAO, UserService} +import play.api.http.HeaderNames import play.api.libs.json.{JsObject, Json} import play.api.mvc.{Action, AnyContent, PlayBodyParsers} import security.WkEnv @@ -69,7 +70,7 @@ class Application @Inject()(multiUserDAO: MultiUserDAO, @ApiOperation(hidden = true, value = "") def features: Action[AnyContent] = sil.UserAwareAction { addNoCacheHeaderFallback( - Ok(conf.raw.underlying.getConfig("features").resolve.root.render(ConfigRenderOptions.concise()))) + Ok(conf.raw.underlying.getConfig("features").resolve.root.render(ConfigRenderOptions.concise())).as(jsonMimeType)) } @ApiOperation(value = "Health endpoint") diff --git a/app/controllers/JobsController.scala b/app/controllers/JobsController.scala index d616e2e9b77..efa2c1aa3de 100644 --- a/app/controllers/JobsController.scala +++ b/app/controllers/JobsController.scala @@ -109,11 +109,6 @@ class JobsController @Inject()( } yield Ok(js) } - def isWorkerAvailableFor(command: String, dataStoreName: 
String) = sil.UserAwareAction.async { implicit request => - // TODO - Fox.successful(Ok) - } - // Note that the dataset has to be registered by reserveUpload via the datastore first. def runConvertToWkwJob(organizationName: String, dataSetName: String, scale: String): Action[AnyContent] = sil.SecuredAction.async { implicit request => diff --git a/app/models/dataset/DatasetService.scala b/app/models/dataset/DatasetService.scala index d83067bba3e..f8551a77899 100644 --- a/app/models/dataset/DatasetService.scala +++ b/app/models/dataset/DatasetService.scala @@ -2,7 +2,6 @@ package models.dataset import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext} import com.scalableminds.util.time.Instant -import com.scalableminds.util.tools.JsonHelper.box2Option import com.scalableminds.util.tools.{Fox, FoxImplicits} import com.scalableminds.webknossos.datastore.models.datasource.inbox.{ UnusableDataSource, @@ -16,7 +15,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{ import com.scalableminds.webknossos.datastore.rpc.RPC import com.typesafe.scalalogging.LazyLogging import models.folder.FolderDAO -import models.job.WorkerDAO +import models.job.JobService import models.organization.{Organization, OrganizationDAO} import models.team._ import models.user.{User, UserService} @@ -34,9 +33,8 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, datasetLastUsedTimesDAO: DatasetLastUsedTimesDAO, datasetDataLayerDAO: DatasetLayerDAO, teamDAO: TeamDAO, - workerDAO: WorkerDAO, + jobService: JobService, folderDAO: FolderDAO, - additionalAxesDAO: DatasetLayerAdditionalAxesDAO, dataStoreService: DataStoreService, teamService: TeamService, thumbnailCachingService: ThumbnailCachingService, @@ -329,8 +327,8 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, lastUsedByUser <- lastUsedTimeFor(dataset._id, requestingUserOpt) ?~> "dataset.list.fetchLastUsedTimeFailed" dataStoreJs <- dataStoreService.publicWrites(dataStore) 
?~> "dataset.list.dataStoreWritesFailed" dataSource <- dataSourceFor(dataset, Some(organization)) ?~> "dataset.list.fetchDataSourceFailed" - worker <- workerDAO.findOneByDataStore(dataStore.name).futureBox - jobsEnabled = conf.Features.jobsEnabled && worker.nonEmpty + jobsSupportedByAvailableWorkers <- jobService.jobsSupportedByAvailableWorkers(dataStore.name) + jobsEnabled = conf.Features.jobsEnabled && jobsSupportedByAvailableWorkers.nonEmpty } yield { Json.obj( "name" -> dataset.name, @@ -351,6 +349,7 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO, "details" -> dataset.details, "isUnreported" -> Json.toJson(isUnreported(dataset)), "jobsEnabled" -> jobsEnabled, + "jobsSupportedByAvailableWorkers" -> Json.toJson(jobsSupportedByAvailableWorkers), "tags" -> dataset.tags, "folderId" -> dataset._folder, // included temporarily for compatibility with webknossos-libs, until a better versioning mechanism is implemented diff --git a/app/models/job/Job.scala b/app/models/job/Job.scala index 8543becc33b..3c1e498d106 100644 --- a/app/models/job/Job.scala +++ b/app/models/job/Job.scala @@ -436,21 +436,25 @@ class JobService @Inject()(wkConf: WkConf, "job_kwargs" -> job.commandArgs ) - def submitJob(command: JobCommand, commandArgs: JsObject, owner: User, dataStoreName: String)( - implicit ctx: DBAccessContext): Fox[Job] = + def submitJob(command: JobCommand, commandArgs: JsObject, owner: User, dataStoreName: String): Fox[Job] = for { _ <- bool2Fox(wkConf.Features.jobsEnabled) ?~> "job.disabled" - _ <- assertDataStoreHasWorkers(dataStoreName) ?~> "job.noWorkerForDatastore" + _ <- Fox.assertTrue(jobIsSupportedByAvailableWorkers(command, dataStoreName)) ?~> "job.noWorkerForDatastore" job = Job(ObjectId.generate, owner._id, dataStoreName, command, commandArgs) _ <- jobDAO.insertOne(job) _ = analyticsService.track(RunJobEvent(owner, command)) } yield job - private def assertDataStoreHasWorkers(dataStoreName: String)(implicit ctx: DBAccessContext): Fox[Unit] = + 
def jobsSupportedByAvailableWorkers(dataStoreName: String): Fox[Set[JobCommand]] = for { - _ <- dataStoreDAO.findOneByName(dataStoreName) - _ <- workerDAO.findOneByDataStore(dataStoreName) - } yield () + workers <- workerDAO.findAllByDataStore(dataStoreName) + jobs = if (workers.isEmpty) Set[JobCommand]() else workers.map(_.supportedJobCommands).reduce(_.union(_)) + } yield jobs + + private def jobIsSupportedByAvailableWorkers(command: JobCommand, dataStoreName: String): Fox[Boolean] = + for { + jobs <- jobsSupportedByAvailableWorkers(dataStoreName) + } yield jobs.contains(command) def assertBoundingBoxLimits(boundingBox: String, mag: Option[String]): Fox[Unit] = for { diff --git a/app/models/job/JobCommand.scala b/app/models/job/JobCommand.scala index 278fc4f9a40..dc0650095af 100644 --- a/app/models/job/JobCommand.scala +++ b/app/models/job/JobCommand.scala @@ -5,6 +5,12 @@ import com.scalableminds.util.enumeration.ExtendedEnumeration object JobCommand extends ExtendedEnumeration { type JobCommand = Value + /* NOTE: When adding a new job command here, do + * - Decide if it should be a highPriority job + * - Add it to the dbtool.js command enable-jobs so it is available during development + * - Add it to the migration guide (operators need to decide which workers should provide it) + */ + val compute_mesh_file, convert_to_wkw, export_tiff, find_largest_segment_id, globalize_floodfills, infer_nuclei, infer_neurons, materialize_volume_annotation, render_animation = Value diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 09a4a313c4a..33fbc589bff 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -63,11 +63,11 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) parsed <- parseFirst(r, "key") } yield parsed - def findOneByDataStore(dataStoreName: String): Fox[Worker] = + def findAllByDataStore(dataStoreName: String): Fox[List[Worker]] = for { r: Seq[WorkersRow] <- run( - q"select 
$columns from $existingCollectionName where _dataStore = $dataStoreName".as[WorkersRow]) - parsed <- parseFirst(r, "dataStoreName") + q"SELECT $columns FROM $existingCollectionName where _dataStore = $dataStoreName".as[WorkersRow]) + parsed <- parseAll(r) } yield parsed def updateHeartBeat(_id: ObjectId): Unit = { diff --git a/conf/application.conf b/conf/application.conf index e9e35e1e7e0..b1cca532f4b 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -138,7 +138,7 @@ features { taskReopenAllowedInSeconds = 30 allowDeleteDatasets = true # to enable jobs for local development, use "yarn enable-jobs" to also activate it in the database - jobsEnabled = true + jobsEnabled = false voxelyticsEnabled = false # For new users, the dashboard will show a banner which encourages the user to check out the following dataset. # If isWkorgInstance == true, `/createExplorative/hybrid/true` is appended to the URL so that a new tracing is opened. diff --git a/conf/webknossos.latest.routes b/conf/webknossos.latest.routes index 66c4975e1aa..4e71750b19a 100644 --- a/conf/webknossos.latest.routes +++ b/conf/webknossos.latest.routes @@ -273,7 +273,6 @@ POST /jobs/run/inferNeurons/:organizationName/:dataSetName POST /jobs/run/materializeVolumeAnnotation/:organizationName/:dataSetName controllers.JobsController.runMaterializeVolumeAnnotationJob(organizationName: String, dataSetName: String, fallbackLayerName: String, annotationId: String, annotationType: String, newDatasetName: String, outputSegmentationLayerName: String, mergeSegments: Boolean, volumeLayerName: Option[String]) POST /jobs/run/findLargestSegmentId/:organizationName/:dataSetName controllers.JobsController.runFindLargestSegmentIdJob(organizationName: String, dataSetName: String, layerName: String) POST /jobs/run/renderAnimation/:organizationName/:dataSetName controllers.JobsController.runRenderAnimationJob(organizationName: String, dataSetName: String) -GET /jobs/isWorkerAvailableFor/:command 
controllers.JobsController.isWorkerAvailableFor(command: String, dataStoreName: String) GET /jobs/:id controllers.JobsController.get(id: String) PATCH /jobs/:id/cancel controllers.JobsController.cancel(id: String) POST /jobs/:id/status controllers.WKRemoteWorkerController.updateJobStatus(key: String, id: String) diff --git a/tools/postgres/dbtool.js b/tools/postgres/dbtool.js index 2ca29afcf4f..1bb8dcedba0 100755 --- a/tools/postgres/dbtool.js +++ b/tools/postgres/dbtool.js @@ -388,7 +388,7 @@ program console.log("Enabling jobs in the local database by inserting a worker."); console.log( callPsql( - `INSERT INTO webknossos.workers(_id, _dataStore, key) VALUES('6194dc03040200b0027f28a1', 'localhost', 'secretWorkerKey') ON CONFLICT DO NOTHING;`, + `INSERT INTO webknossos.workers(_id, _dataStore, key, supportedJobCommands) VALUES('6194dc03040200b0027f28a1', 'localhost', 'secretWorkerKey', '{compute_mesh_file, convert_to_wkw, export_tiff, find_largest_segment_id, globalize_floodfills, infer_nuclei, infer_neurons, materialize_volume_annotation, render_animation}') ON CONFLICT DO NOTHING;`, ), ); console.log("✨✨ Done"); From 8dc8998b654614e1d3bf408d3be4471c873d1e4e Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 4 Dec 2023 10:36:21 +0100 Subject: [PATCH 09/12] remove unused stuff --- app/controllers/Application.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/app/controllers/Application.scala b/app/controllers/Application.scala index 63b74e2ada7..6c904f9a328 100755 --- a/app/controllers/Application.scala +++ b/app/controllers/Application.scala @@ -8,8 +8,7 @@ import io.swagger.annotations.{Api, ApiOperation, ApiResponse, ApiResponses} import mail.{DefaultMails, Send} import models.analytics.{AnalyticsService, FrontendAnalyticsEvent} import models.organization.OrganizationDAO -import models.user.{MultiUserDAO, UserService} -import play.api.http.HeaderNames +import models.user.UserService import play.api.libs.json.{JsObject, Json} import 
play.api.mvc.{Action, AnyContent, PlayBodyParsers} import security.WkEnv @@ -20,8 +19,7 @@ import javax.inject.Inject import scala.concurrent.ExecutionContext @Api -class Application @Inject()(multiUserDAO: MultiUserDAO, - actorSystem: ActorSystem, +class Application @Inject()(actorSystem: ActorSystem, analyticsService: AnalyticsService, userService: UserService, releaseInformationDAO: ReleaseInformationDAO, From 7e8ceffd342f4d727307a7fc7cee201956aa084f Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 4 Dec 2023 10:57:26 +0100 Subject: [PATCH 10/12] refresh snapshots --- .../backend-snapshot-tests/datasets.e2e.js.md | 2 ++ .../datasets.e2e.js.snap | Bin 3460 -> 3507 bytes 2 files changed, 2 insertions(+) diff --git a/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.md b/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.md index 01341f763ce..79d865b740a 100644 --- a/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.md +++ b/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.md @@ -266,6 +266,7 @@ Generated by [AVA](https://avajs.dev). isPublic: false, isUnreported: false, jobsEnabled: false, + jobsSupportedByAvailableWorkers: [], lastUsedByUser: 0, logoUrl: '/assets/images/mpi-logos.svg', name: 'confocal-multi_knossos', @@ -403,6 +404,7 @@ Generated by [AVA](https://avajs.dev). 
isPublic: false, isUnreported: false, jobsEnabled: false, + jobsSupportedByAvailableWorkers: [], lastUsedByUser: 0, logoUrl: '/assets/images/mpi-logos.svg', name: 'l4_sample', diff --git a/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.snap b/frontend/javascripts/test/snapshots/public-test/test-bundle/test/backend-snapshot-tests/datasets.e2e.js.snap index 0790b3ed276a9894d310358d98399dc9e32a019d..59571fe48cc3729e6a1b0b617832ef2d7bdb4a11 100644 GIT binary patch literal 3507 zcmV;k4NUSuRzVn`9dTX|`ljpaQ;N24Cpu z$sj6tr*b3SjIZNlgyA^F*WBryio-dMj!#a!^YlCwM?k&rZklcOmyne5Dl;(ip_~1G z|NsAQfB(n+|GU2pF~)di@JQx^?{p58#(oC%|ymtEqyKg<;4ZBY@ATEu541_ z{hWGs=p@FBEJH1svv5W`{vrdzY1W@J=X0jX;jqxjaj~P^++4>_YESOzwNm%$-mHluhj9V`V8fIoxd zU^u+82&@HZ{IX+#tr~aNKSP2XIElnesWC5DZEyMc-AT&?c*?AH<~&icf9q zXzK_Hk-!R(ak;3;5L0D%!=exoed=%C+_<*6zJ*iE;w{}}W%-1N&@6R?y`q{{9ha7= zrbVPME!yf6Wp6mp9tlVx^_(%AR_+rcLLew7Pqq32a(htdoGrA8$rYAXDd-c!GkmcV z1vPbcPkpOPba^~(m)mD|yK6+D-cd(02IR?J#T)7gLoP|q3dmD^fr#J>;_E6}qdftF<38_LukL{6o=5_ZYvI_7(#nGw5oOg|_w}ZT7P|YU289Yi7czv2cc)arTx>{q5SQyIn(c4+QrSk%@@- zhaq?jya?XOBzL?Fdb2fs1<7}yAd4|KxF(Ba8BMb2YA@HNeKQ3-UNrJ!IcTucpMUg6^jHg6)dxoEK^clWzcsicdsZ^fV9+U)M z@);pBAUAu%Ld3fy`FJN|N4fVbk>m(b89Ta1=D;P9Nc;GzDos1S-fp+YUHlC;y)&M$ z>7DVu?ZnP7XD2$tk$wI-1AX=ynC%0b!Jcf=0eRYI`7a^)7L?~O)|5l!<^=imkgNxH zf-O1v;bS4g$M`4&kAwGVxg!(Gwd69E14=+uuD;w+bXYb+un^nXgLYi0C%2@&W>1!C4~BPRerz{+T|SGA z`KOgbgKD+m)EB(GVrUvmY)XdcJi#k#QuQ~-(G$@!`3$a~%T-xZkAH4goy*~L*y~+= z&&hqw9_7lpDxK9z+tt-M91gdm)waV+$)ppBIj28vO27f;gPXwi{v=z!glsQB@+vq4tOFRU0SgC^ zY=sHgRzR`_Yytbg=fFCUWV6$38Dk)+1WjNScm%u-j?k?22@gh0>aDl`pzd3*)gyI; zd|2f+NUP|!1B+wPFNHg`+q72-N@3c?#Gu$Fh9ZsF56GIbBM^$XQ+8OjtQLoA(6o4i z!lEn%JCvRx$JxBP@7U~A4SFKI&7brgo2ylW1`SSYbJO5+Z*#0Hm~(w9i_NO!552V5 zCX0T>ICWC$iQ#6&@MVBdHzN@y`=pD)v5w1&j>? 
z6TvlLWdRxMSqZbtCP*Ft+rd*{4|usizc{jyMVIMK2>t=S0|SOIRxyOs(?6k}CP?Oj zMW7Wd2Wy7t>lsMv*#N;t@O$tV@CmJ_D4`y1C}SBQ9~6Od;2f&2r-asXDFlr`05^gy zLrFc3gnD*CvKPDp-U9D~!=#>STF*BS`~U_QGFA)b6q0(zC)Cphi44|)TfncuwnBY9 zm(ay4-5!~IVtXXI@8NNyCaLBnV!*#75=%Cy{cJ~JcR$OBNbR%4R=O85tql0OOEswX zYm>SzI9M24AsCCaWud)@#a0lDmBZ)?@;2p=KV4SuDmn$LKP`$`onNe3eOa+)^|E5k z>iddMr`0bNpMur@E7n$HC7RWKDqQR%74!6%M}x{4i8K zOh0t3bXz?Vf^lFxXd>clD*hz|9uNlWh&YFe?}gxg@Hp5*#JN=b8U$~FkHI%YoJYm{ zaK^I0P;k+3y*QtWFM*%|%mo4w51``Z5Ud0nzCYW@Fds=-XP+^RQwSHpMmdz zWrSW_K*d8KC z1jm7GBxA)R_2M!ru7tn^rhs`wJdBE4AqavsU?UL^r{Zl8JPLM#mxy=-6(5A)J@6m! zGZ8zfIDZswq@WzQN9n~bDxLws9MA&ViMWP}Z-QV0*bE*c;#w+x4uXB)@8ClsuA}0w zA^09-jApE0v|j9{;;|4^fl1&>BA!6SJ_weA>%i?qd?^(_2*JbP8L*#-FQekOA$S-3 z3mhZjiBxPWXRIF>4yw!b;z?9I1%jF2YOsumC)0=R0` zE@?gk&1azb3`z6xG#^j%@k#SpXg&+gXGxkbkLJsx`SLC}^MW(aA7>hvjbw~FW6-aM z&z?(@=S%O>d%I%tJ_BW# zc&<$wkC(jyk~QEr;016DRCdMUg}~k5S#TH(Gh+#FPWN^*dnF`m!EeF-u28)yIB_J2?>hq!T2SyveUTJ)BmBUrjVy|08AK;(m4p7yb4)F8zKGy-L8~bDj z$eFsTT?_}57cDJzJDsr3a?3aMrsZRSy62?j>&1F+tHD&CoyT z@YWv0ubzC%pW&EmeGg!2`}%v+zKi=_`_`Nf?R)wIf|E7ry52PD70pY|@nr3*CX;pT z`7l`UOp#_ykrw~XC~oV@yYP2Bm;&ac+pZd; zujjPw>elY<>W3`4UA1tTBd|vu3ouc!4PZBT2OI^%urr-zNVogUTnvd9Yyr=MZ$Uq7 zUR_`jxY3v{#yxX0BwN7&a12zUJI(6)7Ht#QP4~cA37aeneIY2ep*Luwrvn|eW%48? 
z8d>QfZGn(auJlT6G~l>35SAme$`=jlbNRZ0`daB!f`REny~)Rv6{#U9 z)Y&F=b?tNzS)>bXYMb0bWNp2JKw9j9l5 z1;A@c^Hq)V2rEKFG-TdzJp}8C$eMgkywen$6Ze|5Iq?fqbWYU1f+#fWzlxY?j!naz zIJyBm0}iHrl*S)bGGY?12J`6`uBHU}YDjJZJHb0tp7>qCaYzgn#%jPq`VcN543=LD z$vW^Xc$dl(A6*$wJ(xK=AS+)ErW|@F2zX9sK)u`T^i7DaZaUBm*34uWI` zr2LSi%Yf=$8c^F41A5|t(-z-;2AiPw2K1x}9C523lN*081AcHf*pqGr#Oq!Zq&7GH h4MOU^75{1>bti}_jor7qq)J!o{{d}ek_m`F008@tlT!cy literal 3460 zcmV-~4SVuIRzV_SH@M_*&mAt00000000B+ zn+tRl)fva{>?@mOH`xRNfjoJb3QDrO*(4jNLK2`*-VIdLL*wl3WLdh|2|E)YBGro2 z_9!YoD;DdcJ#BjGQR@Q)sg+`tKC~VyQlu3N&CT5Xp&(~IKXIv5E@;GijghM$alz;{S$YFAR~u;kR1u8yuS zFNanLjLSz&hPW!r9~F362&lhJmXiltW@fJ!iDj$^(MThr&|o)Z9Qw>J0PUbNLP-wZhsih66%$W*~l|xURwJ zYi#!j9-q(a@dlh;Z=Jw5x*BN4kksT?yrG^jLj_#M4vP*qEyj`A}`EK@*+tHGGii@J*`0vsh zUXlb^stI-QK|!kN=nPFzBvQ4sB1rO@B5}tmy~Qs^+C@Jfo`8-bhuXd!5hY2KNR~oz zWh4{{&Wi^5Na#vF(XxuU$tT+u)64Bt-NpCa7jh(c3uVs|y7hhWNK$_@$$U^s8WF|T zbR*&->j{pCCC9ArS^FLl8?5I>#Q)cb$PRnjB)+3FOk44^4uYiax6`T{1Xj+lA7|gT zYHy-O%}o@d4^e5Wik8Z9=fs+tgmz-1)9FmQ_`l_9 zonf){JU;c?Ti*7!*55nN2zqS|FJT!r&H%w%le-xv}o5X?S!< zdY|>V`3ApJsP#FW-T+@O_|x^MEEy}2w#6>mamD`JS_YdvIjTJv-uFA=d$m0d79aEZ z1x($`+139~bRB9^49#GPP00|OCrsS1-rpQYPsGYJWpRZbPjy`*{&_tO9+%tYZ1fC1 zCl5Az5*zgC1@T5-QxI$POfB8#`s#Qa7ft7KRl94Iwrgl`xm;dXy@#}ST1{^k;7XMl z!LkMR{~ElmTt^{e7l3(%WFpC?6Nx1ZNfg`+o&awE;}DXqP?xO|k}A*wLf|&AnPw}| zW%~;xd%eW4dFsrxTstjlu-2ZH;-GvM#w zGg^;RS5MJM#!A3>zzHUS=_9rExM@8LAh-l{f!o1DBS}4tx_bTq$;;q%@HY4e93k~g zqKj9$Ju(M`PC2&kG2uo{QY}k`P;jXnPd2FiEUV;LKg*KE&e=jc-3yslh62Y*HK_M% zv$`(ul*CsEd`V(icvA_BuOPN5hp`ppyUL;IWLaHQdIDBAm&UCYOA}WAs5D{quF{0n zhe}VT)nm#|!0P#BiPczH!s=g?C9Hm}EMc{&yq{LnRARMBwc1sFoK~BXto~Mc-0B<3 z6IO32Pgs4RJYjWV#mTgKa>WT)9j=I5eRoB|>fco)tp21TVRdC?%xdgA=ismv_&~H$ zJ9KmDw)$EKz7Ku~dWhIg#akix4fr#7lZf-E_+tqE1FWMk%a78E^Qm||1WqsoEFfYB z75gAq3RZz15ODz&Z-n4J@Gy9Wh=)?~OAx#Y4uC^MJdBF7Ml)syWuSVrRy>@FCqpm| zECTIBTuj9)Ay@;}gL{d11QkCF!4u#)u$zcSQt>+wybq25+Ze65jEYMk7!BN@g@`Mt zcp(Irfn{Jd5m!?2jS$=l9srLK@hB?Z3Bm8dUhp0fkEY@y5E#ZXRtUz7)r!YZu?K>Q 
zU=~e;#0^w@Jp}8)z2Ffdo=C+zAb0`n0skc8uTt@02v`+k4lt@pE53+|-4J-e z3~(tCUrfczAm{|wfDJ@EnTj8P;OAfq*hR!s=)-pMB!H1!KO;MQmh#yUN1qLG^uYj^ zGM|CwGthj7l=)0FpNZx(rOaog`K&abHDx{r&F7%`9Os;Q&Y5SAGY!l}GA5ldXxGE1 z&!wsJWp-)me3@OE`uZ}vG-VT>bLk1WG%2zzONnTk4(5Z)K|5FuuF4eYW>NPD*C!&~ z_OirwVgi@gPF%w!wiB;#vF!v7s|-xtk61Z-Hx91`4}#~wm%wF&JzzPw9Xtiz0hO3| zuE-eiW?uoxRp36b1AGCh&5X?f5wHO~3El;xER6A%Om8>sD&q?Ht@djo1;4#`Sz57^!t@GveLYZMRyH-bmOKR{UyW0UBMeygs?1&~|< zR)bAoH^|Oq?0hg2guw=|8GH!xY?+>|*~daM9?S)upa<*&Uw~3OW0OGuTnDy;qx99U zP4{TtmOg^XFtgpfkUn!bg`q?#UPx)O2n{a!vWY|=(bw=9qmNRA7g0&Sf`>HESqt1w zmwSSE-5Xd6Qyxf&|Iy`EwSQ!>0K_Obnui!&rY0RNpa=sEy( z=|kKMxoK)?hE+%`dT;dck@24&t2=w zF!YZ*>a-tWrN`g$r#KW^+XI-|zQKXC?~=jSzIA6q`=0#3-c(Jxb|6hUzjf&uo~#44 zWU{V58zyUGH0V3CGxU%3&8-a-CNbHXG;^A?tTbuaY0`4jq$R&IO4_=bJoq~iOa=2Y zZC8!z#`Choc6HOS?dpCO+pb!<>@nD*jsuve*!5s5*aHrOQP`PIH)PsRHHl11`%)v*h=@nIl4`jmA()ZI?x*uqo)fU zwXJE25{<0%kd9C!AXWRt4jORW9*Rox+@!8+iRH|^iVDPn`aFT&puToGm0(~7P;crn zm5bDf80qd1d%L!7_mx0`&Go( zE%9l%8%I}zUxD2jAElWND;Y6~*Mdvvccf;Wd^IHB1&@P0RIdN7K>75}U}dZhTuL9p z>B3-5Yam$*o&t3Gc;dm#_K>7}(3f`T^%3x#$$&<$*X^rqa0a~IMl6b)Zl{0+ z5*-Ao3`qMSNv{F512mw%KL+&211BxM{S-FAzzyh66S$IAKV~ofUJQa@1K5^n1!U5^ mC`fN^@*9NoeJlCZLi$b+QyP!m?oui}?f(NA0*+bcKL7v}sC=RT From 5d4aa6d8e2bdd7a9bddb08d1d6acabf81c69cb4d Mon Sep 17 00:00:00 2001 From: Florian M Date: Thu, 7 Dec 2023 11:55:47 +0100 Subject: [PATCH 11/12] add links to pr in guides --- CHANGELOG.unreleased.md | 2 +- MIGRATIONS.unreleased.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.unreleased.md b/CHANGELOG.unreleased.md index 51a78d1aff9..b26665b1a17 100644 --- a/CHANGELOG.unreleased.md +++ b/CHANGELOG.unreleased.md @@ -18,7 +18,7 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released ### Changed - Improved loading speed of the annotation list. [#7410](https://github.com/scalableminds/webknossos/pull/7410) - Admins and Team Managers can now also download job exports for jobs of other users, if they have the link. 
[#7462](https://github.com/scalableminds/webknossos/pull/7462) -- Processing jobs can now be distributed to multiple webknossos-workers with finer-grained configurability. Compare migration guide. +- Processing jobs can now be distributed to multiple webknossos-workers with finer-grained configurability. Compare migration guide. [#7463](https://github.com/scalableminds/webknossos/pull/7463) ### Fixed - Datasets with annotations can now be deleted. The concerning annotations can no longer be viewed but still be downloaded. [#7429](https://github.com/scalableminds/webknossos/pull/7429) diff --git a/MIGRATIONS.unreleased.md b/MIGRATIONS.unreleased.md index 179418eadaa..f389ca6622b 100644 --- a/MIGRATIONS.unreleased.md +++ b/MIGRATIONS.unreleased.md @@ -7,8 +7,8 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md). ## Unreleased [Commits](https://github.com/scalableminds/webknossos/compare/23.11.0...HEAD) -- If your setup contains webknossos-workers, postgres evolution 110 introduces the column `supportedJobCommands`. This needs to be filled in manually for your workers. Currently available job commands are `compute_mesh_file`, `compute_segment_index_file`, `convert_to_wkw`, `export_tiff`, `find_largest_segment_id`, `infer_nuclei`, `infer_neurons`, `materialize_volume_annotation`, `render_animation`. -- If your setup contains webknossos-workers, postgres evolution 110 introduces the columns `maxParallelHighPriorityJobs` and `maxParallelLowPriorityJobs`. Make sure to set those values to match what you want for your deployment. +- If your setup contains webknossos-workers, postgres evolution 110 introduces the column `supportedJobCommands`. This needs to be filled in manually for your workers. Currently available job commands are `compute_mesh_file`, `compute_segment_index_file`, `convert_to_wkw`, `export_tiff`, `find_largest_segment_id`, `infer_nuclei`, `infer_neurons`, `materialize_volume_annotation`, `render_animation`. 
[#7463](https://github.com/scalableminds/webknossos/pull/7463) +- If your setup contains webknossos-workers, postgres evolution 110 introduces the columns `maxParallelHighPriorityJobs` and `maxParallelLowPriorityJobs`. Make sure to set those values to match what you want for your deployment. [#7463](https://github.com/scalableminds/webknossos/pull/7463) ### Postgres Evolutions: From 3a0883b470e16a190730907e1b4b7a70c1cd9f81 Mon Sep 17 00:00:00 2001 From: Florian M Date: Mon, 11 Dec 2023 10:18:20 +0100 Subject: [PATCH 12/12] sql capitalization, remove debug logging --- app/models/job/Job.scala | 1 - app/models/job/Worker.scala | 6 +++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/app/models/job/Job.scala b/app/models/job/Job.scala index 83ddd68572f..c38e53cdbeb 100644 --- a/app/models/job/Job.scala +++ b/app/models/job/Job.scala @@ -267,7 +267,6 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) WHERE j._id = subquery._id """.asUpdate for { - _ <- Fox.successful(logger.info("reserve next job")) _ <- run( query.withTransactionIsolation(Serializable), retryCount = 50, diff --git a/app/models/job/Worker.scala b/app/models/job/Worker.scala index 33fbc589bff..1bb8119911e 100644 --- a/app/models/job/Worker.scala +++ b/app/models/job/Worker.scala @@ -59,19 +59,19 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext) def findOneByKey(key: String): Fox[Worker] = for { - r: Seq[WorkersRow] <- run(q"select $columns from $existingCollectionName where key = $key".as[WorkersRow]) + r: Seq[WorkersRow] <- run(q"SELECT $columns FROM $existingCollectionName WHERE key = $key".as[WorkersRow]) parsed <- parseFirst(r, "key") } yield parsed def findAllByDataStore(dataStoreName: String): Fox[List[Worker]] = for { r: Seq[WorkersRow] <- run( - q"SELECT $columns FROM $existingCollectionName where _dataStore = $dataStoreName".as[WorkersRow]) + q"SELECT $columns FROM $existingCollectionName WHERE _dataStore = 
$dataStoreName".as[WorkersRow]) parsed <- parseAll(r) } yield parsed def updateHeartBeat(_id: ObjectId): Unit = { - run(q"update webknossos.workers set lastHeartBeat = NOW() where _id = ${_id}".asUpdate) + run(q"UPDATE webknossos.workers SET lastHeartBeat = NOW() WHERE _id = ${_id}".asUpdate) // Note that this should not block the jobs polling operation, failures here are not critical () }