diff --git a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala index 766a96911cf..928cebabf19 100644 --- a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala +++ b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala @@ -221,28 +221,26 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { } yield ()).futureValue } - it should "support call caching configuration for specified calls" in { - assume(canConnect || testRequired) - - def assertFailureFor(id: WorkflowId, callName: String, messages: String*): Future[Unit] = { - // The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` a failed - // Future and assert all the expected messages are present in the exception text and the correct number of - // expected failures are seen. - CallCachingParameters.from(id, Option(callName), AllowFalse, dataAccess) map { - s => throw new RuntimeException(s"Unexpected success: $s") } recover { - case e: IllegalArgumentException => - messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") } - if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}") - } + def assertCallCachingFailure(id: WorkflowId, callName: Option[String], messages: String*): Future[Unit] = { + // The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` the + // failed Future and assert all the expected messages are present in the exception text and the correct number + // of expected failures are seen. + CallCachingParameters.from(id, None, AllowFalse, dataAccess) map { + s => throw new RuntimeException(s"Unexpected success: $s") } recover { + case e: IllegalArgumentException => + messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") } + if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}") } + } + it should "support call caching configuration for specified calls in a regular workflow" in { + assume(canConnect || testRequired) val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources()) - val scatterWorkflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources()) (for { _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend) // Unknown workflow - _ <- assertFailureFor(WorkflowId(UUID.randomUUID()), "three_step.ps", "Workflow not found") + _ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = Option("three_step.ps"), "Workflow not found") _ <- dataAccess.setStatus(workflowInfo.id, Seq(ExecutionDatabaseKey("three_step.ps", None)), ExecutionStatus.Done) executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions should have size 3 @@ -255,36 +253,45 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { (allowing, disallowing) = executions partition { _.allowsResultReuse } _ = allowing should have size 2 _ = disallowing should have size 1 - _ = disallowing.seq.head.callFqn should be ("three_step.ps") + _ = disallowing.seq.head.callFqn should be("three_step.ps") + } yield ()).futureValue + } + + it should "support call caching configuration for specified calls in a scattered workflow" in { + assume(canConnect || testRequired) + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources()) + (for { // The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the // scatter explodes below so filter that out. - _ <- dataAccess.createWorkflow(scatterWorkflowInfo, Nil, scatterWorkflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) - scatter = scatterWorkflowInfo.namespace.workflow.scatters.head + _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) + scatter = workflowInfo.namespace.workflow.scatters.head scatterKey = ScatterKey(scatter, None) newEntries = scatterKey.populate(5) - _ <- dataAccess.insertCalls(scatterWorkflowInfo.id, newEntries.keys, localBackend) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + _ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend) + executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions foreach { _.allowsResultReuse shouldBe true } - outsideParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.outside_scatter"), AllowFalse, dataAccess) + // Calls outside the scatter should work the same as an unscattered workflow. + outsideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.outside_scatter"), AllowFalse, dataAccess) _ <- dataAccess.updateCallCaching(outsideParams) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + executions <- dataAccess.getExecutions(workflowInfo.id) (allowing, disallowing) = executions partition { _.allowsResultReuse } _ = allowing should have size (executions.size - 1) _ = disallowing should have size 1 _ = disallowing.seq.head.callFqn should be ("scatter0.outside_scatter") - // Support unindexed scattered call targets to update all shards + // Support unindexed scattered call targets to update all shards. _ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe true } - unindexedCallParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.inside_scatter"), AllowFalse, dataAccess) + unindexedCallParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter"), AllowFalse, dataAccess) _ <- dataAccess.updateCallCaching(unindexedCallParams) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe false } - insideParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.inside_scatter.3"), AllowTrue, dataAccess) + // Support indexed shards as well. + insideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter.3"), AllowTrue, dataAccess) _ <- dataAccess.updateCallCaching(insideParams) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + executions <- dataAccess.getExecutions(workflowInfo.id) inside = executions filter { e => e.callFqn == "scatter0.inside_scatter" } (allowing, disallowing) = inside partition { _.allowsResultReuse } _ = allowing should have size 1 @@ -292,26 +299,14 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { } yield ()).futureValue } - it should "support call caching configuration for all calls in a workflow" in { + it should "support call caching configuration for all calls in a regular workflow" in { assume(canConnect || testRequired) - def assertFailureFor(id: WorkflowId, messages: String*): Future[Unit] = { - // The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` the - // failed Future and assert all the expected messages are present in the exception text and the correct number - // of expected failures are seen. - CallCachingParameters.from(id, None, AllowFalse, dataAccess) map { s => throw new RuntimeException(s"Unexpected success: $s") } recover { - case e: IllegalArgumentException => - messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") } - if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}") - } - } - val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources()) - val scatterWorkflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources()) (for { _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend) // Unknown workflow - _ <- assertFailureFor(WorkflowId(UUID.randomUUID()), "Workflow not found") + _ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = None, "Workflow not found") params <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess) executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions should have size 3 @@ -319,24 +314,32 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { _ <- dataAccess.updateCallCaching(params) executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions foreach { _.allowsResultReuse shouldBe false } + } yield ()).futureValue + } + it should "support call caching configuration for all calls in a scattered workflow" in { + assume(canConnect || testRequired) + + val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources()) + (for { // The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the // scatter explodes below so filter that out. - _ <- dataAccess.createWorkflow(scatterWorkflowInfo, Nil, scatterWorkflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) - scatter = scatterWorkflowInfo.namespace.workflow.scatters.head + _ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend) + scatter = workflowInfo.namespace.workflow.scatters.head scatterKey = ScatterKey(scatter, None) newEntries = scatterKey.populate(5) - _ <- dataAccess.insertCalls(scatterWorkflowInfo.id, newEntries.keys, localBackend) + _ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend) - scatterParams <- CallCachingParameters.from(scatterWorkflowInfo.id, None, AllowFalse, dataAccess) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + scatterParams <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess) + executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions foreach { _.allowsResultReuse shouldBe true } _ <- dataAccess.updateCallCaching(scatterParams) - executions <- dataAccess.getExecutions(scatterWorkflowInfo.id) + executions <- dataAccess.getExecutions(workflowInfo.id) _ = executions foreach { _.allowsResultReuse shouldBe false } } yield ()).futureValue } + it should "query a single execution status" in { assume(canConnect || testRequired) val workflowId = WorkflowId(UUID.randomUUID())