Call caching endpoint
mcovarr committed Dec 7, 2015
1 parent a1d6205 commit 2c354e8
Showing 17 changed files with 746 additions and 34 deletions.
87 changes: 81 additions & 6 deletions README.md
@@ -57,6 +57,8 @@ Workflow engine using [WDL](https://github.com/broadinstitute/wdl/blob/wdl2/SPEC
* [GET /api/workflows/:version/:id/logs](#get-apiworkflowsversionidlogs)
* [GET /api/workflows/:version/:id/metadata](#get-apiworkflowsversionidmetadata)
* [POST /api/workflows/:version/:id/abort](#post-apiworkflowsversionidabort)
* [POST /api/workflows/:version/:id/call-caching](#post-apiworkflowsversionidcallcaching)
* [POST /api/workflows/:version/:id/call-caching/:call](#post-apiworkflowsversionidcallcachingcall)
* [Developer](#developer)
* [Generate WDL Parser](#generate-wdl-parser)
* [Generating and Hosting ScalaDoc](#generating-and-hosting-scaladoc)
@@ -1425,12 +1427,11 @@ Server: spray-can/1.3.3
## GET /api/workflows/:version/query

This endpoint allows for querying workflows based on the following criteria:
<ul>
<li>`name`</li>
<li>`status`</li>
<li>`start` (start datetime)</li>
<li>`end` (end datetime)</li>
</ul>

* `name`
* `status`
* `start` (start datetime)
* `end` (end datetime)

Names and statuses can be given multiple times to include workflows with any of the specified names or statuses.
Valid statuses are `Submitted`, `Running`, `Aborting`, `Aborted`, `Failed`, and `Succeeded`. `start` and `end` should
@@ -1850,6 +1851,80 @@ Server: spray-can/1.3.3
}
```

## POST /api/workflows/:version/:id/call-caching

This endpoint allows for reconfiguration of call cache result reuse settings for all calls within a workflow.

Accepted parameters are:

* `allow` Mandatory boolean value. Specifies whether call cache result reuse is allowed for all calls in the
specified workflow.

cURL:

```
$ curl -X POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching?allow=false
```

HTTPie:

```
$ http POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching?allow=false
```

Response:
```
HTTP/1.1 200 OK
Content-Length: 17
Content-Type: application/json; charset=UTF-8
Date: Thu, 04 Jun 2015 12:15:33 GMT
Server: spray-can/1.3.3
{
"updateCount": 3
}
```

## POST /api/workflows/:version/:id/call-caching/:call

This endpoint allows for reconfiguration of call cache result reuse settings for a single call within a workflow.

Accepted parameters are:

* `allow` Mandatory boolean value. Specifies whether call cache result reuse is allowed for the specified call in the
specified workflow.

For scattered calls, an individual shard can be targeted by appending a dot and the zero-based shard index to the call
name. For example, `scatter_workflow.A.0` targets the zeroth shard of a scattered `A` call. If no shard index is
supplied for a scattered call, all of its shards are updated.

cURL:

```
$ curl -X POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching/three_step.wc?allow=false
```

HTTPie:

```
$ http POST http://localhost:8000/api/workflows/v1/e442e52a-9de1-47f0-8b4f-e6e565008cf1/call-caching/three_step.wc?allow=false
```

Response:
```
HTTP/1.1 200 OK
Content-Length: 17
Content-Type: application/json; charset=UTF-8
Date: Thu, 04 Jun 2015 12:15:33 GMT
Server: spray-can/1.3.3
{
"updateCount": 1
}
```
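
The shard addressing rule for `:call` can be sketched in Scala as follows. This is a minimal illustration, assuming the call is given as `workflow.call` with an optional `.N` shard suffix. `CallTarget` and `parseCallTarget` are hypothetical names for this example only and are not part of the Cromwell API.

```scala
// Hypothetical sketch: split a `:call` path segment into its fully qualified
// name and an optional zero-based shard index.
object CallTargetExample {
  final case class CallTarget(fqn: String, shard: Option[Int])

  // (?x) enables comments and insignificant whitespace inside the pattern.
  private val CallPattern = """(?x)
    ( [a-zA-Z][a-zA-Z0-9_]*           # workflow name
      \. [a-zA-Z][a-zA-Z0-9_]* )      # literal dot plus call name
    (?: \. (\d+) )?                   # optional literal dot plus shard index
  """.trim.r

  def parseCallTarget(raw: String): Option[CallTarget] = raw match {
    // A Regex extractor must match the whole input; an unmatched group is null.
    // Note: toInt would throw for an index too large to fit in an Int.
    case CallPattern(fqn, shard) => Some(CallTarget(fqn, Option(shard).map(_.toInt)))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(parseCallTarget("scatter_workflow.A.0")) // one shard targeted
    println(parseCallTarget("three_step.wc"))        // no shard index given
    println(parseCallTarget("not a call"))           // rejected
  }
}
```

Under these assumptions, a target without a shard index carries `None`, which corresponds to updating every shard of a scattered call.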

# Developer

## Generate WDL Parser
83 changes: 82 additions & 1 deletion src/main/resources/swagger/cromwell.yaml
@@ -241,7 +241,7 @@ paths:
items:
type: string
collectionFormat: multi
pattern: ^[a-zA-Z][a-zA-Z0-9_]+$
pattern: ^[a-zA-Z][a-zA-Z0-9_]*$
description: >
Returns only workflows with the specified name. If specified multiple times,
returns workflows with any of the specified names.
@@ -507,6 +507,79 @@ paths:
security:
- google_oauth:
- openid
'/workflows/{version}/{id}/call-caching':
post:
summary: Alter call cache result reuse settings for all calls within the specified workflow.
parameters:
- name: version
description: API Version
required: true
type: string
in: path
default: v1
- name: id
description: Workflow ID
required: true
type: string
in: path
- name: allow
description: Whether to allow call cache result reuse for all calls within the specified workflow.
required: true
type: boolean
in: query
tags:
- Workflows
responses:
'200':
description: Successful Request
schema:
$ref: '#/definitions/CallCachingResponse'
'400':
description: Validation error
'500':
description: Internal Error
security:
- google_oauth:
- openid
'/workflows/{version}/{id}/call-caching/{callFqn}':
post:
summary: Alter call cache result reuse settings for the specified call within the specified workflow.
parameters:
- name: version
description: API Version
required: true
type: string
in: path
default: v1
- name: id
description: Workflow ID
required: true
type: string
in: path
- name: callFqn
description: Call fully qualified name
required: true
type: string
in: path
- name: allow
description: Whether to allow call cache result reuse for the specified call within the workflow.
required: true
type: boolean
in: query
tags:
- Workflows
responses:
'200':
description: Successful Request
schema:
$ref: '#/definitions/CallCachingResponse'
'400':
description: Validation error
'500':
description: Internal Error
security:
- google_oauth:
- openid
securityDefinitions:
google_oauth:
type: oauth2
@@ -671,4 +744,12 @@ definitions:
type: string
format: date-time
description: Workflow end datetime
CallCachingResponse:
description: Update count for call caching result reuse endpoints.
required:
- updateCount
properties:
updateCount:
type: integer
description: Number of calls updated by this request.

37 changes: 37 additions & 0 deletions src/main/scala/cromwell/binding/package.scala
@@ -46,4 +46,41 @@ package object binding {
case (k, CallOutput(wdlValue, hash)) => (k, wdlValue)
}
}

object Patterns {

val WorkflowName = """
(?x) # Turn on comments and whitespace insensitivity.
( # Begin capture.
[a-zA-Z][a-zA-Z0-9_]* # WDL identifier naming pattern of an initial alpha character followed by zero
# or more alphanumeric or underscore characters.
) # End capture.
""".trim.r

val CallFullyQualifiedName = """
(?x) # Turn on comments and whitespace insensitivity.
( # Begin outer capturing group for FQN.
(?:[a-zA-Z][a-zA-Z0-9_]*) # Inner noncapturing group for top-level workflow name. This is the WDL
# identifier naming pattern of an initial alpha character followed by zero
# or more alphanumeric or underscore characters.
(?:\.[a-zA-Z][a-zA-Z0-9_]*){1} # Inner noncapturing group for call name, a literal dot followed by a WDL
# identifier. Currently this is quantified to {1} since the call name is
# mandatory and nested workflows are not supported. This could be changed
# to + or a different quantifier if these assumptions change.
) # End outer capturing group for FQN.
(?: # Begin outer noncapturing group for shard.
\. # Literal dot.
(\d+) # Captured shard digits.
)? # End outer optional noncapturing group for shard.
""".trim.r // The trim is necessary as (?x) must be at the beginning of the regex.
}
}
4 changes: 3 additions & 1 deletion src/main/scala/cromwell/engine/db/DataAccess.scala
@@ -6,7 +6,7 @@ import cromwell.engine.backend.{Backend, JobKey}
import cromwell.engine.db.slick._
import cromwell.engine.workflow.{CallKey, ExecutionStoreKey, OutputKey}
import cromwell.engine.{SymbolStoreEntry, WorkflowDescriptor, WorkflowId, WorkflowState}
import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse}
import cromwell.webservice.{CallCachingParameters, WorkflowQueryParameters, WorkflowQueryResponse}

import scala.concurrent.Future

@@ -101,4 +101,6 @@ trait DataAccess {
def findResumableJesExecutions(workflowId: WorkflowId): Future[Map[ExecutionDatabaseKey, JobKey]]

def queryWorkflows(queryParameters: WorkflowQueryParameters): Future[WorkflowQueryResponse]

def updateCallCaching(cachingParameters: CallCachingParameters): Future[Int]
}
21 changes: 21 additions & 0 deletions src/main/scala/cromwell/engine/db/slick/ExecutionComponent.scala
@@ -84,6 +84,27 @@ trait ExecutionComponent {
if workflowExecution.workflowExecutionUuid === workflowExecutionUuid
} yield execution)

val executionsByWorkflowExecutionIdAndCallFqnAndIndex = Compiled(
(workflowExecutionId: Rep[Int], callFqn: Rep[String], index: Rep[Int]) => for {
execution <- executions
if execution.workflowExecutionId === workflowExecutionId
if execution.callFqn === callFqn
if execution.index === index
} yield execution)

val executionsByWorkflowExecutionIdAndCallFqn = Compiled(
(workflowExecutionId: Rep[Int], callFqn: Rep[String]) => for {
execution <- executions
if execution.workflowExecutionId === workflowExecutionId
if execution.callFqn === callFqn
} yield execution)

val executionsByWorkflowExecutionId = Compiled(
(workflowExecutionId: Rep[Int]) => for {
execution <- executions
if execution.workflowExecutionId === workflowExecutionId
} yield execution)

val executionsByWorkflowExecutionUuidAndCallFqn = Compiled(
(workflowExecutionUuid: Rep[String], callFqn: Rep[String]) => for {
execution <- executions
24 changes: 21 additions & 3 deletions src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala
@@ -13,14 +13,13 @@ import cromwell.binding.values.WdlValue
import cromwell.engine.ExecutionIndex._
import cromwell.engine.ExecutionStatus._
import cromwell.engine._
import cromwell.engine.backend.{WorkflowQueryResult, Backend}
import cromwell.engine.backend.jes.{JesBackend, JesJobKey}
import cromwell.engine.backend.local.LocalBackend
import cromwell.engine.backend.sge.SgeBackend
import cromwell.engine.backend.{Backend, WorkflowQueryResult}
import cromwell.engine.db._
import cromwell.engine.workflow.{CallKey, ExecutionStoreKey, OutputKey, ScatterKey}
import cromwell.webservice.CromwellApiHandler.WorkflowQuery
import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse}
import cromwell.webservice.{CallCachingParameters, WorkflowQueryParameters, WorkflowQueryResponse}
import lenthall.config.ScalaConfig._
import org.joda.time.DateTime
import org.slf4j.LoggerFactory
@@ -719,4 +718,23 @@ class SlickDataAccess(databaseConfig: Config) extends DataAccess {
})
}
}

override def updateCallCaching(parameters: CallCachingParameters): Future[Int] = {
// Figure out which of the three possible queries to use based on whether a call has been specified and
// if so whether an index has been specified.
val executionQuery: (Int) => Query[dataAccess.Executions, Execution, Seq] = {
(parameters.callKey, parameters.callKey flatMap { _.index }) match {
case (Some(key), Some(idx)) => dataAccess.executionsByWorkflowExecutionIdAndCallFqnAndIndex(_: Int, key.fqn, idx).extract
case (Some(key), None) => dataAccess.executionsByWorkflowExecutionIdAndCallFqn(_: Int, key.fqn).extract
case _ => dataAccess.executionsByWorkflowExecutionId(_: Int).extract
}
}

val action = for {
workflowExecution <- dataAccess.workflowExecutionsByWorkflowExecutionUuid(parameters.workflowId.id.toString).result.head
count <- executionQuery(workflowExecution.workflowExecutionId.get).map(_.allowsResultReuse).update(parameters.allow)
} yield count

runTransaction(action)
}
}
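
The three-way query selection in `updateCallCaching` can be modeled in memory as below. This is a rough sketch rather than the Slick code: `CallTargetKey`, `Execution`, and this `updateCallCaching` are simplified, hypothetical stand-ins, and a plain `Seq` takes the place of the executions table.

```scala
// Hypothetical in-memory model of the selection logic; not the Slick implementation.
object CallCachingSelection {
  // Simplified stand-ins for the call key and execution row (assumptions).
  final case class CallTargetKey(fqn: String, index: Option[Int])
  final case class Execution(callFqn: String, index: Int, allowsResultReuse: Boolean)

  // Returns the rows after the update together with the number of rows targeted.
  def updateCallCaching(rows: Seq[Execution],
                        callKey: Option[CallTargetKey],
                        allow: Boolean): (Seq[Execution], Int) = {
    // Choose the predicate from whether a call, and then a shard index, was given.
    val targeted: Execution => Boolean = callKey match {
      case Some(CallTargetKey(fqn, Some(idx))) => e => e.callFqn == fqn && e.index == idx
      case Some(CallTargetKey(fqn, None))      => e => e.callFqn == fqn
      case None                                => _ => true
    }
    val updated = rows.map(e => if (targeted(e)) e.copy(allowsResultReuse = allow) else e)
    (updated, rows.count(targeted))
  }
}
```

In this model the returned count plays the role of `updateCount`: it is the number of rows the chosen predicate selects, whether or not their flag actually changed value.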
17 changes: 13 additions & 4 deletions src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala
@@ -10,22 +10,22 @@ import cromwell.binding._
import cromwell.engine.ExecutionIndex._
import cromwell.engine.ExecutionStatus.ExecutionStatus
import cromwell.engine._
import cromwell.engine.backend.{Backend, CallMetadata, CallLogs}
import cromwell.engine.backend.{Backend, CallLogs, CallMetadata}
import cromwell.engine.db.DataAccess._
import cromwell.engine.db.ExecutionDatabaseKey
import cromwell.engine.db.slick._
import cromwell.engine.workflow.WorkflowActor.{Restart, Start}
import cromwell.util.WriteOnceStore
import cromwell.webservice.{WorkflowQueryParameters, WorkflowQueryResponse, WorkflowMetadataResponse}
import cromwell.webservice._
import org.joda.time.DateTime
import spray.json._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io.Source
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._

object WorkflowManagerActor {
class WorkflowNotFoundException(message: String) extends RuntimeException(message)
@@ -44,6 +44,7 @@ object WorkflowManagerActor {
case class WorkflowAbort(id: WorkflowId) extends WorkflowManagerActorMessage
final case class WorkflowMetadata(id: WorkflowId) extends WorkflowManagerActorMessage
final case class RestartWorkflows(workflows: Seq[WorkflowDescriptor]) extends WorkflowManagerActorMessage
final case class CallCaching(id: WorkflowId, parameters: QueryParameters, call: Option[String]) extends WorkflowManagerActorMessage

def props(backend: Backend): Props = Props(new WorkflowManagerActor(backend))

@@ -101,6 +102,7 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor {
self ! RestartWorkflows(ws)
}
case RestartWorkflows(Nil) => // No more workflows need restarting.
case CallCaching(id, parameters, callName) => callCaching(id, parameters, callName) pipeTo sender
}

/**
@@ -294,9 +296,16 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor {

private def query(rawParameters: Seq[(String, String)]): Future[WorkflowQueryResponse] = {
for {
// Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply.
// Future/Try to wrap the exception that might be thrown from WorkflowQueryParameters.apply.
parameters <- Future.fromTry(Try(WorkflowQueryParameters(rawParameters)))
response <- globalDataAccess.queryWorkflows(parameters)
} yield response
}

private def callCaching(id: WorkflowId, parameters: QueryParameters, callName: Option[String]): Future[Int] = {
for {
cachingParameters <- CallCachingParameters.from(id, callName, parameters)
updateCount <- globalDataAccess.updateCallCaching(cachingParameters)
} yield updateCount
}
}
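
`CallCachingParameters.from` is not shown in this part of the diff. Since the README describes `allow` as a mandatory boolean, the validation it performs might look roughly like the sketch below. Everything here is an assumption for illustration: `AllowParameterSketch`, `validateAllow`, `allowFuture`, and the choice of `Seq[(String, String)]` for `QueryParameters` are hypothetical. Only the `Future.fromTry` lifting mirrors the `query` method above.

```scala
import scala.concurrent.Future
import scala.util.{Failure, Try}

// Hypothetical sketch of validating the mandatory `allow` query parameter.
object AllowParameterSketch {
  // Assumption: raw query parameters arrive as key/value pairs.
  type QueryParameters = Seq[(String, String)]

  def validateAllow(parameters: QueryParameters): Try[Boolean] =
    parameters.collect { case ("allow", value) => value } match {
      // Exactly one value: toBoolean throws IllegalArgumentException unless
      // the value is "true" or "false"; Try captures that as a Failure.
      case Seq(value) => Try(value.toBoolean)
      case Seq()      => Failure(new IllegalArgumentException("Missing mandatory parameter 'allow'"))
      case _          => Failure(new IllegalArgumentException("Parameter 'allow' given more than once"))
    }

  // Lift the validation result into a Future so it can be used in a
  // for comprehension alongside a data access call.
  def allowFuture(parameters: QueryParameters): Future[Boolean] =
    Future.fromTry(validateAllow(parameters))
}
```

Returning a failed `Future` instead of throwing lets the caller report a validation error in one place, which would fit the `400` response declared in the Swagger definition.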