Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RestartSource/Flow/Sink #3103

Merged
merged 9 commits into from
Sep 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions docs/articles/streams/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ In many cases you may want to avoid complete stream failure, this can be done in

- `Recover` to emit a final element then complete the stream normally on upstream failure
- `RecoverWithRetries` to create a new upstream and start consuming from that on failure
- Restarting sections of the stream after a backoff
- Using a supervision strategy for stages that support it

In addition to these built in tools for error handling, a common pattern is to wrap the stream inside an actor, and have the actor restart the entire stream on failure.
Expand Down Expand Up @@ -83,6 +84,42 @@ seven
eight
```

## Delayed restarts with a backoff stage

Just as Akka provides the [backoff supervision pattern for actors](xref:supervision#delayed-restarts-with-the-backoffsupervisor-pattern), Akka streams
also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.

This pattern is useful when the stage fails or completes because some external resource is not available
and we need to give it some time to start-up again. One of the prime examples when this is useful is
when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded.
By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side.

The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a
`HttpResponseMessage`, produced by `HttpCLient`. If the stream fails or completes at any point, the request will
be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
to the `maxBackoff` parameter):

[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=restart-with-backoff-source)]

Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
is highly recommended, in order to avoid multiple streams re-start at the exact same point in time,
for example because they were stopped due to a shared resource such as the same server going down
and re-starting after the same configured interval. By adding additional randomness to the
re-start intervals the streams will start in slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.

The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
it in combination with a `KillSwitch`, so that you can terminate it when needed:

[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=with-kill-switch)]

Sinks and flows can also be supervised, using `Akka.Streams.Dsl.RestartSink` and `Akka.Streams.Dsl.RestartFlow`.
The `RestartSink` is restarted when it cancels, while the `RestartFlow` is restarted when either the in port cancels,
the out port completes, or the out port sends an error.

## Supervision Strategies

> [!NOTE]
Expand Down
62 changes: 62 additions & 0 deletions docs/examples/DocsExamples/Streams/RestartDocTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace DocsExamples.Streams
{
public class RestartDocTests : TestKit
{
private ActorMaterializer Materializer { get; }

public RestartDocTests(ITestOutputHelper output)
: base("", output)
{
Materializer = Sys.Materializer();
}

private void DoSomethingElse()
{
}

[Fact]
public void Restart_stages_should_demonstrate_a_restart_with_backoff_source()
{
#region restart-with-backoff-source
var httpClient = new HttpClient();

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
},
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2 // adds 20% "noise" to vary the intervals slightly
);
#endregion

#region with-kill-switch
var killSwitch = restartSource
.ViaMaterialized(KillSwitches.Single<string>(), Keep.Right)
.ToMaterialized(Sink.ForEach<string>(evt => Console.WriteLine($"Got event: {evt}")), Keep.Left)
.Run(Materializer);

DoSomethingElse();

killSwitch.Shutdown();
#endregion
}
}
}
12 changes: 12 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,18 @@ namespace Akka.Streams.Dsl
{
public PartitionOutOfBoundsException(string message) { }
}
public class static RestartFlow
{
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static RestartSink
{
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static RestartSource
{
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static ReverseOps
{
public static Akka.Streams.Dsl.GraphDsl.Builder<TMat> From<TIn, TOut, TMat>(this Akka.Streams.Dsl.GraphDsl.ReverseOps<TIn, TMat> ops, Akka.Streams.Outlet<TOut> outlet)
Expand Down
Loading