Finer Control Over Which Worker Does What #7463

Merged: 14 commits, Dec 11, 2023
3 changes: 2 additions & 1 deletion CHANGELOG.unreleased.md
@@ -17,8 +17,9 @@ For upgrade instructions, please check the [migration guide](MIGRATIONS.released

### Changed
- Improved loading speed of the annotation list. [#7410](https://github.com/scalableminds/webknossos/pull/7410)
- Admins and Team Managers can now also download job exports for jobs of other users, if they have the link. [7462](https://github.com/scalableminds/webknossos/pull/7462)
- Admins and Team Managers can now also download job exports for jobs of other users, if they have the link. [#7462](https://github.com/scalableminds/webknossos/pull/7462)
- Updated some dependencies of the backend code (play 2.9, sbt 1.9, minor upgrades for others) for optimized performance. [#7366](https://github.com/scalableminds/webknossos/pull/7366)
- Processing jobs can now be distributed to multiple webknossos-workers with finer-grained configurability. See the migration guide. [#7463](https://github.com/scalableminds/webknossos/pull/7463)

### Fixed
- Datasets with annotations can now be deleted. The affected annotations can no longer be viewed but can still be downloaded. [#7429](https://github.com/scalableminds/webknossos/pull/7429)
5 changes: 4 additions & 1 deletion MIGRATIONS.unreleased.md
@@ -8,6 +8,9 @@ User-facing changes are documented in the [changelog](CHANGELOG.released.md).
## Unreleased
[Commits](https://github.com/scalableminds/webknossos/compare/23.11.0...HEAD)
- The config setting `play.http.secret.key` (secret random string) now requires a minimum length of 32 bytes.

- If your setup contains webknossos-workers, postgres evolution 110 introduces the column `supportedJobCommands`. This needs to be filled in manually for your workers. Currently available job commands are `compute_mesh_file`, `compute_segment_index_file`, `convert_to_wkw`, `export_tiff`, `find_largest_segment_id`, `infer_nuclei`, `infer_neurons`, `materialize_volume_annotation`, `render_animation`. [#7463](https://github.com/scalableminds/webknossos/pull/7463)
- If your setup contains webknossos-workers, postgres evolution 110 introduces the columns `maxParallelHighPriorityJobs` and `maxParallelLowPriorityJobs`. Make sure to set those values to match what you want for your deployment. [#7463](https://github.com/scalableminds/webknossos/pull/7463)
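For example, a minimal sketch of how an operator might fill the new columns for one worker (the worker key, commands, and limits below are placeholders; pick values that match your deployment):

```sql
-- Hypothetical example: one worker handles conversions and exports,
-- with up to 4 parallel high-priority jobs and 1 low-priority job.
UPDATE webknossos.workers
SET supportedJobCommands = ARRAY['convert_to_wkw', 'export_tiff']::VARCHAR(256)[],
    maxParallelHighPriorityJobs = 4,
    maxParallelLowPriorityJobs = 1
WHERE key = 'my-worker-key'; -- placeholder key
```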

### Postgres Evolutions:

- [110-worker-config.sql](conf/evolutions/110-worker-config.sql)
7 changes: 3 additions & 4 deletions app/controllers/Application.scala
@@ -8,7 +8,7 @@ import io.swagger.annotations.{Api, ApiOperation, ApiResponse, ApiResponses}
import mail.{DefaultMails, Send}
import models.analytics.{AnalyticsService, FrontendAnalyticsEvent}
import models.organization.OrganizationDAO
import models.user.{MultiUserDAO, UserService}
import models.user.UserService
import play.api.libs.json.{JsObject, Json}
import play.api.mvc.{Action, AnyContent, PlayBodyParsers}
import security.WkEnv
@@ -19,8 +19,7 @@ import javax.inject.Inject
import scala.concurrent.ExecutionContext

@Api
class Application @Inject()(multiUserDAO: MultiUserDAO,
actorSystem: ActorSystem,
class Application @Inject()(actorSystem: ActorSystem,
analyticsService: AnalyticsService,
userService: UserService,
releaseInformationDAO: ReleaseInformationDAO,
@@ -69,7 +68,7 @@ class Application @Inject()(multiUserDAO: MultiUserDAO,
@ApiOperation(hidden = true, value = "")
def features: Action[AnyContent] = sil.UserAwareAction {
addNoCacheHeaderFallback(
Ok(conf.raw.underlying.getConfig("features").resolve.root.render(ConfigRenderOptions.concise())))
Ok(conf.raw.underlying.getConfig("features").resolve.root.render(ConfigRenderOptions.concise())).as(jsonMimeType))
}

@ApiOperation(value = "Health endpoint")
34 changes: 27 additions & 7 deletions app/controllers/WKRemoteWorkerController.scala
@@ -2,6 +2,8 @@ package controllers

import com.scalableminds.util.accesscontext.GlobalAccessContext
import com.scalableminds.util.tools.Fox
import models.job.JobCommand.JobCommand

import javax.inject.Inject
import models.job._
import play.api.libs.json.Json
@@ -19,7 +21,7 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService,
for {
worker <- workerDAO.findOneByKey(key) ?~> "jobs.worker.notFound"
_ = workerDAO.updateHeartBeat(worker._id)
_ <- reserveNextJobs(worker)
_ <- reserveNextJobs(worker, pendingIterationCount = 10)
assignedUnfinishedJobs: List[Job] <- jobDAO.findAllUnfinishedByWorker(worker._id)
jobsToCancel: List[Job] <- jobDAO.findAllCancellingByWorker(worker._id)
// make sure that the jobs to run have not already just been cancelled
@@ -30,18 +32,36 @@ class WKRemoteWorkerController @Inject()(jobDAO: JobDAO, jobService: JobService,
} yield Ok(Json.obj("to_run" -> assignedUnfinishedJs, "to_cancel" -> toCancelJs))
}

private def reserveNextJobs(worker: Worker): Fox[Unit] =
private def reserveNextJobs(worker: Worker, pendingIterationCount: Int): Fox[Unit] =
for {
unfinishedCount <- jobDAO.countUnfinishedByWorker(worker._id)
pendingCount <- jobDAO.countUnassignedPendingForDataStore(worker._dataStore)
_ <- if (unfinishedCount >= worker.maxParallelJobs || pendingCount == 0) Fox.successful(())
unfinishedHighPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.highPriorityJobs)
unfinishedLowPriorityCount <- jobDAO.countUnfinishedByWorker(worker._id, JobCommand.lowPriorityJobs)
pendingHighPriorityCount <- jobDAO.countUnassignedPendingForDataStore(
worker._dataStore,
JobCommand.highPriorityJobs.intersect(worker.supportedJobCommands))
pendingLowPriorityCount <- jobDAO.countUnassignedPendingForDataStore(
worker._dataStore,
JobCommand.lowPriorityJobs.intersect(worker.supportedJobCommands))
mayAssignHighPriorityJob = unfinishedHighPriorityCount < worker.maxParallelHighPriorityJobs && pendingHighPriorityCount > 0
mayAssignLowPriorityJob = unfinishedLowPriorityCount < worker.maxParallelLowPriorityJobs && pendingLowPriorityCount > 0
currentlyAssignableJobCommands = assignableJobCommands(mayAssignHighPriorityJob, mayAssignLowPriorityJob)
.intersect(worker.supportedJobCommands)
_ <- if ((!mayAssignHighPriorityJob && !mayAssignLowPriorityJob) || pendingIterationCount == 0)
Fox.successful(())
else {
jobDAO.reserveNextJob(worker).flatMap { _ =>
reserveNextJobs(worker)
jobDAO.reserveNextJob(worker, currentlyAssignableJobCommands).flatMap { _ =>
reserveNextJobs(worker, pendingIterationCount - 1)
}
}
} yield ()

private def assignableJobCommands(mayAssignHighPriorityJob: Boolean,
mayAssignLowPriorityJob: Boolean): Set[JobCommand] = {
val lowPriorityOrEmpty = if (mayAssignLowPriorityJob) JobCommand.lowPriorityJobs else Set()
val highPriorityOrEmpty = if (mayAssignHighPriorityJob) JobCommand.highPriorityJobs else Set()
lowPriorityOrEmpty ++ highPriorityOrEmpty
}

def updateJobStatus(key: String, id: String): Action[JobStatus] = Action.async(validateJson[JobStatus]) {
implicit request =>
for {
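To make the new gating concrete, here is a minimal, self-contained sketch (not the controller itself; the standalone function mirrors `assignableJobCommands` above, and all counts and limits are hypothetical):

```scala
// Illustrative stand-in for the gating in reserveNextJobs (hypothetical numbers).
import models.job.JobCommand
import models.job.JobCommand.JobCommand

def assignable(mayAssignHigh: Boolean, mayAssignLow: Boolean): Set[JobCommand] =
  (if (mayAssignLow) JobCommand.lowPriorityJobs else Set.empty[JobCommand]) ++
    (if (mayAssignHigh) JobCommand.highPriorityJobs else Set.empty[JobCommand])

// Worker with maxParallelHighPriorityJobs = 2 (1 running) and
// maxParallelLowPriorityJobs = 1 (1 running); pending jobs exist in both classes.
val mayAssignHigh = 1 < 2 // unfinishedHighPriorityCount < maxParallelHighPriorityJobs
val mayAssignLow  = 1 < 1 // unfinishedLowPriorityCount < maxParallelLowPriorityJobs

// As in the controller, the result is intersected with supportedJobCommands:
val offered = assignable(mayAssignHigh, mayAssignLow)
  .intersect(Set(JobCommand.convert_to_wkw, JobCommand.infer_neurons))
// offered == Set(JobCommand.convert_to_wkw): only the high-priority command
// this worker supports may be reserved in this iteration.
```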
11 changes: 5 additions & 6 deletions app/models/dataset/DatasetService.scala
@@ -2,7 +2,6 @@ package models.dataset

import com.scalableminds.util.accesscontext.{DBAccessContext, GlobalAccessContext}
import com.scalableminds.util.time.Instant
import com.scalableminds.util.tools.JsonHelper.box2Option
import com.scalableminds.util.tools.{Fox, FoxImplicits}
import com.scalableminds.webknossos.datastore.models.datasource.inbox.{
UnusableDataSource,
@@ -16,7 +15,7 @@ import com.scalableminds.webknossos.datastore.models.datasource.{
import com.scalableminds.webknossos.datastore.rpc.RPC
import com.typesafe.scalalogging.LazyLogging
import models.folder.FolderDAO
import models.job.WorkerDAO
import models.job.JobService
import models.organization.{Organization, OrganizationDAO}
import models.team._
import models.user.{User, UserService}
@@ -34,9 +33,8 @@ class DatasetService @Inject()(organizationDAO: OrganizationDAO,
datasetLastUsedTimesDAO: DatasetLastUsedTimesDAO,
datasetDataLayerDAO: DatasetLayerDAO,
teamDAO: TeamDAO,
workerDAO: WorkerDAO,
jobService: JobService,
folderDAO: FolderDAO,
additionalAxesDAO: DatasetLayerAdditionalAxesDAO,
dataStoreService: DataStoreService,
teamService: TeamService,
thumbnailCachingService: ThumbnailCachingService,
@@ -329,8 +327,8 @@
lastUsedByUser <- lastUsedTimeFor(dataset._id, requestingUserOpt) ?~> "dataset.list.fetchLastUsedTimeFailed"
dataStoreJs <- dataStoreService.publicWrites(dataStore) ?~> "dataset.list.dataStoreWritesFailed"
dataSource <- dataSourceFor(dataset, Some(organization)) ?~> "dataset.list.fetchDataSourceFailed"
worker <- workerDAO.findOneByDataStore(dataStore.name).futureBox
jobsEnabled = conf.Features.jobsEnabled && worker.nonEmpty
jobsSupportedByAvailableWorkers <- jobService.jobsSupportedByAvailableWorkers(dataStore.name)
jobsEnabled = conf.Features.jobsEnabled && jobsSupportedByAvailableWorkers.nonEmpty
} yield {
Json.obj(
"name" -> dataset.name,
@@ -351,6 +349,7 @@
"details" -> dataset.details,
"isUnreported" -> Json.toJson(isUnreported(dataset)),
"jobsEnabled" -> jobsEnabled,
"jobsSupportedByAvailableWorkers" -> Json.toJson(jobsSupportedByAvailableWorkers),
"tags" -> dataset.tags,
"folderId" -> dataset._folder,
// included temporarily for compatibility with webknossos-libs, until a better versioning mechanism is implemented
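The net effect on the dataset list payload: `jobsEnabled` is now derived from the union of all available workers' supported commands, and that union is exposed as its own field. A sketch of the relevant excerpt (values are hypothetical):

```scala
import play.api.libs.json.Json

// Hypothetical excerpt of the JSON produced by the publicWrites above:
val excerpt = Json.obj(
  "jobsEnabled" -> true,
  "jobsSupportedByAvailableWorkers" -> Seq("convert_to_wkw", "export_tiff", "infer_neurons")
)
```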
74 changes: 45 additions & 29 deletions app/models/job/Job.scala
@@ -159,25 +159,33 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
parsed <- parseFirst(r, jobId)
} yield parsed

def countUnassignedPendingForDataStore(dataStoreName: String): Fox[Int] =
for {
r <- run(q"""SELECT COUNT(_id) FROM $existingCollectionName
def countUnassignedPendingForDataStore(dataStoreName: String, jobCommands: Set[JobCommand]): Fox[Int] =
if (jobCommands.isEmpty) Fox.successful(0)
else {
for {
r <- run(q"""SELECT COUNT(_id) from $existingCollectionName
WHERE state = ${JobState.PENDING}
AND command IN ${SqlToken.tupleFromList(jobCommands)}
AND manualState IS NULL
AND _dataStore = $dataStoreName
AND _worker IS NULL""".as[Int])
head <- r.headOption
} yield head
head <- r.headOption
} yield head
}

def countUnfinishedByWorker(workerId: ObjectId): Fox[Int] =
for {
r <- run(q"""SELECT COUNT(_id)
def countUnfinishedByWorker(workerId: ObjectId, jobCommands: Set[JobCommand]): Fox[Int] =
if (jobCommands.isEmpty) Fox.successful(0)
else {
for {
r <- run(q"""SELECT COUNT(_id)
FROM $existingCollectionName
WHERE _worker = $workerId
AND state in ${SqlToken.tupleFromValues(JobState.PENDING, JobState.STARTED)}
AND state IN ${SqlToken.tupleFromValues(JobState.PENDING, JobState.STARTED)}
AND command IN ${SqlToken.tupleFromList(jobCommands)}
AND manualState IS NULL""".as[Int])
head <- r.headOption
} yield head
head <- r.headOption
} yield head
}

def findAllUnfinishedByWorker(workerId: ObjectId): Fox[List[Job]] =
for {
@@ -233,11 +241,14 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
returnValue = ${s.returnValue},
started = ${s.started},
ended = ${s.ended}
where _id = $jobId""".asUpdate)
WHERE _id = $jobId""".asUpdate)
} yield ()

def reserveNextJob(worker: Worker): Fox[Unit] = {
val query = q"""
def reserveNextJob(worker: Worker, jobCommands: Set[JobCommand]): Fox[Unit] =
if (jobCommands.isEmpty) Fox.successful(())
else {
val query =
q"""
WITH subquery AS (
SELECT _id
FROM $existingCollectionName
Expand All @@ -246,6 +257,7 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
AND _dataStore = ${worker._dataStore}
AND manualState IS NULL
AND _worker IS NULL
AND command IN ${SqlToken.tupleFromList(jobCommands)}
ORDER BY created
LIMIT 1
)
Expand All @@ -254,14 +266,14 @@ class JobDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
FROM subquery
WHERE j._id = subquery._id
""".asUpdate
for {
_ <- run(
query.withTransactionIsolation(Serializable),
retryCount = 50,
retryIfErrorContains = List(transactionSerializationError)
)
} yield ()
}
for {
_ <- run(
query.withTransactionIsolation(Serializable),
retryCount = 50,
retryIfErrorContains = List(transactionSerializationError)
)
} yield ()
}

def countByState: Fox[Map[String, Int]] =
for {
@@ -440,21 +452,25 @@ class JobService @Inject()(wkConf: WkConf,
"job_kwargs" -> job.commandArgs
)

def submitJob(command: JobCommand, commandArgs: JsObject, owner: User, dataStoreName: String)(
implicit ctx: DBAccessContext): Fox[Job] =
def submitJob(command: JobCommand, commandArgs: JsObject, owner: User, dataStoreName: String): Fox[Job] =
for {
_ <- bool2Fox(wkConf.Features.jobsEnabled) ?~> "job.disabled"
_ <- assertDataStoreHasWorkers(dataStoreName) ?~> "job.noWorkerForDatastore"
_ <- Fox.assertTrue(jobIsSupportedByAvailableWorkers(command, dataStoreName)) ?~> "job.noWorkerForDatastore"
job = Job(ObjectId.generate, owner._id, dataStoreName, command, commandArgs)
_ <- jobDAO.insertOne(job)
_ = analyticsService.track(RunJobEvent(owner, command))
} yield job

private def assertDataStoreHasWorkers(dataStoreName: String)(implicit ctx: DBAccessContext): Fox[Unit] =
def jobsSupportedByAvailableWorkers(dataStoreName: String): Fox[Set[JobCommand]] =
for {
_ <- dataStoreDAO.findOneByName(dataStoreName)
_ <- workerDAO.findOneByDataStore(dataStoreName)
} yield ()
workers <- workerDAO.findAllByDataStore(dataStoreName)
jobs = if (workers.isEmpty) Set[JobCommand]() else workers.map(_.supportedJobCommands).reduce(_.union(_))
} yield jobs

private def jobIsSupportedByAvailableWorkers(command: JobCommand, dataStoreName: String): Fox[Boolean] =
for {
jobs <- jobsSupportedByAvailableWorkers(dataStoreName)
} yield jobs.contains(command)

def assertBoundingBoxLimits(boundingBox: String, mag: Option[String]): Fox[Unit] =
for {
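In effect, `submitJob` now rejects a command that no available worker of the target datastore supports. A minimal sketch of the guard's semantics (illustrative only, not the real service wiring):

```scala
import models.job.JobCommand
import models.job.JobCommand.JobCommand

// Mirrors jobsSupportedByAvailableWorkers (union over workers) plus the contains check.
def jobIsSupported(command: JobCommand, supportedPerWorker: List[Set[JobCommand]]): Boolean =
  supportedPerWorker.foldLeft(Set.empty[JobCommand])(_ union _).contains(command)

// Hypothetical datastore with two workers:
val workers = List(Set(JobCommand.convert_to_wkw), Set(JobCommand.export_tiff))
assert(jobIsSupported(JobCommand.export_tiff, workers))    // accepted
assert(!jobIsSupported(JobCommand.infer_neurons, workers)) // rejected: "job.noWorkerForDatastore"
```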
9 changes: 9 additions & 0 deletions app/models/job/JobCommand.scala
@@ -5,6 +5,15 @@ import com.scalableminds.util.enumeration.ExtendedEnumeration
object JobCommand extends ExtendedEnumeration {
type JobCommand = Value

/* NOTE: When adding a new job command here, do the following:
* - Decide if it should be a highPriority job
* - Add it to the dbtool.js command enable-jobs so it is available during development
* - Add it to the migration guide (operators need to decide which workers should provide it)
*/

val compute_mesh_file, compute_segment_index_file, convert_to_wkw, export_tiff, find_largest_segment_id,
globalize_floodfills, infer_nuclei, infer_neurons, materialize_volume_annotation, render_animation = Value

val highPriorityJobs: Set[Value] = Set(convert_to_wkw, export_tiff)
val lowPriorityJobs: Set[Value] = values.diff(highPriorityJobs)
Review comment on lines +17 to +18:

Member: This is general and should not be configurable?

Member Author: I'd say for now, yes.
}
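A quick sanity check one could run against the enum above (e.g. in a REPL): the two priority classes partition the full command set, so every command lands in exactly one queue.

```scala
import models.job.JobCommand

// No command is both high and low priority, and none is unclassified.
assert(JobCommand.highPriorityJobs.intersect(JobCommand.lowPriorityJobs).isEmpty)
assert((JobCommand.highPriorityJobs ++ JobCommand.lowPriorityJobs) == JobCommand.values.toSet)
```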
30 changes: 20 additions & 10 deletions app/models/job/Worker.scala
@@ -9,6 +9,7 @@ import com.scalableminds.webknossos.datastore.helpers.IntervalScheduler
import com.scalableminds.webknossos.schema.Tables._
import com.typesafe.scalalogging.LazyLogging
import models.dataset.DataStoreDAO
import models.job.JobCommand.JobCommand
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.{JsObject, Json}
import slick.lifted.Rep
@@ -23,7 +24,9 @@ import scala.concurrent.duration._
case class Worker(_id: ObjectId,
_dataStore: String,
key: String,
maxParallelJobs: Int,
maxParallelHighPriorityJobs: Int,
maxParallelLowPriorityJobs: Int,
supportedJobCommands: Set[JobCommand],
lastHeartBeat: Long = 0,
created: Instant = Instant.now,
isDeleted: Boolean = false)
@@ -37,33 +40,38 @@ class WorkerDAO @Inject()(sqlClient: SqlClient)(implicit ec: ExecutionContext)
protected def isDeletedColumn(x: Workers): Rep[Boolean] = x.isdeleted

protected def parse(r: WorkersRow): Fox[Worker] =
Fox.successful(
for {
supportedJobCommands <- Fox.serialCombined(parseArrayLiteral(r.supportedjobcommands)) { s =>
JobCommand.fromString(s).toFox ?~> f"$s is not a valid job command"
}
} yield
Worker(
ObjectId(r._Id),
r._Datastore,
r.key,
r.maxparalleljobs,
r.maxparallelhighpriorityjobs,
r.maxparallellowpriorityjobs,
supportedJobCommands.toSet,
r.lastheartbeat.getTime,
Instant.fromSql(r.created),
r.isdeleted
)
)

def findOneByKey(key: String): Fox[Worker] =
for {
r: Seq[WorkersRow] <- run(q"select $columns from $existingCollectionName where key = $key".as[WorkersRow])
r: Seq[WorkersRow] <- run(q"SELECT $columns FROM $existingCollectionName WHERE key = $key".as[WorkersRow])
parsed <- parseFirst(r, "key")
} yield parsed

def findOneByDataStore(dataStoreName: String): Fox[Worker] =
def findAllByDataStore(dataStoreName: String): Fox[List[Worker]] =
for {
r: Seq[WorkersRow] <- run(
q"select $columns from $existingCollectionName where _dataStore = $dataStoreName".as[WorkersRow])
parsed <- parseFirst(r, "dataStoreName")
q"SELECT $columns FROM $existingCollectionName WHERE _dataStore = $dataStoreName".as[WorkersRow])
parsed <- parseAll(r)
} yield parsed

def updateHeartBeat(_id: ObjectId): Unit = {
run(q"update webknossos.workers set lastHeartBeat = NOW() where _id = ${_id}".asUpdate)
run(q"UPDATE webknossos.workers SET lastHeartBeat = NOW() WHERE _id = ${_id}".asUpdate)
// Note that this should not block the jobs polling operation, failures here are not critical
()
}
@@ -77,7 +85,9 @@ class WorkerService @Inject()(conf: WkConf, dataStoreDAO: DataStoreDAO, workerDA
def publicWrites(worker: Worker): JsObject =
Json.obj(
"id" -> worker._id.id,
"maxParallelJobs" -> worker.maxParallelJobs,
"maxParallelHighPriorityJobs" -> worker.maxParallelHighPriorityJobs,
"maxParallelLowPriorityJobs" -> worker.maxParallelLowPriorityJobs,
"supportedJobCommands" -> worker.supportedJobCommands,
"created" -> worker.created,
"lastHeartBeat" -> worker.lastHeartBeat,
"lastHeartBeatIsRecent" -> lastHeartBeatIsRecent(worker)
20 changes: 20 additions & 0 deletions conf/evolutions/110-worker-config.sql
@@ -0,0 +1,20 @@
START TRANSACTION;

do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 109, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql;

DROP VIEW webknossos.workers_;

ALTER TABLE webknossos.workers ADD COLUMN maxParallelHighPriorityJobs INT NOT NULL DEFAULT 1;
ALTER TABLE webknossos.workers ADD COLUMN maxParallelLowPriorityJobs INT NOT NULL DEFAULT 1;
ALTER TABLE webknossos.workers ADD COLUMN supportedJobCommands VARCHAR(256)[] NOT NULL DEFAULT array[]::varchar(256)[];

UPDATE webknossos.workers SET maxParallelHighPriorityJobs = maxParallelJobs;
UPDATE webknossos.workers SET maxParallelLowPriorityJobs = maxParallelJobs;

ALTER TABLE webknossos.workers DROP COLUMN maxParallelJobs;

CREATE VIEW webknossos.workers_ AS SELECT * FROM webknossos.workers WHERE NOT isDeleted;

UPDATE webknossos.releaseInformation SET schemaVersion = 110;

COMMIT TRANSACTION;
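This diff does not show a reversion script; assuming the repo's usual reversion pattern applies, rolling back to schema 109 would look roughly like the following sketch (restoring `maxParallelJobs` from the high-priority limit is an assumption):

```sql
-- Hypothetical reversion sketch for 110-worker-config.sql (not part of this diff).
START TRANSACTION;

do $$ begin ASSERT (select schemaVersion from webknossos.releaseInformation) = 110, 'Previous schema version mismatch'; end; $$ LANGUAGE plpgsql;

DROP VIEW webknossos.workers_;

ALTER TABLE webknossos.workers ADD COLUMN maxParallelJobs INT NOT NULL DEFAULT 1;
UPDATE webknossos.workers SET maxParallelJobs = maxParallelHighPriorityJobs; -- assumption
ALTER TABLE webknossos.workers DROP COLUMN maxParallelHighPriorityJobs;
ALTER TABLE webknossos.workers DROP COLUMN maxParallelLowPriorityJobs;
ALTER TABLE webknossos.workers DROP COLUMN supportedJobCommands;

CREATE VIEW webknossos.workers_ AS SELECT * FROM webknossos.workers WHERE NOT isDeleted;

UPDATE webknossos.releaseInformation SET schemaVersion = 109;

COMMIT TRANSACTION;
```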