Skip to content

Commit

Permalink
fix specs
Browse files Browse the repository at this point in the history
  • Loading branch information
marcpiechura committed Sep 17, 2017
1 parent b6b7515 commit 965a108
Showing 1 changed file with 146 additions and 149 deletions.
295 changes: 146 additions & 149 deletions src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -416,18 +416,20 @@ public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_complet
var probe3 = this.SourceProbe<string>().ViaMaterialized(RestartFlow.WithBackoff(() =>
{
created.IncrementAndGet();
var snk = Flow.Create<string>().TakeWhile(s =>
{
return s != "cancel";
}).To(Sink.ForEach<string>(c => flowInSource.SendNext(c))
.MapMaterializedValue(task => task.ContinueWith(
t1 =>
{
if (!t1.IsFaulted || !t1.IsCanceled)
flowInSource.SendNext("in error");
else
flowInSource.SendNext("in complete");
})));
var snk = Flow.Create<string>()
.TakeWhile(s =>
{
return s != "cancel";
})
.To(Sink.ForEach<string>(c => flowInSource.SendNext(c))
.MapMaterializedValue(task => task.ContinueWith(
t1 =>
{
if (t1.IsFaulted || t1.IsCanceled)
flowInSource.SendNext("in error");
else
flowInSource.SendNext("in complete");
})));
var src = flowOutSource.TakeWhile(s =>
{
Expand All @@ -439,7 +441,11 @@ public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_complet
return c;
}).WatchTermination((s1, task) =>
{
flowInSource.SendNext("out complete");
task.ContinueWith(_ =>
{
flowInSource.SendNext("out complete");
return NotUsed.Instance;
}, TaskContinuationOptions.OnlyOnRanToCompletion);
return s1;
});
Expand Down Expand Up @@ -479,129 +485,123 @@ public void A_restart_with_backoff_flow_should_run_normally()
[Fact]
public void A_restart_with_backoff_flow_should_restart_on_cancellation()
{
this.AssertAllStagesStopped(() =>
{
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;
source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");
source.SendNext("cancel");
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should().Contain(ImmutableList.Create("in complete", "out complete"));
// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");
created.Current.Should().Be(2);
}, Materializer);
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;

source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");

source.SendNext("cancel");
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should()
.Contain(ImmutableList.Create("in complete", "out complete"));

// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");

created.Current.Should().Be(2);
}

[Fact]
public void A_restart_with_backoff_flow_should_restart_on_completion()
{
this.AssertAllStagesStopped(() =>
{
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;
source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should().Contain(ImmutableList.Create("in complete", "out complete"));
// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");
created.Current.Should().Be(2);
}, Materializer);
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;

source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");

sink.Request(1);
flowOutProbe.SendNext("complete");

// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should()
.Contain(ImmutableList.Create("in complete", "out complete"));

// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");

created.Current.Should().Be(2);
}

[Fact]
public void A_restart_with_backoff_flow_should_restart_on_failure()
{
this.AssertAllStagesStopped(() =>
{
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;
source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");
flowInProbe.Request(1);
flowOutProbe.SendNext("error");
// This should complete the in probe
flowInProbe.RequestNext("in complete");
// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");
created.Current.Should().Be(2);
}, Materializer);
var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;

source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");

sink.Request(1);
flowOutProbe.SendNext("error");

// This should complete the in probe
flowInProbe.RequestNext("in complete");

// and it should restart
source.SendNext("c");
flowInProbe.RequestNext("c");
flowOutProbe.SendNext("d");
sink.RequestNext("d");

created.Current.Should().Be(2);
}

[Fact]
public void A_restart_with_backoff_flow_should_backoff_before_restart()
{
this.AssertAllStagesStopped(() =>
{
var tuple = SetupFlow(TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;
source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");
source.SendNext("cancel");
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should().Contain(ImmutableList.Create("in complete", "out complete"));
source.SendNext("c");
flowInProbe.Request(1);
var deadline = TimeSpan.FromMilliseconds(100).FromNow();
flowInProbe.ExpectNext(TimeSpan.FromMilliseconds(300), "b");
deadline.IsOverdue.Should().BeTrue();
created.Current.Should().Be(2);
}, Materializer);
var tuple = SetupFlow(TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;

source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");

source.SendNext("cancel");
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.Request(2);
ImmutableList.Create(flowInProbe.ExpectNext(), flowInProbe.ExpectNext()).Should()
.Contain(ImmutableList.Create("in complete", "out complete"));

source.SendNext("c");
flowInProbe.Request(1);
var deadline = TimeSpan.FromMilliseconds(100).FromNow();
flowInProbe.ExpectNext(TimeSpan.FromMilliseconds(300), "c");
deadline.IsOverdue.Should().BeTrue();

created.Current.Should().Be(2);
}

[Fact]
Expand Down Expand Up @@ -641,34 +641,31 @@ public void A_restart_with_backoff_flow_should_continue_running_flow_out_port_af
[Fact]
public void A_restart_with_backoff_flow_should_continue_running_flow_in_port_after_out_has_been_cancelled()
{
this.AssertAllStagesStopped(() =>
{
var tuple = SetupFlow(TimeSpan.FromMilliseconds(20), TimeSpan.FromMilliseconds(40));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;
source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");
sink.Cancel();
flowInProbe.RequestNext("out complete");
source.SendNext("c");
flowInProbe.RequestNext("c");
source.SendNext("d");
flowInProbe.RequestNext("d");
source.SendNext("cancel");
flowInProbe.RequestNext("in complete");
source.ExpectCancellation();
created.Current.Should().Be(1);
}, Materializer);
var tuple = SetupFlow(TimeSpan.FromMilliseconds(20), TimeSpan.FromMilliseconds(40));
var created = tuple.Item1;
var source = tuple.Item2;
var flowInProbe = tuple.Item3;
var flowOutProbe = tuple.Item4;
var sink = tuple.Item5;

source.SendNext("a");
flowInProbe.RequestNext("a");
flowOutProbe.SendNext("b");
sink.RequestNext("b");

sink.Cancel();
flowInProbe.RequestNext("out complete");

source.SendNext("c");
flowInProbe.RequestNext("c");
source.SendNext("d");
flowInProbe.RequestNext("d");

source.SendNext("cancel");
flowInProbe.RequestNext("in complete");
source.ExpectCancellation();

created.Current.Should().Be(1);
}
}
}

0 comments on commit 965a108

Please sign in to comment.