diff --git a/README.md b/README.md index d231dea002b..6db93da177b 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,8 @@ Workflow engine using [WDL](https://github.com/broadinstitute/wdl/blob/wdl2/SPEC * [GET /api/workflows/:version/:id/logs](#get-apiworkflowsversionidlogs) * [GET /api/workflows/:version/:id/metadata](#get-apiworkflowsversionidmetadata) * [POST /api/workflows/:version/:id/abort](#post-apiworkflowsversionidabort) + * [POST /api/workflows/:version/:id/call-caching](#post-apiworkflowsversionidcallcaching) + * [POST /api/workflows/:version/:id/call-caching/:call](#post-apiworkflowsversionidcallcachingcall) * [Developer](#developer) * [Generate WDL Parser](#generate-wdl-parser) * [Generating and Hosting ScalaDoc](#generating-and-hosting-scaladoc) @@ -1425,12 +1427,11 @@ Server: spray-can/1.3.3 ## GET /api/workflows/:version/query This endpoint allows for querying workflows based on the following criteria: - + +* `name` +* `status` +* `start` (start datetime) +* `end` (end datetime) Names and statuses can be given multiple times to include workflows with any of the specified names or statuses. Valid statuses are `Submitted`, `Running`, `Aborting`, `Aborted`, `Failed`, and `Succeeded`. `start` and `end` should @@ -1850,6 +1851,80 @@ Server: spray-can/1.3.3 } ``` +## POST /api/workflows/:version/:id/call-caching + +This endpoint allows for reconfiguration of call cache result reuse settings for all calls within a workflow. + +Accepted parameters are: + +* `allow` Mandatory boolean value, specifies whether call cache result reuse is allowed for all calls in the + specified workflow. + +cURL: + +``` +$ curl -X POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching?allow=false +``` + +HTTPie: + +``` +$ http POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching?allow=false +``` + +Response: +``` +HTTP/1.1 200 OK +Content-Length: 17 +Content-Type: application/json; charset=UTF-8 +Date: Thu, 04 Jun 2015 12:15:33 GMT +Server: spray-can/1.3.3 + +{ + "updateCount": 3 +} + +``` + +## POST /api/workflows/:version/:id/call-caching/:call + +This endpoint allows for reconfiguration of call cache result reuse settings for a single call within a workflow. + +Accepted parameters are: + +* `allow` Mandatory boolean value, specifies whether call cache result reuse is allowed for the specified call in the + specified workflow. + +For scattered calls, individual calls within the scatter can be targeted by appending a dot and the zero-based shard index. +e.g. `scatter_workflow.A.0` would target the zeroth shard of a scattered `A` call. If a shard index is not supplied for +a scattered call, all shards are targeted for update. 
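+
+For example, the following targets only the zeroth shard of a scattered `A` call (reusing the illustrative
+workflow ID from the examples above):
+
+```
+$ curl -X POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching/scatter_workflow.A.0?allow=false
+```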
+ +cURL: + +``` +$ curl -X POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching/three_step.wc?allow=false +``` + +HTTPie: + +``` +$ http POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching/three_step.wc?allow=false +``` + +Response: +``` +HTTP/1.1 200 OK +Content-Length: 17 +Content-Type: application/json; charset=UTF-8 +Date: Thu, 04 Jun 2015 12:15:33 GMT +Server: spray-can/1.3.3 + +{ + "updateCount": 1 +} + +``` + # Developer ## Generate WDL Parser diff --git a/src/main/resources/swagger/cromwell.yaml b/src/main/resources/swagger/cromwell.yaml index 74b5b448efa..57027eff26d 100644 --- a/src/main/resources/swagger/cromwell.yaml +++ b/src/main/resources/swagger/cromwell.yaml @@ -241,7 +241,7 @@ paths: items: type: string collectionFormat: multi - pattern: ^[a-zA-Z][a-zA-Z0-9_]+$ + pattern: ^[a-zA-Z][a-zA-Z0-9_]*$ description: > Returns only workflows with the specified name. If specified multiple times, returns workflows with any of the specified names. @@ -507,6 +507,79 @@ paths: security: - google_oauth: - openid + '/workflows/{version}/{id}/call-caching': + post: + summary: Alter call cache result reuse settings for all calls within the specified workflow. + parameters: + - name: version + description: API Version + required: true + type: string + in: path + default: v1 + - name: id + description: Workflow ID + required: true + type: string + in: path + - name: allow + description: Whether to allow call cache result reuse for all calls within the specified workflow. + required: true + type: boolean + in: query + tags: + - Workflows + responses: + '200': + description: Successful Request + schema: + $ref: '#/definitions/CallCachingResponse' + '400': + description: Validation error + '500': + description: Internal Error + security: + - google_oauth: + - openid + '/workflows/{version}/{id}/call-caching/{callFqn}': + post: + summary: Alter call cache result reuse settings for the specified call within the specified workflow. + parameters: + - name: version + description: API Version + required: true + type: string + in: path + default: v1 + - name: id + description: Workflow ID + required: true + type: string + in: path + - name: callFqn + description: Call fully qualified name + required: true + type: string + in: path + - name: allow + description: Whether to allow call cache result reuse for the specified call within the workflow. + required: true + type: boolean + in: query + tags: + - Workflows + responses: + '200': + description: Successful Request + schema: + $ref: '#/definitions/CallCachingResponse' + '400': + description: Validation error + '500': + description: Internal Error + security: + - google_oauth: + - openid securityDefinitions: google_oauth: type: oauth2 @@ -671,4 +744,12 @@ definitions: type: string format: date-time description: Workflow end datetime + CallCachingResponse: + description: Update count for call caching result reuse endpoints. + required: + - updateCount + properties: + updateCount: + type: int + description: Number of calls updated by this request. 
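
> Illustrative aside, not part of the patch: the `callFqn` path segment accepted by the second endpoint is a
> workflow-scoped call name with an optional trailing shard index. The sketch below, using a hypothetical `parse`
> helper and a simplified regex, shows the intended decomposition; the canonical, fully commented pattern is
> `cromwell.binding.Patterns.CallFullyQualifiedName`, introduced in the next file.

```
object CallNameSketch extends App {
  // Simplified stand-in for the FQN-plus-optional-shard idea: "<workflow>.<call>" optionally followed by ".<digits>".
  private val FqnWithShard = """([a-zA-Z][a-zA-Z0-9_]*\.[a-zA-Z][a-zA-Z0-9_]*)(?:\.(\d+))?""".r

  // Hypothetical helper: split a callFqn path segment into the call FQN and an optional shard index.
  def parse(segment: String): Option[(String, Option[Int])] = segment match {
    case FqnWithShard(fqn, shard) => Some((fqn, Option(shard) map { _.toInt }))
    case _ => None
  }

  println(parse("three_step.wc"))        // Some((three_step.wc,None))         -> the whole call / every shard
  println(parse("scatter_workflow.A.0")) // Some((scatter_workflow.A,Some(0))) -> a single shard
  println(parse("not a call name"))      // None                               -> the endpoint responds with a 400
}
```
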
diff --git a/src/main/scala/cromwell/binding/package.scala b/src/main/scala/cromwell/binding/package.scala index 1508356f2e9..02d791ec1b7 100644 --- a/src/main/scala/cromwell/binding/package.scala +++ b/src/main/scala/cromwell/binding/package.scala @@ -46,4 +46,41 @@ package object binding { case (k, CallOutput(wdlValue, hash)) => (k, wdlValue) } } + + object Patterns { + + val WorkflowName = """ + (?x) # Turn on comments and whitespace insensitivity. + + ( # Begin capture. + + [a-zA-Z][a-zA-Z0-9_]* # WDL identifier naming pattern of an initial alpha character followed by zero + # or more alphanumeric or underscore characters. + + ) # End capture. + """.trim.r + + val CallFullyQualifiedName = """ + (?x) # Turn on comments and whitespace insensitivity. + + ( # Begin outer capturing group for FQN. + + (?:[a-zA-Z][a-zA-Z0-9_]*) # Inner noncapturing group for top-level workflow name. This is the WDL + # identifier naming pattern of an initial alpha character followed by zero + # or more alphanumeric or underscore characters. + + (?:\.[a-zA-Z][a-zA-Z0-9_]*){1} # Inner noncapturing group for call name, a literal dot followed by a WDL + # identifier. Currently this is quantified to {1} since the call name is + # mandatory and nested workflows are not supported. This could be changed + # to + or a different quantifier if these assumptions change. + + ) # End outer capturing group for FQN. + + + (?: # Begin outer noncapturing group for shard. + \. # Literal dot. + (\d+) # Captured shard digits. + )? # End outer optional noncapturing group for shard. + """.trim.r // The trim is necessary as (?x) must be at the beginning of the regex. + } } diff --git a/src/main/scala/cromwell/engine/db/DataAccess.scala b/src/main/scala/cromwell/engine/db/DataAccess.scala index dc6230d0102..535cb1546eb 100644 --- a/src/main/scala/cromwell/engine/db/DataAccess.scala +++ b/src/main/scala/cromwell/engine/db/DataAccess.scala @@ -6,7 +6,7 @@ import cromwell.engine.backend.{Backend, JobKey} import cromwell.engine.db.slick._ import cromwell.engine.workflow.{CallKey, ExecutionStoreKey, OutputKey} import cromwell.engine.{SymbolStoreEntry, WorkflowDescriptor, WorkflowId, WorkflowState} -import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse} +import cromwell.webservice.{CallCachingParameters, WorkflowQueryParameters, WorkflowQueryResponse} import scala.concurrent.Future @@ -101,4 +101,6 @@ trait DataAccess { def findResumableJesExecutions(workflowId: WorkflowId): Future[Map[ExecutionDatabaseKey, JobKey]] def queryWorkflows(queryParameters: WorkflowQueryParameters): Future[WorkflowQueryResponse] + + def updateCallCaching(cachingParameters: CallCachingParameters): Future[Int] } diff --git a/src/main/scala/cromwell/engine/db/slick/ExecutionComponent.scala b/src/main/scala/cromwell/engine/db/slick/ExecutionComponent.scala index cc04b76b9bf..0a8e243ff1b 100644 --- a/src/main/scala/cromwell/engine/db/slick/ExecutionComponent.scala +++ b/src/main/scala/cromwell/engine/db/slick/ExecutionComponent.scala @@ -84,6 +84,27 @@ trait ExecutionComponent { if workflowExecution.workflowExecutionUuid === workflowExecutionUuid } yield execution) + val executionsByWorkflowExecutionIdAndCallFqnAndIndex = Compiled( + (workflowExecutionId: Rep[Int], callFqn: Rep[String], index: Rep[Int]) => for { + execution <- executions + if execution.workflowExecutionId === workflowExecutionId + if execution.callFqn === callFqn + if execution.index === index + } yield execution) + + val executionsByWorkflowExecutionIdAndCallFqn = Compiled( + 
(workflowExecutionId: Rep[Int], callFqn: Rep[String]) => for { + execution <- executions + if execution.workflowExecutionId === workflowExecutionId + if execution.callFqn === callFqn + } yield execution) + + val executionsByWorkflowExecutionId = Compiled( + (workflowExecutionId: Rep[Int]) => for { + execution <- executions + if execution.workflowExecutionId === workflowExecutionId + } yield execution) + val executionsByWorkflowExecutionUuidAndCallFqn = Compiled( (workflowExecutionUuid: Rep[String], callFqn: Rep[String]) => for { execution <- executions diff --git a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala index 737511bb36e..d8e5975f195 100644 --- a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala +++ b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala @@ -13,14 +13,13 @@ import cromwell.binding.values.WdlValue import cromwell.engine.ExecutionIndex._ import cromwell.engine.ExecutionStatus._ import cromwell.engine._ -import cromwell.engine.backend.{WorkflowQueryResult, Backend} import cromwell.engine.backend.jes.{JesBackend, JesJobKey} import cromwell.engine.backend.local.LocalBackend import cromwell.engine.backend.sge.SgeBackend +import cromwell.engine.backend.{Backend, WorkflowQueryResult} import cromwell.engine.db._ import cromwell.engine.workflow.{CallKey, ExecutionStoreKey, OutputKey, ScatterKey} -import cromwell.webservice.CromwellApiHandler.WorkflowQuery -import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse} +import cromwell.webservice.{CallCachingParameters, WorkflowQueryParameters, WorkflowQueryResponse} import lenthall.config.ScalaConfig._ import org.joda.time.DateTime import org.slf4j.LoggerFactory @@ -719,4 +718,23 @@ class SlickDataAccess(databaseConfig: Config) extends DataAccess { }) } } + + override def updateCallCaching(parameters: CallCachingParameters): Future[Int] = { + // Figure out which of the three possible queries to use based on whether a call has been specified and + // if so whether an index has been specified. 
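+    // (Some(call), Some(shard)) narrows the update to a single shard, (Some(call), None) covers every execution
+    // row for that call (i.e. all shards if it is scattered), and (None, _) covers the whole workflow.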
+ val executionQuery: (Int) => Query[dataAccess.Executions, Execution, Seq] = { + (parameters.callKey, parameters.callKey flatMap { _.index }) match { + case (Some(key), Some(idx)) => dataAccess.executionsByWorkflowExecutionIdAndCallFqnAndIndex(_: Int, key.fqn, idx).extract + case (Some(key), None) => dataAccess.executionsByWorkflowExecutionIdAndCallFqn(_: Int, key.fqn).extract + case _ => dataAccess.executionsByWorkflowExecutionId(_: Int).extract + } + } + + val action = for { + workflowExecution <- dataAccess.workflowExecutionsByWorkflowExecutionUuid(parameters.workflowId.id.toString).result.head + count <- executionQuery(workflowExecution.workflowExecutionId.get).map(_.allowsResultReuse).update(parameters.allow) + } yield count + + runTransaction(action) + } } diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala index e6ab65efd9a..a2523347c56 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala @@ -10,22 +10,22 @@ import cromwell.binding._ import cromwell.engine.ExecutionIndex._ import cromwell.engine.ExecutionStatus.ExecutionStatus import cromwell.engine._ -import cromwell.engine.backend.{Backend, CallMetadata, CallLogs} +import cromwell.engine.backend.{Backend, CallLogs, CallMetadata} import cromwell.engine.db.DataAccess._ import cromwell.engine.db.ExecutionDatabaseKey import cromwell.engine.db.slick._ import cromwell.engine.workflow.WorkflowActor.{Restart, Start} import cromwell.util.WriteOnceStore -import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse, WorkflowMetadataResponse} +import cromwell.webservice._ import org.joda.time.DateTime import spray.json._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future +import scala.concurrent.duration._ import scala.io.Source import scala.language.postfixOps import scala.util.{Failure, Success, Try} -import scala.concurrent.duration._ object WorkflowManagerActor { class WorkflowNotFoundException(message: String) extends RuntimeException(message) @@ -44,6 +44,7 @@ object WorkflowManagerActor { case class WorkflowAbort(id: WorkflowId) extends WorkflowManagerActorMessage final case class WorkflowMetadata(id: WorkflowId) extends WorkflowManagerActorMessage final case class RestartWorkflows(workflows: Seq[WorkflowDescriptor]) extends WorkflowManagerActorMessage + final case class CallCaching(id: WorkflowId, parameters: QueryParameters, call: Option[String]) extends WorkflowManagerActorMessage def props(backend: Backend): Props = Props(new WorkflowManagerActor(backend)) @@ -101,6 +102,7 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor { self ! RestartWorkflows(ws) } case RestartWorkflows(Nil) => // No more workflows need restarting. + case CallCaching(id, parameters, callName) => callCaching(id, parameters, callName) pipeTo sender } /** @@ -294,9 +296,16 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor { private def query(rawParameters: Seq[(String, String)]): Future[WorkflowQueryResponse] = { for { - // Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply. + // Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply. 
parameters <- Future.fromTry(Try(WorkflowQueryParameters(rawParameters))) response <- globalDataAccess.queryWorkflows(parameters) } yield response } + + private def callCaching(id: WorkflowId, parameters: QueryParameters, callName: Option[String]): Future[Int] = { + for { + cachingParameters <- CallCachingParameters.from(id, callName, parameters) + updateCount <- globalDataAccess.updateCallCaching(cachingParameters) + } yield updateCount + } } diff --git a/src/main/scala/cromwell/webservice/ApiDataModels.scala b/src/main/scala/cromwell/webservice/ApiDataModels.scala index 8e7df7dd6d3..8c1c2aa92f0 100644 --- a/src/main/scala/cromwell/webservice/ApiDataModels.scala +++ b/src/main/scala/cromwell/webservice/ApiDataModels.scala @@ -25,3 +25,5 @@ case class WorkflowMetadataResponse(id: String, status: String, submission: Date calls: Map[String, Seq[CallMetadata]]) case class WorkflowQueryResponse(results: Seq[WorkflowQueryResult]) + +final case class CallCachingResponse(updateCount: Int) diff --git a/src/main/scala/cromwell/webservice/CallCachingParameters.scala b/src/main/scala/cromwell/webservice/CallCachingParameters.scala new file mode 100644 index 00000000000..d3eeef40463 --- /dev/null +++ b/src/main/scala/cromwell/webservice/CallCachingParameters.scala @@ -0,0 +1,113 @@ +package cromwell.webservice + +import cromwell.engine.WorkflowId +import cromwell.engine.db.DataAccess._ +import cromwell.engine.db.{DataAccess, ExecutionDatabaseKey} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.Try +import scalaz.Scalaz._ +import scalaz.{Failure, Success, ValidationNel} + +case class CallCachingParameters private(workflowId: WorkflowId, callKey: Option[ExecutionDatabaseKey], allow: Boolean) + +object CallCachingParameters { + + private [webservice] def validateRecognizedKeys(queryParameters: QueryParameters): ValidationNel[String, Unit] = { + val badKeys = queryParameters collect { case q if q.key.toLowerCase != "allow" => q.key } + if (badKeys.nonEmpty) ("Found unrecognized keys: " + badKeys.mkString(", ")).failureNel else ().successNel + } + + private [webservice] def validateAllow(queryParameters: QueryParameters): ValidationNel[String, Boolean] = { + val allows = queryParameters.collect({ case q if q.key.toLowerCase == "allow" => q.value }).toSet + val (booleans, nonBooleans) = allows map { a => (a, Try(a.toBoolean)) } partition { _._2.isSuccess } + + val allBooleans = if (nonBooleans.nonEmpty) { + val values = (nonBooleans map { _._1 } ).mkString(", ") + s"Found non-boolean 'allow' values: $values".failureNel + } else { + ().successNel + } + + val (trues, falses) = booleans partition { _._2.get } + val coherentValues = if (trues.nonEmpty && falses.nonEmpty) "Found both true and false 'allow' values".failureNel else trues.nonEmpty.successNel + (allBooleans |@| coherentValues) { + case (_, allow) => allow + } + } + + private def validateWorkflowExists(workflowId: WorkflowId, dataAccess: DataAccess)(implicit ec: ExecutionContext): Future[ValidationNel[String, Unit]] = { + dataAccess.getWorkflowState(workflowId) map { + case Some(_) => ().successNel + case None => s"Workflow not found: ${workflowId.id.toString}".failureNel + } + } + + private [webservice] def validateCallName(callName: Option[String]): ValidationNel[String, Option[ExecutionDatabaseKey]] = { + import cromwell.binding.Patterns.CallFullyQualifiedName + callName map { + case CallFullyQualifiedName(fqn, index) => Option(ExecutionDatabaseKey(fqn, Option(index) map { _.toInt })).successNel + case name => s"Specified 
call does not parse as a fully qualified name with optional index: $name".failureNel + } getOrElse None.successNel + } + + /** + * Assert that a call with the specified key exists in the workflow. + */ + private def validateCall(workflowId: WorkflowId, key: ExecutionDatabaseKey, dataAccess: DataAccess) + (implicit ec: ExecutionContext): Future[ValidationNel[String, Unit]] = { + + dataAccess.getExecutionStatus(workflowId, key) map { + case Some(s) => ().successNel + case None => + val displayIndex = key.index map { "." + _ } getOrElse "" + s"Call ${key.fqn}$displayIndex does not exist in workflow $workflowId".failureNel + } + } + + private def validateWorkflowAndCall(workflowId: WorkflowId, callName: Option[String], dataAccess: DataAccess) + (implicit ec: ExecutionContext): Future[ValidationNel[String, Option[ExecutionDatabaseKey]]] = { + /** + * Perform a call validation conditioned on the workflow and call name validations having already succeeded. + * There's no possibility that the particulars of the call are even in the database if the workflow ID doesn't + * exist or the call name is malformed. + */ + def validateCallConditionally(workflowValidation: ValidationNel[String, Unit], + callNameValidation: ValidationNel[String, Option[ExecutionDatabaseKey]], + dataAccess: DataAccess) + (implicit ec: ExecutionContext): Future[ValidationNel[String, Unit]] = { + + (workflowValidation, callNameValidation) match { + case (Success(_), Success(call)) => + call map { c => validateCall(workflowId, c, dataAccess) } getOrElse Future.successful(().successNel) + // If either the workflow or call name validations are failed don't produce additional validation failures. + case _ => Future.successful(().successNel) + } + } + + for { + workflowValidation <- validateWorkflowExists(workflowId, dataAccess) + callNameValidation <- Future(validateCallName(callName)) + callValidation <- validateCallConditionally(workflowValidation, callNameValidation, dataAccess) + } yield (workflowValidation |@| callNameValidation |@| callValidation) { + case (_, key, _) => key + } + } + + def from(workflowId: WorkflowId, callName: Option[String], queryParameters: QueryParameters, dataAccess: DataAccess = globalDataAccess) + (implicit ec: ExecutionContext): Future[CallCachingParameters] = { + + val validations = for { + recognizedKeys <- Future(validateRecognizedKeys(queryParameters)) + validWorkflowAndCall <- validateWorkflowAndCall(workflowId, callName, dataAccess) + allow <- Future(validateAllow(queryParameters)) + } yield (recognizedKeys |@| validWorkflowAndCall |@| allow) { + case (_, callKey, a) => CallCachingParameters(workflowId, callKey, a) + } + + validations map { + case Success(s) => s + case Failure(errors) => throw new IllegalArgumentException(errors.list.mkString("\n")) + } + } +} diff --git a/src/main/scala/cromwell/webservice/CromwellApiHandler.scala b/src/main/scala/cromwell/webservice/CromwellApiHandler.scala index 116c6e08d3c..ba35678ef04 100644 --- a/src/main/scala/cromwell/webservice/CromwellApiHandler.scala +++ b/src/main/scala/cromwell/webservice/CromwellApiHandler.scala @@ -36,6 +36,7 @@ object CromwellApiHandler { final case class CallOutputs(id: WorkflowId, callFqn: String) extends WorkflowManagerMessage final case class CallStdoutStderr(id: WorkflowId, callFqn: String) extends WorkflowManagerMessage final case class WorkflowStdoutStderr(id: WorkflowId) extends WorkflowManagerMessage + final case class CallCaching(id: WorkflowId, parameters: QueryParameters, callName: Option[String]) extends 
WorkflowManagerMessage final case class WorkflowMetadata(id: WorkflowId) extends WorkflowManagerMessage } @@ -144,6 +145,14 @@ class CromwellApiHandler(workflowManager: ActorRef) extends Actor { case Failure(ex: WorkflowManagerActor.WorkflowNotFoundException) => context.parent ! RequestComplete(StatusCodes.NotFound, ex.getMessage) case Failure(ex) => context.parent ! RequestComplete(StatusCodes.InternalServerError, ex.getMessage) } + + case CallCaching(id, parameters, callName) => + val eventualUpdateCount = ask(workflowManager, WorkflowManagerActor.CallCaching(id, parameters, callName)).mapTo[Int] + eventualUpdateCount onComplete { + case Success(updateCount) => context.parent ! RequestComplete(StatusCodes.OK, CallCachingResponse(updateCount)) + case Failure(ex: IllegalArgumentException) => context.parent ! RequestComplete(StatusCodes.BadRequest, ex.getMessage) + case Failure(ex) => context.parent ! RequestComplete(StatusCodes.InternalServerError, ex.getMessage) + } } /** diff --git a/src/main/scala/cromwell/webservice/CromwellApiService.scala b/src/main/scala/cromwell/webservice/CromwellApiService.scala index 725e28111b7..66975633c9c 100644 --- a/src/main/scala/cromwell/webservice/CromwellApiService.scala +++ b/src/main/scala/cromwell/webservice/CromwellApiService.scala @@ -4,7 +4,8 @@ import akka.actor.{Actor, ActorRef, Props} import com.typesafe.config.Config import cromwell.engine.workflow.{ValidateActor, WorkflowOptions} import cromwell.engine.{WorkflowId, WorkflowSourceFiles} -import cromwell.webservice.CromwellApiHandler.WorkflowQuery +import cromwell.instrumentation.Instrumentation.Monitor +import cromwell.webservice.CromwellApiServiceActor.traceName import lenthall.config.ScalaConfig._ import lenthall.spray.SwaggerUiResourceHttpService import lenthall.spray.WrappedRoute._ @@ -12,9 +13,8 @@ import spray.http.StatusCodes import spray.json._ import spray.routing.Directive.pimpApply import spray.routing._ + import scala.util.{Failure, Success, Try} -import cromwell.instrumentation.Instrumentation.Monitor -import CromwellApiServiceActor.traceName trait SwaggerService extends SwaggerUiResourceHttpService { override def swaggerServiceName = "cromwell" @@ -46,8 +46,8 @@ trait CromwellApiService extends HttpService with PerRequestCreator { val workflowManager: ActorRef - val workflowRoutes = statusRoute ~ workflowOutputsRoute ~ submitRoute ~ workflowStdoutStderrRoute ~ abortRoute ~ - callOutputsRoute ~ callStdoutStderrRoute ~ validateRoute ~ metadataRoute ~ timingRoute ~ queryRoute + val workflowRoutes = queryRoute ~ workflowOutputsRoute ~ submitRoute ~ workflowStdoutStderrRoute ~ abortRoute ~ + callOutputsRoute ~ callStdoutStderrRoute ~ validateRoute ~ metadataRoute ~ timingRoute ~ callCachingRoute def statusRoute = path("workflows" / Segment / Segment / "status") { (version, id) => @@ -216,4 +216,18 @@ trait CromwellApiService extends HttpService with PerRequestCreator { } } } + + def callCachingRoute = + path("workflows" / Segment / Segment / "call-caching" ~ (Slash ~ Segment).?) 
{ (version, workflowId, callFqn) => + parameterSeq { parameters => + val queryParameters = parameters map { case (k, v) => QueryParameter(k, v) } + post { requestContext => + Try(WorkflowId.fromString(workflowId)) match { + case Success(w) => perRequest(requestContext, CromwellApiHandler.props(workflowManager), + CromwellApiHandler.CallCaching(w, queryParameters, callFqn)) + case Failure(_) => complete(StatusCodes.BadRequest, s"Invalid workflow ID: '$workflowId'.") + } + } + } + } } diff --git a/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala b/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala index 4f0d4666057..880d881136c 100644 --- a/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala +++ b/src/main/scala/cromwell/webservice/WorkflowJsonSupport.scala @@ -31,5 +31,6 @@ object WorkflowJsonSupport extends DefaultJsonProtocol { implicit val workflowMetadataResponse = jsonFormat8(WorkflowMetadataResponse) implicit val workflowQueryResult = jsonFormat5(WorkflowQueryResult) implicit val workflowQueryResponse = jsonFormat1(WorkflowQueryResponse) + implicit val callCachingResponse = jsonFormat1(CallCachingResponse) } diff --git a/src/main/scala/cromwell/webservice/WorkflowQueryKey.scala b/src/main/scala/cromwell/webservice/WorkflowQueryKey.scala index fb451f2362b..45e5e987caf 100644 --- a/src/main/scala/cromwell/webservice/WorkflowQueryKey.scala +++ b/src/main/scala/cromwell/webservice/WorkflowQueryKey.scala @@ -23,15 +23,16 @@ object WorkflowQueryKey { case object Name extends SeqStringWorkflowQueryKey { override val name = "Name" - private val WorkflowNamePattern = "([a-zA-Z][a-zA-Z0-9_]+)".r override def validate(grouped: Map[String, Seq[(String, String)]]): ValidationNel[String, Seq[String]] = { + import cromwell.binding.Patterns.WorkflowName + val values = valuesFromMap(grouped).toList val nels = values map { - case WorkflowNamePattern(n) => n.successNel + case WorkflowName(n) => n.successNel case v => v.failureNel } - sequenceListOfValidationNels("Name values do not match allowed workflow naming pattern [a-zA-Z][a-zA-Z0-9_]+", nels) + sequenceListOfValidationNels(s"Name values do not match allowed workflow naming pattern", nels) } } diff --git a/src/main/scala/cromwell/webservice/package.scala b/src/main/scala/cromwell/webservice/package.scala new file mode 100644 index 00000000000..69c6a5ad98c --- /dev/null +++ b/src/main/scala/cromwell/webservice/package.scala @@ -0,0 +1,8 @@ +package cromwell + +package object webservice { + + case class QueryParameter(key: String, value: String) + type QueryParameters = Seq[QueryParameter] + +} diff --git a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala index 570cf6997bb..0a6582861af 100644 --- a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala +++ b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala @@ -11,11 +11,13 @@ import cromwell.engine.ExecutionIndex.ExecutionIndex import cromwell.engine._ import cromwell.engine.backend.local.{LocalBackend, LocalBackendCall} import cromwell.engine.backend.{Backend, CallLogs} +import cromwell.engine.db.slick.SlickDataAccessSpec.{AllowFalse, AllowTrue} import cromwell.engine.db.{CallStatus, ExecutionDatabaseKey, LocalCallBackendInfo} -import cromwell.engine.workflow.{CallKey, WorkflowOptions} +import cromwell.engine.workflow.{CallKey, ScatterKey, WorkflowOptions} import cromwell.parser.BackendType import cromwell.util.SampleWdl -import 
cromwell.webservice.{WorkflowQueryKey, WorkflowQueryParameters} +import cromwell.webservice +import cromwell.webservice.{CallCachingParameters, WorkflowQueryKey, WorkflowQueryParameters} import org.scalactic.StringNormalizations._ import org.scalatest.concurrent.ScalaFutures import org.scalatest.prop.TableDrivenPropertyChecks @@ -24,6 +26,12 @@ import org.scalatest.{FlatSpec, Matchers} import scala.concurrent.{ExecutionContext, Future} + +object SlickDataAccessSpec { + val AllowFalse = Seq(webservice.QueryParameter("allow", "false")) + val AllowTrue = Seq(webservice.QueryParameter("allow", "true")) +} + class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { import TableDrivenPropertyChecks._ @@ -213,6 +221,130 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { } yield ()).futureValue } + def assertCallCachingFailure(id: WorkflowId, callName: Option[String], messages: String*): Future[Unit] = { + // The `from` Future is expected to fail, so if the forcomp actually runs the test should fail. + val parameters = for { + s <- CallCachingParameters.from(id, None, AllowFalse, dataAccess) + } yield throw new RuntimeException(s"Unexpected success: $s") + + // `recover` the failed Future looking for an expected `IllegalArgumentException`. Assert all the expected + // messages are present in the exception text and the correct number of expected failures are seen. + // If the `parameters` Future is failed but the exception isn't an `IllegalArgumentException` then this recover + // won't match and the Future will remain failed and fail the test. + parameters recover { + case e: IllegalArgumentException => + messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") } + if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}") + } + } + + it should "support call caching configuration for specified calls in a regular workflow" in { + assume(canConnect || testRequired) + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources()) + + (for { + _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend) + // Unknown workflow + _ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = Option("three_step.ps"), "Workflow not found") + _ <- dataAccess.setStatus(workflowInfo.id, Seq(ExecutionDatabaseKey("three_step.ps", None)), ExecutionStatus.Done) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions should have size 3 + _ = executions foreach { _.allowsResultReuse shouldBe true } + params <- CallCachingParameters.from(workflowInfo.id, Option("three_step.ps"), AllowFalse, dataAccess) + _ = params.workflowId shouldBe workflowInfo.id + _ = params.callKey shouldEqual Option(ExecutionDatabaseKey("three_step.ps", None)) + _ <- dataAccess.updateCallCaching(params) + executions <- dataAccess.getExecutions(workflowInfo.id) + (allowing, disallowing) = executions partition { _.allowsResultReuse } + _ = allowing should have size 2 + _ = disallowing should have size 1 + _ = disallowing.seq.head.callFqn should be("three_step.ps") + } yield ()).futureValue + } + + it should "support call caching configuration for specified calls in a scattered workflow" in { + assume(canConnect || testRequired) + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), 
SampleWdl.SimpleScatterWdl.asWorkflowSources()) + + (for { + // The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the + // scatter explodes below so filter that out. + _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) + scatter = workflowInfo.namespace.workflow.scatters.head + scatterKey = ScatterKey(scatter, None) + newEntries = scatterKey.populate(5) + _ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions foreach { _.allowsResultReuse shouldBe true } + + // Calls outside the scatter should work the same as an unscattered workflow. + outsideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.outside_scatter"), AllowFalse, dataAccess) + _ <- dataAccess.updateCallCaching(outsideParams) + executions <- dataAccess.getExecutions(workflowInfo.id) + (allowing, disallowing) = executions partition { _.allowsResultReuse } + _ = allowing should have size (executions.size - 1) + _ = disallowing should have size 1 + _ = disallowing.seq.head.callFqn should be ("scatter0.outside_scatter") + + // Support unindexed scattered call targets to update all shards. + _ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe true } + unindexedCallParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter"), AllowFalse, dataAccess) + _ <- dataAccess.updateCallCaching(unindexedCallParams) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe false } + + // Support indexed shards as well. 
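+      // "scatter0.inside_scatter.3" turns result reuse back on for shard index 3 only; the remaining shards keep
+      // the 'false' setting applied by the unindexed update above.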
+ insideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter.3"), AllowTrue, dataAccess) + _ <- dataAccess.updateCallCaching(insideParams) + executions <- dataAccess.getExecutions(workflowInfo.id) + inside = executions filter { e => e.callFqn == "scatter0.inside_scatter" } + (allowing, disallowing) = inside partition { _.allowsResultReuse } + _ = allowing should have size 1 + _ = disallowing should have size (inside.size - 1) + } yield ()).futureValue + } + + it should "support call caching configuration for all calls in a regular workflow" in { + assume(canConnect || testRequired) + + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources()) + (for { + _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend) + // Unknown workflow + _ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = None, "Workflow not found") + params <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions should have size 3 + _ = executions foreach { _.allowsResultReuse shouldBe true } + _ <- dataAccess.updateCallCaching(params) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions foreach { _.allowsResultReuse shouldBe false } + } yield ()).futureValue + } + + it should "support call caching configuration for all calls in a scattered workflow" in { + assume(canConnect || testRequired) + + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources()) + (for { + // The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the + // scatter explodes below so filter that out. 
+ _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) + scatter = workflowInfo.namespace.workflow.scatters.head + scatterKey = ScatterKey(scatter, None) + newEntries = scatterKey.populate(5) + _ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend) + + scatterParams <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions foreach { _.allowsResultReuse shouldBe true } + _ <- dataAccess.updateCallCaching(scatterParams) + executions <- dataAccess.getExecutions(workflowInfo.id) + _ = executions foreach { _.allowsResultReuse shouldBe false } + } yield ()).futureValue + } + + it should "query a single execution status" in { assume(canConnect || testRequired) val workflowId = WorkflowId(UUID.randomUUID()) diff --git a/src/test/scala/cromwell/webservice/CallCachingParametersSpec.scala b/src/test/scala/cromwell/webservice/CallCachingParametersSpec.scala new file mode 100644 index 00000000000..45577246a0c --- /dev/null +++ b/src/test/scala/cromwell/webservice/CallCachingParametersSpec.scala @@ -0,0 +1,78 @@ +package cromwell.webservice + +import cromwell.CromwellTestkitSpec +import cromwell.engine.db.ExecutionDatabaseKey + +import scalaz._ + +object CallCachingParametersSpec { + val CallNames = Seq("three_step.cgrep", "scatter.B") + val AllowTrue = QueryParameter("allow", "true") + val AllowFalse = QueryParameter("allow", "false") +} + +class CallCachingParametersSpec extends CromwellTestkitSpec("CallCachingParametersSpec") { + + import CallCachingParametersSpec._ + + "CallCachingParameters call name validation" should { + "be accepted if empty" in { + CallCachingParameters.validateCallName(None) match { + case Success(k) => k shouldBe None + case Failure(x) => fail("Unexpected failure: " + x.list.mkString(", ")) + } + } + + "be accepted if not indexed" in { + CallNames foreach { name => + CallCachingParameters.validateCallName(Option(name)) match { + case Success(k) => k shouldEqual Some(ExecutionDatabaseKey(name, None)) + case Failure(x) => fail("Unexpected failure: " + x.list.mkString(", ")) + } + } + } + + "be accepted if indexed" in { + CallNames foreach { name => + CallCachingParameters.validateCallName(Option(name + ".0")) match { + case Success(k) => k shouldEqual Some(ExecutionDatabaseKey(name, Some(0))) + case Failure(x) => fail("Unexpected failure: " + x.list.mkString(", ")) + } + } + } + + "be rejected if a scatter" in { + CallCachingParameters.validateCallName(Option("scattergather.$scatter_0")) match { + case Success(k) => fail("Unexpected Success: " + k) + case Failure(_) => + } + } + } + + "CallCachingParameters recognized keys validation" should { + "disallow unrecognized keys" in { + CallCachingParameters.validateRecognizedKeys(Seq(AllowTrue, QueryParameter("who", "dat"))) match { + case Success(k) => fail("Unexpected Success: " + k) + case Failure(_) => + } + } + } + + "CallCachingParameters allow validation" should { + "disallow non-boolean values" in { + CallCachingParameters.validateAllow(Seq(QueryParameter("allow", "sure why not"))) match { + case Success(k) => fail("Unexpected Success: " + k) + case Failure(_) => + } + } + + // Arguably this shouldn't allow multiple `allow`s at all, but for now the validation lets them slide + // as long as they're all the same. 
+ "disallow incoherent settings" in { + CallCachingParameters.validateAllow(Seq(AllowTrue, AllowFalse)) match { + case Success(k) => fail("Unexpected Success: " + k) + case Failure(_) => + } + } + } +} diff --git a/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala b/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala index d949bbf56e8..da2ca3dca5d 100644 --- a/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala +++ b/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala @@ -4,11 +4,10 @@ import java.util.UUID import akka.actor.{Actor, Props} import akka.pattern.pipe -import cromwell.CromwellSpec import cromwell.binding._ -import cromwell.binding.values.{WdlValue, WdlFile, WdlInteger} +import cromwell.binding.values.{WdlFile, WdlInteger, WdlValue} import cromwell.engine._ -import cromwell.engine.backend.{WorkflowQueryResult, CallLogs} +import cromwell.engine.backend.{CallLogs, WorkflowQueryResult} import cromwell.engine.workflow.WorkflowManagerActor._ import cromwell.util.SampleWdl.HelloWorld import cromwell.webservice.MockWorkflowManagerActor.{submittedWorkflowId, unknownId} @@ -155,6 +154,25 @@ class MockWorkflowManagerActor extends Actor { } } futureResult pipeTo sender + + case CallCaching(id, parameters, callFqn) => + val parametersByKey = parameters.groupBy(_.key.toLowerCase.capitalize) mapValues { _ map { _.value } } mapValues { _.toSet } + val futureResponse = + Future { + if (id == unknownId) + throw new IllegalArgumentException("Unknown workflow") + if (!parametersByKey.contains("Allow")) + // Currently this is not strictly true as the "allow" validation only fails if "allow"s are non-boolean + // or both true and false. But really it would be better if "allow" was only specified once. + throw new IllegalArgumentException("must specify 'allow' exactly once") + if (parametersByKey.keys.size > 1) + throw new IllegalArgumentException("Unrecognized parameters: " + (parametersByKey.keys.toSet - "allow").mkString(", ")) + if (callFqn.contains("bogus")) + throw new IllegalArgumentException("Invalid call") + // If we run the gauntlet of exception throwing checks, return a made up update count. 
+ 1 + } + futureResponse pipeTo sender } } @@ -653,15 +671,15 @@ class CromwellApiServiceSpec extends FlatSpec with CromwellApiService with Scala "Cromwell query API" should "return 400 for a bad query" in { Get(s"/workflows/$version/query?BadKey=foo") ~> - queryRoute ~> - check { - assertResult(StatusCodes.BadRequest) { - status + queryRoute ~> + check { + assertResult(StatusCodes.BadRequest) { + status + } } - } } - "Cromwell query API" should "return good results for a good query" in { + it should "return good results for a good query" in { Get(s"/workflows/$version/query?status=Succeeded") ~> queryRoute ~> check { @@ -673,4 +691,97 @@ class CromwellApiServiceSpec extends FlatSpec with CromwellApiService with Scala } } } + + "Cromwell single call caching API" should "work with good input" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching/w.good_call?allow=false") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.OK) { status } + } + } + + it should "reject missing 'allow'" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching/w.good_call") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("must specify 'allow' exactly once") + } + } + } + + it should "reject bogus calls" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching/bogus?allow=true") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("Invalid call") + } + } + } + + it should "reject invalid parameter keys" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching/w.good_call?allow=true&bogusKey=foo") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("Unrecognized parameters: ") + } + } + } + + it should "reject bogus workflows" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.unknownId}/call-caching/w.good_call?allow=true") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("Unknown workflow") + } + } + } + + "Cromwell all call caching API" should "work with good input" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching?allow=false") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.OK) { status } + } + } + + it should "reject missing 'allow'" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("must specify 'allow' exactly once") + } + } + } + + it should "reject invalid parameter keys" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.submittedScatterWorkflowId}/call-caching?allow=true&bogusKey=foo") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + assertResult(true) { + responseAs[String].contains("Unrecognized parameters: ") + } + } + } + + it should "reject bogus workflows" in { + Post(s"/workflows/$version/${MockWorkflowManagerActor.unknownId}/call-caching?allow=true") ~> + callCachingRoute ~> + check { + assertResult(StatusCodes.BadRequest) { status } + 
assertResult(true) { + responseAs[String].contains("Unknown workflow") + } + } + } }
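
> Editorial note, outside the patch proper: the validation style used throughout `CallCachingParameters` (and
> exercised by the specs above) is scalaz's `ValidationNel` applicative accumulation, where independent checks are
> combined with `|@|` so a single 400 response can report every problem at once instead of stopping at the first.
> A minimal, self-contained sketch with hypothetical check functions:

```
import scala.util.Try

import scalaz._
import Scalaz._

object AllowValidationSketch extends App {
  // Hypothetical check: the query parameter key must be 'allow' (case-insensitive).
  def checkKey(key: String): ValidationNel[String, Unit] =
    if (key.equalsIgnoreCase("allow")) ().successNel else s"Found unrecognized keys: $key".failureNel

  // Hypothetical check: the value must parse as a Boolean, mirroring validateAllow's use of Try(_.toBoolean).
  def checkValue(value: String): ValidationNel[String, Boolean] =
    Try(value.toBoolean) map { _.successNel[String] } getOrElse s"Found non-boolean 'allow' values: $value".failureNel

  // Both failures are accumulated and reported together.
  println((checkKey("bogusKey") |@| checkValue("sure why not")) { (_, allow) => allow })

  // A valid key/value pair yields a Success carrying the parsed Boolean.
  println((checkKey("allow") |@| checkValue("false")) { (_, allow) => allow })
}
```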