diff --git a/docs/articles/streams/error-handling.md b/docs/articles/streams/error-handling.md
index 4a50be5e487..397689052b8 100644
--- a/docs/articles/streams/error-handling.md
+++ b/docs/articles/streams/error-handling.md
@@ -10,6 +10,7 @@ In many cases you may want to avoid complete stream failure, this can be done in
 
 - `Recover` to emit a final element then complete the stream normally on upstream failure
 - `RecoverWithRetries` to create a new upstream and start consuming from that on failure
+- Restarting sections of the stream after a backoff
 - Using a supervision strategy for stages that support it
 
 In addition to these built in tools for error handling, a common pattern is to wrap the stream inside an actor, and have the actor restart the entire stream on failure.
@@ -83,6 +84,42 @@ seven
 eight
 ```
 
+## Delayed restarts with a backoff stage
+
+Just as Akka provides the [backoff supervision pattern for actors](xref:supervision#delayed-restarts-with-the-backoffsupervisor-pattern), Akka streams
+also provides `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
+supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.
+
+This pattern is useful when the stage fails or completes because some external resource is not available
+and we need to give it some time to start up again. One of the prime examples of when this is useful is
+when a WebSocket connection fails because the HTTP server it's running on goes down, perhaps because it is overloaded.
+By using an exponential backoff, we avoid going into a tight reconnect loop, which both gives the HTTP server some time
+to recover and avoids using needless resources on the client side.
+
+The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`,
+which will supervise the given `Source`. The `Source` in this case is a single
+`HttpResponseMessage` produced by `HttpClient`. If the stream fails or completes at any point, the request will
+be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point the delay remains capped due
+to the `maxBackoff` parameter):
+
+[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=restart-with-backoff-source)]
+
+Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
+is highly recommended, in order to avoid multiple streams re-starting at the exact same point in time,
+for example because they were all stopped due to a shared resource such as the same server going down
+and re-starting after the same configured interval. By adding additional randomness to the
+re-start intervals the streams will start at slightly different points in time, thus avoiding
+large spikes of traffic hitting the recovering server or other resource that they all need to contact.
+
+The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
+it in combination with a `KillSwitch`, so that you can terminate it when needed:
+
+[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=with-kill-switch)]
+
+Sinks and flows can also be supervised, using `Akka.Streams.Dsl.RestartSink` and `Akka.Streams.Dsl.RestartFlow`.
+The `RestartSink` is restarted when it cancels, while the `RestartFlow` is restarted when either the in port cancels,
+the out port completes, or the out port sends an error.
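+
+There is no dedicated snippet for `RestartFlow` in the example project above, so the following is an illustrative
+sketch only: the `createConnectionFlow` factory is a hypothetical stand-in for any factory that builds a flow talking
+to an unreliable resource, and an existing `materializer` plus the usual `Akka.Streams.Dsl` usings are assumed:
+
+```csharp
+// Sketch of supervising a flow with RestartFlow.WithBackoff.
+// createConnectionFlow is a placeholder - substitute any factory producing
+// a fresh Flow<string, string, NotUsed> that talks to an unreliable resource.
+Func<Flow<string, string, NotUsed>> createConnectionFlow = () => Flow.Create<string>();
+
+var restartFlow = RestartFlow.WithBackoff(
+    createConnectionFlow,                  // a new flow is materialized on every (re)start
+    minBackoff: TimeSpan.FromSeconds(3),
+    maxBackoff: TimeSpan.FromSeconds(30),
+    randomFactor: 0.2);
+
+// The wrapped flow is used like any other flow; restarts are transparent to the
+// rest of the graph, although elements in flight during a restart may be lost.
+Source.From(new[] { "a", "b", "c" })
+    .Via(restartFlow)
+    .RunForeach(Console.WriteLine, materializer);
+```
+
+`RestartSink.WithBackoff` takes the same `minBackoff`, `maxBackoff` and `randomFactor` parameters and is used the same
+way, wrapping a `Sink` factory instead of a `Flow` factory.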
+ ## Supervision Strategies > [!NOTE] diff --git a/docs/examples/DocsExamples/Streams/RestartDocTests.cs b/docs/examples/DocsExamples/Streams/RestartDocTests.cs new file mode 100644 index 00000000000..b1dd3dc446a --- /dev/null +++ b/docs/examples/DocsExamples/Streams/RestartDocTests.cs @@ -0,0 +1,62 @@ +using System; +using System.Linq; +using System.Net.Http; +using System.Threading.Tasks; +using Akka; +using Akka.Streams; +using Akka.Streams.Dsl; +using Akka.TestKit.Xunit2; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace DocsExamples.Streams +{ + public class RestartDocTests : TestKit + { + private ActorMaterializer Materializer { get; } + + public RestartDocTests(ITestOutputHelper output) + : base("", output) + { + Materializer = Sys.Materializer(); + } + + private void DoSomethingElse() + { + } + + [Fact] + public void Restart_stages_should_demonstrate_a_restart_with_backoff_source() + { + #region restart-with-backoff-source + var httpClient = new HttpClient(); + + var restartSource = RestartSource.WithBackoff(() => + { + // Create a source from a task + return Source.FromTask( + httpClient.GetAsync("http://example.com/eventstream") // Make a single request + ) + .Select(c => c.Content.ReadAsStringAsync()) + .Select(c => c.Result); + }, + minBackoff: TimeSpan.FromSeconds(3), + maxBackoff: TimeSpan.FromSeconds(30), + randomFactor: 0.2 // adds 20% "noise" to vary the intervals slightly + ); + #endregion + + #region with-kill-switch + var killSwitch = restartSource + .ViaMaterialized(KillSwitches.Single(), Keep.Right) + .ToMaterialized(Sink.ForEach(evt => Console.WriteLine($"Got event: {evt}")), Keep.Left) + .Run(Materializer); + + DoSomethingElse(); + + killSwitch.Shutdown(); + #endregion + } + } +} diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt index f6ab48888ec..4f6cfa0a46f 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt @@ -1418,6 +1418,18 @@ namespace Akka.Streams.Dsl { public PartitionOutOfBoundsException(string message) { } } + public class static RestartFlow + { + public static Akka.Streams.Dsl.Flow WithBackoff(System.Func> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { } + } + public class static RestartSink + { + public static Akka.Streams.Dsl.Sink WithBackoff(System.Func> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { } + } + public class static RestartSource + { + public static Akka.Streams.Dsl.Source WithBackoff(System.Func> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { } + } public class static ReverseOps { public static Akka.Streams.Dsl.GraphDsl.Builder From(this Akka.Streams.Dsl.GraphDsl.ReverseOps ops, Akka.Streams.Outlet outlet) diff --git a/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs new file mode 100644 index 00000000000..5ef60f18df4 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/RestartSpec.cs @@ -0,0 +1,665 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.TestKit; +using Akka.Streams.TestKit.Tests; +using Akka.TestKit; +using Akka.Util.Internal; +using Xunit; +using FluentAssertions; +using 
Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl +{ + public class RestartSpec : AkkaSpec + { + private ActorMaterializer Materializer { get; } + + public RestartSpec(ITestOutputHelper output) : base("", output) + { + Materializer = Sys.Materializer(); + } + + // + // Source + // + + [Fact] + public void A_restart_with_backoff_source_should_run_normally() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.Repeat("a"); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.RequestNext("a"); + probe.RequestNext("a"); + probe.RequestNext("a"); + probe.RequestNext("a"); + + created.Current.Should().Be(1); + + probe.Cancel(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_restart_on_completion() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.RequestNext("a"); + + created.Current.Should().Be(3); + + probe.Cancel(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_restart_on_failure() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + var enumerable = new List { "a", "b", "c" }.Select(c => + { + if (c == "c") + throw new ArgumentException("failed"); + return c; + }); + return Source.From(enumerable); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.RequestNext("a"); + + created.Current.Should().Be(3); + + probe.Cancel(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_backoff_before_restart() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }); + }, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(1000), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.Request(1); + // There should be a delay of at least 200ms before we receive the element, wait for 100ms. + var deadline = TimeSpan.FromMilliseconds(100).FromNow(); + // But the delay shouldn't be more than 300ms. 
+ probe.ExpectNext(TimeSpan.FromMilliseconds(300), "a"); + deadline.IsOverdue.Should().Be(true); + + created.Current.Should().Be(2); + + probe.Cancel(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }); + }, TimeSpan.FromMilliseconds(200), TimeSpan.FromMilliseconds(2000), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.RequestNext("b"); + // There should be a 200ms delay + probe.RequestNext("a"); + probe.RequestNext("b"); + probe.Request(1); + // The probe should now be backing off for 400ms + + // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the + // subsequent 200ms min backoff to pass, so it resets the restart count + Thread.Sleep(700); + probe.ExpectNext("a"); + probe.RequestNext("b"); + + // We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the + // next element within 300ms + probe.RequestNext(TimeSpan.FromMilliseconds(300)).Should().Be("a"); + + created.Current.Should().Be(4); + + probe.Cancel(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_cancel_the_currently_running_source_when_cancelled() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tcs = new TaskCompletionSource(); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.From(new List { "a", "b" }) + .WatchTermination((source, _) => + { + tcs.SetResult(Done.Instance); + return source; + }); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromSeconds(2), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.Cancel(); + + tcs.Task.Result.Should().BeSameAs(Done.Instance); + + // Wait to ensure it isn't restarted + Thread.Sleep(200); + created.Current.Should().Be(1); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_source_should_not_restart_the_source_when_cancelled_while_backing_off() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var probe = RestartSource.WithBackoff(() => + { + created.IncrementAndGet(); + return Source.Single("a"); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0).RunWith(this.SinkProbe(), Materializer); + + probe.RequestNext("a"); + probe.Request(1); + // Should be backing off now + probe.Cancel(); + + // Wait to ensure it isn't restarted + Thread.Sleep(300); + created.Current.Should().Be(1); + }, Materializer); + } + + // + // Sink + // + + [Fact] + public void A_restart_with_backoff_sink_should_run_normally() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tcs = new TaskCompletionSource>(); + var probe = this.SourceProbe().ToMaterialized(RestartSink.WithBackoff(() => + { + created.IncrementAndGet(); + return Sink.Seq().MapMaterializedValue(task => + { + task.ContinueWith(c => tcs.SetResult(c.Result)); + return Done.Instance; + }); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0), Keep.Left).Run(Materializer); + + probe.SendNext("a"); + probe.SendNext("b"); + probe.SendNext("c"); + probe.SendComplete(); + + tcs.Task.Result.Should().ContainInOrder("a", 
"b", "c"); + created.Current.Should().Be(1); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_sink_should_restart_on_cancellation() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tuple = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var queue = tuple.Item1; + var sinkProbe = tuple.Item2; + var probe = this.SourceProbe().ToMaterialized(RestartSink.WithBackoff(() => + { + created.IncrementAndGet(); + return Flow.Create().TakeWhile(c => c != "cancel", inclusive: true) + .To(Sink.ForEach(c => queue.SendNext(c))); + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0), Keep.Left).Run(Materializer); + + probe.SendNext("a"); + sinkProbe.RequestNext("a"); + probe.SendNext("b"); + sinkProbe.RequestNext("b"); + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + probe.SendNext("c"); + sinkProbe.RequestNext("c"); + probe.SendComplete(); + + created.Current.Should().Be(2); + + sinkProbe.Cancel(); + probe.SendComplete(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_sink_should_backoff_before_restart() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tuple = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var queue = tuple.Item1; + var sinkProbe = tuple.Item2; + var probe = this.SourceProbe().ToMaterialized(RestartSink.WithBackoff(() => + { + created.IncrementAndGet(); + return Flow.Create().TakeWhile(c => c != "cancel", inclusive: true) + .To(Sink.ForEach(c => queue.SendNext(c))); + }, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2), 0), Keep.Left).Run(Materializer); + + probe.SendNext("a"); + sinkProbe.RequestNext("a"); + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + probe.SendNext("b"); + sinkProbe.Request(1); + var deadline = TimeSpan.FromMilliseconds(100).FromNow(); + sinkProbe.ExpectNext(TimeSpan.FromMilliseconds(300), "b"); + deadline.IsOverdue.Should().BeTrue(); + + created.Current.Should().Be(2); + + sinkProbe.Cancel(); + probe.SendComplete(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_sink_should_reset_exponential_backoff_back_to_minimum_when_source_runs_for_at_least_minimum_backoff_without_completing() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tuple = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var queue = tuple.Item1; + var sinkProbe = tuple.Item2; + var probe = this.SourceProbe().ToMaterialized(RestartSink.WithBackoff(() => + { + created.IncrementAndGet(); + return Flow.Create().TakeWhile(c => c != "cancel", inclusive: true) + .To(Sink.ForEach(c => queue.SendNext(c))); + }, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2), 0), Keep.Left).Run(Materializer); + + probe.SendNext("a"); + sinkProbe.RequestNext("a"); + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + // There should be a 200ms delay + probe.SendNext("b"); + sinkProbe.RequestNext("b"); + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + sinkProbe.Request(1); + // The probe should now be backing off for 400ms + + // Now wait for the 400ms delay to pass, then it will start the new source, we also want to wait for the + // subsequent 200ms min backoff to pass, so it resets the restart count + Thread.Sleep(700); + + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + + // We should have reset, so the restart delay should be back 
to 200ms, ie we should definitely receive the + // next element within 300ms + probe.SendNext("c"); + sinkProbe.Request(1); + sinkProbe.ExpectNext(TimeSpan.FromMilliseconds(300), "c"); + + created.Current.Should().Be(4); + + sinkProbe.Cancel(); + probe.SendComplete(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_sink_should_not_restart_the_sink_when_completed_while_backing_off() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tuple = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var queue = tuple.Item1; + var sinkProbe = tuple.Item2; + var probe = this.SourceProbe().ToMaterialized(RestartSink.WithBackoff(() => + { + created.IncrementAndGet(); + return Flow.Create().TakeWhile(c => c != "cancel", inclusive: true) + .To(Sink.ForEach(c => queue.SendNext(c))); + }, TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2), 0), Keep.Left).Run(Materializer); + + probe.SendNext("a"); + sinkProbe.RequestNext("a"); + probe.SendNext("cancel"); + sinkProbe.RequestNext("cancel"); + // Should be backing off now + probe.SendComplete(); + + // Wait to ensure it isn't restarted + Thread.Sleep(300); + created.Current.Should().Be(1); + + sinkProbe.Cancel(); + }, Materializer); + } + + // + // Flow + // + + private Tuple, TestSubscriber.Probe, TestPublisher.Probe, TestSubscriber.Probe> SetupFlow(TimeSpan minBackoff, TimeSpan maxBackoff) + { + var created = new AtomicCounter(0); + var probe1 = this.SourceProbe().ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var flowInSource = probe1.Item1; + var flowInProbe = probe1.Item2; + var probe2 = this.SourceProbe().ToMaterialized(BroadcastHub.Sink(), Keep.Both).Run(Materializer); + var flowOutProbe = probe2.Item1; + var flowOutSource = probe2.Item2; + + // We can't just use ordinary probes here because we're expecting them to get started/restarted. Instead, we + // simply use the probes as a message bus for feeding and capturing events. 
+ var probe3 = this.SourceProbe().ViaMaterialized(RestartFlow.WithBackoff(() => + { + created.IncrementAndGet(); + var snk = Flow.Create() + .TakeWhile(s => s != "cancel") + .To(Sink.ForEach(c => flowInSource.SendNext(c)) + .MapMaterializedValue(task => task.ContinueWith( + t1 => + { + if (t1.IsFaulted || t1.IsCanceled) + flowInSource.SendNext("in error"); + else + flowInSource.SendNext("in complete"); + }))); + + var src = flowOutSource.TakeWhile(s => s != "complete").Select(c => + { + if (c == "error") + throw new ArgumentException("failed"); + return c; + }).WatchTermination((s1, task) => + { + task.ContinueWith(_ => + { + flowInSource.SendNext("out complete"); + return NotUsed.Instance; + }, TaskContinuationOptions.OnlyOnRanToCompletion); + return s1; + }); + + return Flow.FromSinkAndSource(snk, src); + }, minBackoff, maxBackoff, 0), Keep.Left) + .ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var source = probe3.Item1; + var sink = probe3.Item2; + + return Tuple.Create(created, source, flowInProbe, flowOutProbe, sink); + } + + [Fact] + public void A_restart_with_backoff_flow_should_run_normally() + { + this.AssertAllStagesStopped(() => + { + var created = new AtomicCounter(0); + var tuple = this.SourceProbe().ViaMaterialized(RestartFlow.WithBackoff(() => + { + created.IncrementAndGet(); + return Flow.Create(); ; + }, TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20), 0), Keep.Left).ToMaterialized(this.SinkProbe(), Keep.Both).Run(Materializer); + var source = tuple.Item1; + var sink = tuple.Item2; + + source.SendNext("a"); + sink.RequestNext("a"); + source.SendNext("b"); + sink.RequestNext("b"); + + created.Current.Should().Be(1); + source.SendComplete(); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_flow_should_restart_on_cancellation() + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + source.SendNext("cancel"); + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.Request(2); + ImmutableList.Create(flowInProbe.ExpectNext(TimeSpan.FromSeconds(5)), flowInProbe.ExpectNext(TimeSpan.FromSeconds(5))).Should() + .Contain(ImmutableList.Create("in complete", "out complete")); + + // and it should restart + source.SendNext("c"); + flowInProbe.RequestNext("c"); + flowOutProbe.SendNext("d"); + sink.RequestNext("d"); + + created.Current.Should().Be(2); + } + + [Fact] + public void A_restart_with_backoff_flow_should_restart_on_completion() + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + sink.Request(1); + flowOutProbe.SendNext("complete"); + + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.Request(2); + ImmutableList.Create(flowInProbe.ExpectNext(TimeSpan.FromSeconds(5)), flowInProbe.ExpectNext(TimeSpan.FromSeconds(5))).Should() + .Contain(ImmutableList.Create("in complete", "out complete")); + + // and it should restart + source.SendNext("c"); + 
flowInProbe.RequestNext("c"); + flowOutProbe.SendNext("d"); + sink.RequestNext("d"); + + created.Current.Should().Be(2); + } + + [Fact] + public void A_restart_with_backoff_flow_should_restart_on_failure() + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(10), TimeSpan.FromMilliseconds(20)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + sink.Request(1); + flowOutProbe.SendNext("error"); + + // This should complete the in probe + flowInProbe.RequestNext("in complete"); + + // and it should restart + source.SendNext("c"); + flowInProbe.RequestNext("c"); + flowOutProbe.SendNext("d"); + sink.RequestNext("d"); + + created.Current.Should().Be(2); + } + + [Fact] + public void A_restart_with_backoff_flow_should_backoff_before_restart() + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(200), TimeSpan.FromSeconds(2)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + source.SendNext("cancel"); + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.Request(2); + ImmutableList.Create(flowInProbe.ExpectNext(TimeSpan.FromSeconds(5)), flowInProbe.ExpectNext(TimeSpan.FromSeconds(5))).Should() + .Contain(ImmutableList.Create("in complete", "out complete")); + + source.SendNext("c"); + flowInProbe.Request(1); + var deadline = TimeSpan.FromMilliseconds(100).FromNow(); + flowInProbe.ExpectNext(TimeSpan.FromMilliseconds(300), "c"); + deadline.IsOverdue.Should().BeTrue(); + + created.Current.Should().Be(2); + } + + [Fact] + public void A_restart_with_backoff_flow_should_continue_running_flow_out_port_after_in_has_been_sent_completion() + { + this.AssertAllStagesStopped(() => + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(20), TimeSpan.FromMilliseconds(40)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + source.SendComplete(); + flowInProbe.RequestNext("in complete"); + + flowOutProbe.SendNext("c"); + sink.RequestNext("c"); + flowOutProbe.SendNext("d"); + sink.RequestNext("d"); + + sink.Request(1); + flowOutProbe.SendComplete(); + flowInProbe.RequestNext("out complete"); + sink.ExpectComplete(); + + created.Current.Should().Be(1); + }, Materializer); + } + + [Fact] + public void A_restart_with_backoff_flow_should_continue_running_flow_in_port_after_out_has_been_cancelled() + { + var tuple = SetupFlow(TimeSpan.FromMilliseconds(20), TimeSpan.FromMilliseconds(40)); + var created = tuple.Item1; + var source = tuple.Item2; + var flowInProbe = tuple.Item3; + var flowOutProbe = tuple.Item4; + var sink = tuple.Item5; + + source.SendNext("a"); + flowInProbe.RequestNext("a"); + flowOutProbe.SendNext("b"); + sink.RequestNext("b"); + + sink.Cancel(); + flowInProbe.RequestNext("out complete"); + + source.SendNext("c"); + flowInProbe.RequestNext("c"); + source.SendNext("d"); + flowInProbe.RequestNext("d"); + + source.SendNext("cancel"); + flowInProbe.RequestNext("in complete"); + source.ExpectCancellation(); + + 
created.Current.Should().Be(1); + } + } +} diff --git a/src/core/Akka.Streams/Dsl/Restart.cs b/src/core/Akka.Streams/Dsl/Restart.cs new file mode 100644 index 00000000000..b84526daa9a --- /dev/null +++ b/src/core/Akka.Streams/Dsl/Restart.cs @@ -0,0 +1,490 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2015-2017 Lightbend Inc. +// Copyright (C) 2013-2017 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Streams.Stage; + +namespace Akka.Streams.Dsl +{ + /// + /// A RestartSource wraps a that gets restarted when it completes or fails. + /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for + /// example, for streams that depend on a remote server that may crash or become partitioned. The + /// RestartSource ensures that the graph can continue running while the restarts. + /// + public static class RestartSource + { + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// This will never emit a complete or failure, since the completion or failure of the wrapped + /// is always handled by restarting it. The wrapped can however be cancelled by cancelling this . + /// When that happens, the wrapped , if currently running will be cancelled, and it will not be restarted. + /// This can be triggered simply by the downstream cancelling, or externally by introducing a right + /// after this in the graph. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. 
+ public static Source WithBackoff(Func> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + => Source.FromGraph(new RestartWithBackoffSource(sourceFactory, minBackoff, maxBackoff, randomFactor)); + } + + internal sealed class RestartWithBackoffSource : GraphStage> + { + public Func> SourceFactory { get; } + public TimeSpan MinBackoff { get; } + public TimeSpan MaxBackoff { get; } + public double RandomFactor { get; } + + public RestartWithBackoffSource( + Func> sourceFactory, + TimeSpan minBackoff, + TimeSpan maxBackoff, + double randomFactor) + { + SourceFactory = sourceFactory; + MinBackoff = minBackoff; + MaxBackoff = maxBackoff; + RandomFactor = randomFactor; + Shape = new SourceShape(Out); + } + + public Outlet Out { get; } = new Outlet("RestartWithBackoffSource.out"); + public override SourceShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new Logic(this, "Source"); + } + + private class Logic : RestartWithBackoffLogic> + { + private readonly RestartWithBackoffSource _stage; + + public Logic(RestartWithBackoffSource stage, string name) + : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + { + _stage = stage; + Backoff(); + } + + protected override void StartGraph() + { + var sinkIn = CreateSubInlet(_stage.Out); + _stage.SourceFactory().RunWith(sinkIn.Sink, SubFusingMaterializer); + if (IsAvailable(_stage.Out)) + { + sinkIn.Pull(); + } + } + + protected override void Backoff() + { + SetHandler(_stage.Out, () => + { + // do nothing + }); + } + } + } + + /// + /// A RestartSink wraps a that gets restarted when it completes or fails. + /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for + /// example, for streams that depend on a remote server that may crash or become partitioned. The + /// RestartSink ensures that the graph can continue running while the restarts. + /// + public static class RestartSink + { + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// This will never cancel, since cancellation by the wrapped is always handled by restarting it. + /// The wrapped can however be completed by feeding a completion or error into this . When that + /// happens, the , if currently running, will terminate and will not be restarted. This can be triggered + /// simply by the upstream completing, or externally by introducing a right before this in the + /// graph. + /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + /// messages. When the wrapped does cancel, this will backpressure, however any elements already + /// sent may have been lost. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. 
+ public static Sink WithBackoff(Func> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + => Sink.FromGraph(new RestartWithBackoffSink(sinkFactory, minBackoff, maxBackoff, randomFactor)); + } + + internal sealed class RestartWithBackoffSink : GraphStage> + { + public Func> SinkFactory { get; } + public TimeSpan MinBackoff { get; } + public TimeSpan MaxBackoff { get; } + public double RandomFactor { get; } + + public RestartWithBackoffSink( + Func> sinkFactory, + TimeSpan minBackoff, + TimeSpan maxBackoff, + double randomFactor) + { + SinkFactory = sinkFactory; + MinBackoff = minBackoff; + MaxBackoff = maxBackoff; + RandomFactor = randomFactor; + Shape = new SinkShape(In); + } + + public Inlet In { get; } = new Inlet("RestartWithBackoffSink.in"); + public override SinkShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new Logic(this, "Sink"); + } + + private class Logic : RestartWithBackoffLogic> + { + private readonly RestartWithBackoffSink _stage; + + public Logic(RestartWithBackoffSink stage, string name) + : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + { + _stage = stage; + Backoff(); + } + + protected override void StartGraph() + { + var sourceOut = CreateSubOutlet(_stage.In); + Source.FromGraph(sourceOut.Source).RunWith(_stage.SinkFactory(), SubFusingMaterializer); + } + + protected override void Backoff() + { + SetHandler(_stage.In, () => + { + // do nothing + }); + } + } + } + + /// + /// A RestartFlow wraps a that gets restarted when it completes or fails. + /// They are useful for graphs that need to run for longer than the can necessarily guarantee it will, for + /// example, for streams that depend on a remote server that may crash or become partitioned. The + /// RestartFlow ensures that the graph can continue running while the restarts. + /// + public static class RestartFlow + { + /// + /// Wrap the given with a that will restart it when it fails or complete using an exponential + /// backoff. + /// This will not cancel, complete or emit a failure, until the opposite end of it has been cancelled or + /// completed.Any termination by the before that time will be handled by restarting it. Any termination + /// signals sent to this however will terminate the wrapped , if it's running, and then the + /// will be allowed to terminate without being restarted. + /// The restart process is inherently lossy, since there is no coordination between cancelling and the sending of + /// messages. A termination signal from either end of the wrapped will cause the other end to be terminated, + /// and any in transit messages will be lost. During backoff, this will backpressure. + /// This uses the same exponential backoff algorithm as . + /// + /// A factory for producing the ] to wrap. + /// Minimum (initial) duration until the child actor will started again, if it is terminated + /// The exponential back-off is capped to this duration + /// After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`. 
+ public static Flow WithBackoff(Func> flowFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) + => Flow.FromGraph(new RestartWithBackoffFlow(flowFactory, minBackoff, maxBackoff, randomFactor)); + } + + internal sealed class RestartWithBackoffFlow : GraphStage> + { + public Func> FlowFactory { get; } + public TimeSpan MinBackoff { get; } + public TimeSpan MaxBackoff { get; } + public double RandomFactor { get; } + + public RestartWithBackoffFlow( + Func> flowFactory, + TimeSpan minBackoff, + TimeSpan maxBackoff, + double randomFactor) + { + FlowFactory = flowFactory; + MinBackoff = minBackoff; + MaxBackoff = maxBackoff; + RandomFactor = randomFactor; + Shape = new FlowShape(In, Out); + } + + public Inlet In { get; } = new Inlet("RestartWithBackoffFlow.in"); + public Outlet Out { get; } = new Outlet("RestartWithBackoffFlow.out"); + + public override FlowShape Shape { get; } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new Logic(this, "Flow"); + } + + private class Logic : RestartWithBackoffLogic> + { + private readonly RestartWithBackoffFlow _stage; + private Tuple, SubSinkInlet> activeOutIn = null; + + public Logic(RestartWithBackoffFlow stage, string name) + : base(name, stage.Shape, stage.MinBackoff, stage.MaxBackoff, stage.RandomFactor) + { + _stage = stage; + Backoff(); + } + + protected override void StartGraph() + { + var sourceOut = CreateSubOutlet(_stage.In); + var sinkIn = CreateSubInlet(_stage.Out); + Source.FromGraph(sourceOut.Source).Via(_stage.FlowFactory()).RunWith(sinkIn.Sink, SubFusingMaterializer); + if (IsAvailable(_stage.Out)) + { + sinkIn.Pull(); + } + activeOutIn = Tuple.Create(sourceOut, sinkIn); + } + + protected override void Backoff() + { + SetHandler(_stage.In, () => + { + // do nothing + }); + SetHandler(_stage.Out, () => + { + // do nothing + }); + + // We need to ensure that the other end of the sub flow is also completed, so that we don't + // receive any callbacks from it. + if (activeOutIn != null) + { + var sourceOut = activeOutIn.Item1; + var sinkIn = activeOutIn.Item2; + if (!sourceOut.IsClosed) + { + sourceOut.Complete(); + } + if (!sinkIn.IsClosed) + { + sinkIn.Cancel(); + } + activeOutIn = null; + } + } + } + } + + /// + /// Shared logic for all restart with backoff logics. + /// + internal abstract class RestartWithBackoffLogic : TimerGraphStageLogic where S : Shape + { + private readonly string _name; + private readonly S _shape; + private readonly TimeSpan _minBackoff; + private readonly TimeSpan _maxBackoff; + private readonly double _randomFactor; + + protected Inlet In { get; } + protected Outlet Out { get; } + + private int _restartCount = 0; + private Deadline _resetDeadline; + // This is effectively only used for flows, if either the main inlet or outlet of this stage finishes, then we + // don't want to restart the sub inlet when it finishes, we just finish normally. 
+ private bool _finishing = false; + + protected RestartWithBackoffLogic( + string name, + S shape, + TimeSpan minBackoff, + TimeSpan maxBackoff, + double randomFactor) : base(shape) + { + _name = name; + _shape = shape; + _minBackoff = minBackoff; + _maxBackoff = maxBackoff; + _randomFactor = randomFactor; + + _resetDeadline = minBackoff.FromNow(); + + In = shape.Inlets.FirstOrDefault(); + Out = shape.Outlets.FirstOrDefault(); + } + + protected abstract void StartGraph(); + protected abstract void Backoff(); + + protected SubSinkInlet CreateSubInlet(Outlet outlet) + { + var sinkIn = new SubSinkInlet(this, $"RestartWithBackoff{_name}.subIn"); + + sinkIn.SetHandler(new LambdaInHandler( + onPush: () => + { + Push(Out, sinkIn.Grab()); + }, + onUpstreamFinish: () => + { + if (_finishing) + { + Complete(Out); + } + else + { + Log.Debug("Graph out finished"); + OnCompleteOrFailure(); + } + }, + onUpstreamFailure: ex => + { + if (_finishing) + { + Fail(_shape.Outlets.First(), ex); + } + else + { + Log.Error(ex, "Restarting graph due to failure"); + OnCompleteOrFailure(); + } + })); + + SetHandler(Out, + onPull: () => sinkIn.Pull(), + onDownstreamFinish: () => + { + _finishing = true; + sinkIn.Cancel(); + }); + + return sinkIn; + } + + protected SubSourceOutlet CreateSubOutlet(Inlet inlet) + { + var sourceOut = new SubSourceOutlet(this, $"RestartWithBackoff{_name}.subOut"); + sourceOut.SetHandler(new LambdaOutHandler( + onPull: () => + { + if (IsAvailable(In)) + { + sourceOut.Push(Grab(In)); + } + else + { + if (!HasBeenPulled(In)) + Pull(In); + } + }, + onDownstreamFinish: () => + { + if (_finishing) + { + Cancel(In); + } + else + { + Log.Debug("Graph in finished"); + OnCompleteOrFailure(); + } + } + )); + + SetHandler(In, + onPush: () => + { + if (sourceOut.IsAvailable) + sourceOut.Push(Grab(In)); + }, + onUpstreamFinish: () => + { + _finishing = true; + sourceOut.Complete(); + }, + onUpstreamFailure: ex => + { + _finishing = true; + sourceOut.Fail(ex); + }); + + return sourceOut; + } + + internal void OnCompleteOrFailure() + { + // Check if the last start attempt was more than the minimum backoff + if (_resetDeadline.IsOverdue) + { + Log.Debug($"Last restart attempt was more than {_minBackoff} ago, resetting restart count"); + _restartCount = 0; + } + + var restartDelay = BackoffSupervisor.CalculateDelay(_restartCount, _minBackoff, _maxBackoff, _randomFactor); + Log.Debug($"Restarting graph in {restartDelay}"); + ScheduleOnce("RestartTimer", restartDelay); + _restartCount += 1; + // And while we wait, we go into backoff mode + Backoff(); + } + + protected internal override void OnTimer(object timerKey) + { + StartGraph(); + _resetDeadline = _minBackoff.FromNow(); + } + + // When the stage starts, start the source + public override void PreStart() => StartGraph(); + } + + internal sealed class Deadline + { + public Deadline(TimeSpan time) + { + Time = time; + } + + public TimeSpan Time { get; } + + public bool IsOverdue => Time.Ticks - DateTime.UtcNow.Ticks < 0; + + public static Deadline Now => new Deadline(new TimeSpan(DateTime.UtcNow.Ticks)); + + public static Deadline operator +(Deadline deadline, TimeSpan duration) + { + return new Deadline(deadline.Time.Add(duration)); + } + } + + internal static class DeadlineExtensions + { + public static Deadline FromNow(this TimeSpan timespan) + { + return Deadline.Now + timespan; + } + } +} diff --git a/src/core/Akka.Streams/Dsl/Sink.cs b/src/core/Akka.Streams/Dsl/Sink.cs index 5e75e6258aa..663df046aac 100644 --- a/src/core/Akka.Streams/Dsl/Sink.cs 
+++ b/src/core/Akka.Streams/Dsl/Sink.cs
@@ -205,7 +205,7 @@ public static Sink<TIn, Task<TIn>> First<TIn>()
             {
                 if (!e.IsFaulted && e.IsCompleted && e.Result == null)
                     throw new InvalidOperationException("Sink.First materialized on an empty stream");
-
+
                 return e;
             });
@@ -241,7 +241,12 @@ public static Sink<TIn, Task<TIn>> LastOrDefault<TIn>()
            => FromGraph(new LastOrDefault<TIn>()).WithAttributes(DefaultAttributes.LastOrDefaultSink);
 
        /// <summary>
-        /// TBD
+        /// A <see cref="Sink{TIn,TMat}"/> that keeps on collecting incoming elements until upstream terminates.
+        /// As upstream may be unbounded, `Flow.Create{T}().Take` or the stricter `Flow.Create{T}().Limit` (and their variants)
+        /// may be used to ensure boundedness.
+        /// Materializes into a <see cref="Task{TResult}"/> of <see cref="IImmutableList{TIn}"/> containing all the collected elements.
+        /// `Seq` is limited to <see cref="int.MaxValue"/> elements; this Sink will cancel the stream
+        /// after having received that many elements.
        /// </summary>
        /// <typeparam name="TIn">TBD</typeparam>
        /// <returns>TBD</returns>
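
As an aside to the `Sink.Seq` documentation added above, a minimal usage sketch (not part of this patch), assuming an
existing `materializer` and the usual `System.Linq` and `Akka.Streams.Dsl` usings:

```csharp
// Sink.Seq collects every element into an immutable list, materializing a
// Task<IImmutableList<int>>; bound an unbounded source first, e.g. with Take.
var firstTen = Source.From(Enumerable.Range(1, 1000000))
    .Take(10)                                // ensure boundedness before collecting
    .RunWith(Sink.Seq<int>(), materializer);

Console.WriteLine(string.Join(", ", firstTen.Result)); // 1, 2, ..., 10
```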