Skip to content

Commit

Permalink
break up monolithic tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mcovarr committed Dec 4, 2015
1 parent c2151f0 commit 6783d72
Showing 1 changed file with 50 additions and 47 deletions.
97 changes: 50 additions & 47 deletions src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -221,28 +221,26 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures {
} yield ()).futureValue
}

it should "support call caching configuration for specified calls" in {
assume(canConnect || testRequired)

def assertFailureFor(id: WorkflowId, callName: String, messages: String*): Future[Unit] = {
// The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` a failed
// Future and assert all the expected messages are present in the exception text and the correct number of
// expected failures are seen.
CallCachingParameters.from(id, Option(callName), AllowFalse, dataAccess) map {
s => throw new RuntimeException(s"Unexpected success: $s") } recover {
case e: IllegalArgumentException =>
messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") }
if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}")
}
def assertCallCachingFailure(id: WorkflowId, callName: Option[String], messages: String*): Future[Unit] = {
// The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` the
// failed Future and assert all the expected messages are present in the exception text and the correct number
// of expected failures are seen.
CallCachingParameters.from(id, None, AllowFalse, dataAccess) map {
s => throw new RuntimeException(s"Unexpected success: $s") } recover {
case e: IllegalArgumentException =>
messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") }
if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}")
}
}

it should "support call caching configuration for specified calls in a regular workflow" in {
assume(canConnect || testRequired)
val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources())
val scatterWorkflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources())

(for {
_ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend)
// Unknown workflow
_ <- assertFailureFor(WorkflowId(UUID.randomUUID()), "three_step.ps", "Workflow not found")
_ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = Option("three_step.ps"), "Workflow not found")
_ <- dataAccess.setStatus(workflowInfo.id, Seq(ExecutionDatabaseKey("three_step.ps", None)), ExecutionStatus.Done)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions should have size 3
Expand All @@ -255,88 +253,93 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures {
(allowing, disallowing) = executions partition { _.allowsResultReuse }
_ = allowing should have size 2
_ = disallowing should have size 1
_ = disallowing.seq.head.callFqn should be ("three_step.ps")
_ = disallowing.seq.head.callFqn should be("three_step.ps")
} yield ()).futureValue
}

it should "support call caching configuration for specified calls in a scattered workflow" in {
assume(canConnect || testRequired)
val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources())

(for {
// The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the
// scatter explodes below so filter that out.
_ <- dataAccess.createWorkflow(scatterWorkflowInfo, Nil, scatterWorkflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend)
scatter = scatterWorkflowInfo.namespace.workflow.scatters.head
_ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend)
scatter = workflowInfo.namespace.workflow.scatters.head
scatterKey = ScatterKey(scatter, None)
newEntries = scatterKey.populate(5)
_ <- dataAccess.insertCalls(scatterWorkflowInfo.id, newEntries.keys, localBackend)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
_ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions foreach { _.allowsResultReuse shouldBe true }

outsideParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.outside_scatter"), AllowFalse, dataAccess)
// Calls outside the scatter should work the same as an unscattered workflow.
outsideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.outside_scatter"), AllowFalse, dataAccess)
_ <- dataAccess.updateCallCaching(outsideParams)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
executions <- dataAccess.getExecutions(workflowInfo.id)
(allowing, disallowing) = executions partition { _.allowsResultReuse }
_ = allowing should have size (executions.size - 1)
_ = disallowing should have size 1
_ = disallowing.seq.head.callFqn should be ("scatter0.outside_scatter")

// Support unindexed scattered call targets to update all shards
// Support unindexed scattered call targets to update all shards.
_ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe true }
unindexedCallParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.inside_scatter"), AllowFalse, dataAccess)
unindexedCallParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter"), AllowFalse, dataAccess)
_ <- dataAccess.updateCallCaching(unindexedCallParams)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions filter { _.callFqn == "scatter0.inside_scatter" } foreach { _.allowsResultReuse shouldBe false }

insideParams <- CallCachingParameters.from(scatterWorkflowInfo.id, Option("scatter0.inside_scatter.3"), AllowTrue, dataAccess)
// Support indexed shards as well.
insideParams <- CallCachingParameters.from(workflowInfo.id, Option("scatter0.inside_scatter.3"), AllowTrue, dataAccess)
_ <- dataAccess.updateCallCaching(insideParams)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
executions <- dataAccess.getExecutions(workflowInfo.id)
inside = executions filter { e => e.callFqn == "scatter0.inside_scatter" }
(allowing, disallowing) = inside partition { _.allowsResultReuse }
_ = allowing should have size 1
_ = disallowing should have size (inside.size - 1)
} yield ()).futureValue
}

it should "support call caching configuration for all calls in a workflow" in {
it should "support call caching configuration for all calls in a regular workflow" in {
assume(canConnect || testRequired)

def assertFailureFor(id: WorkflowId, messages: String*): Future[Unit] = {
// The `from` Future is expected to fail, so if the `map` block runs the test should fail. `recover` the
// failed Future and assert all the expected messages are present in the exception text and the correct number
// of expected failures are seen.
CallCachingParameters.from(id, None, AllowFalse, dataAccess) map { s => throw new RuntimeException(s"Unexpected success: $s") } recover {
case e: IllegalArgumentException =>
messages foreach { m => if (!e.getMessage.contains(m)) throw new RuntimeException(s"Missing message: $m. Exception text: ${e.getMessage}") }
if (e.getMessage.count(_ == '\n') != messages.size - 1) throw new RuntimeException(s"Unexpected messages seen: ${e.getMessage}")
}
}

val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.ThreeStep.asWorkflowSources())
val scatterWorkflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources())
(for {
_ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls, localBackend)
// Unknown workflow
_ <- assertFailureFor(WorkflowId(UUID.randomUUID()), "Workflow not found")
_ <- assertCallCachingFailure(WorkflowId(UUID.randomUUID()), callName = None, "Workflow not found")
params <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions should have size 3
_ = executions foreach { _.allowsResultReuse shouldBe true }
_ <- dataAccess.updateCallCaching(params)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions foreach { _.allowsResultReuse shouldBe false }
} yield ()).futureValue
}

it should "support call caching configuration for all calls in a scattered workflow" in {
assume(canConnect || testRequired)

val workflowInfo = new WorkflowDescriptor(WorkflowId(UUID.randomUUID()), SampleWdl.SimpleScatterWdl.asWorkflowSources())
(for {
// The `inside_scatter` is a to-be-exploded placeholder, but it will conflict with the collector that the
// scatter explodes below so filter that out.
_ <- dataAccess.createWorkflow(scatterWorkflowInfo, Nil, scatterWorkflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend)
scatter = scatterWorkflowInfo.namespace.workflow.scatters.head
_ <- dataAccess.createWorkflow(workflowInfo, Nil, workflowInfo.namespace.workflow.calls.filterNot(_.name == "inside_scatter"), localBackend)
scatter = workflowInfo.namespace.workflow.scatters.head
scatterKey = ScatterKey(scatter, None)
newEntries = scatterKey.populate(5)
_ <- dataAccess.insertCalls(scatterWorkflowInfo.id, newEntries.keys, localBackend)
_ <- dataAccess.insertCalls(workflowInfo.id, newEntries.keys, localBackend)

scatterParams <- CallCachingParameters.from(scatterWorkflowInfo.id, None, AllowFalse, dataAccess)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
scatterParams <- CallCachingParameters.from(workflowInfo.id, None, AllowFalse, dataAccess)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions foreach { _.allowsResultReuse shouldBe true }
_ <- dataAccess.updateCallCaching(scatterParams)
executions <- dataAccess.getExecutions(scatterWorkflowInfo.id)
executions <- dataAccess.getExecutions(workflowInfo.id)
_ = executions foreach { _.allowsResultReuse shouldBe false }
} yield ()).futureValue
}


it should "query a single execution status" in {
assume(canConnect || testRequired)
val workflowId = WorkflowId(UUID.randomUUID())
Expand Down

0 comments on commit 6783d72

Please sign in to comment.