Skip to content

Commit

Permalink
Added RestartSource/Flow/Sink (#3103)
Browse files Browse the repository at this point in the history
* Restart Flow/Sink/Source

* Added RestartSource specs

* More tests and docs

* Ported all tests

* Fixed sink test

* fix specs

* Added docs

* Added timeout

* Increased timeouts for mono
  • Loading branch information
alexvaluyskiy authored and marcpiechura committed Sep 24, 2017
1 parent d971549 commit b9af0c9
Show file tree
Hide file tree
Showing 6 changed files with 1,273 additions and 2 deletions.
37 changes: 37 additions & 0 deletions docs/articles/streams/error-handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ In many cases you may want to avoid complete stream failure, this can be done in

- `Recover` to emit a final element then complete the stream normally on upstream failure
- `RecoverWithRetries` to create a new upstream and start consuming from that on failure
- Restarting sections of the stream after a backoff
- Using a supervision strategy for stages that support it

In addition to these built in tools for error handling, a common pattern is to wrap the stream inside an actor, and have the actor restart the entire stream on failure.
Expand Down Expand Up @@ -83,6 +84,42 @@ seven
eight
```

## Delayed restarts with a backoff stage

Just as Akka provides the [backoff supervision pattern for actors](xref:supervision#delayed-restarts-with-the-backoffsupervisor-pattern), Akka streams
also provides a `RestartSource`, `RestartSink` and `RestartFlow` for implementing the so-called *exponential backoff
supervision strategy*, starting a stage again when it fails, each time with a growing time delay between restarts.

This pattern is useful when the stage fails or completes because some external resource is not available
and we need to give it some time to start-up again. One of the prime examples when this is useful is
when a WebSocket connection fails due to the HTTP server it's running on going down, perhaps because it is overloaded.
By using an exponential backoff, we avoid going into a tight reconnect look, which both gives the HTTP server some time
to recover, and it avoids using needless resources on the client side.

The following snippet shows how to create a backoff supervisor using `Akka.Streams.Dsl.RestartSource`
which will supervise the given `Source`. The `Source` in this case is a
`HttpResponseMessage`, produced by `HttpCLient`. If the stream fails or completes at any point, the request will
be made again, in increasing intervals of 3, 6, 12, 24 and finally 30 seconds (at which point it will remain capped due
to the `maxBackoff` parameter):

[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=restart-with-backoff-source)]

Using a `randomFactor` to add a little bit of additional variance to the backoff intervals
is highly recommended, in order to avoid multiple streams re-start at the exact same point in time,
for example because they were stopped due to a shared resource such as the same server going down
and re-starting after the same configured interval. By adding additional randomness to the
re-start intervals the streams will start in slightly different points in time, thus avoiding
large spikes of traffic hitting the recovering server or other resource that they all need to contact.

The above `RestartSource` will never terminate unless the `Sink` it's fed into cancels. It will often be handy to use
it in combination with a `KillSwitch`, so that you can terminate it when needed:

[!code-csharp[RestartDocTests.cs](../../examples/DocsExamples/Streams/RestartDocTests.cs?name=with-kill-switch)]

Sinks and flows can also be supervised, using `Akka.Streams.Dsl.RestartSink` and `Akka.Streams.Dsl.RestartFlow`.
The `RestartSink` is restarted when it cancels, while the `RestartFlow` is restarted when either the in port cancels,
the out port completes, or the out port sends an error.

## Supervision Strategies

> [!NOTE]
Expand Down
62 changes: 62 additions & 0 deletions docs/examples/DocsExamples/Streams/RestartDocTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
using System;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace DocsExamples.Streams
{
public class RestartDocTests : TestKit
{
private ActorMaterializer Materializer { get; }

public RestartDocTests(ITestOutputHelper output)
: base("", output)
{
Materializer = Sys.Materializer();
}

private void DoSomethingElse()
{
}

[Fact]
public void Restart_stages_should_demonstrate_a_restart_with_backoff_source()
{
#region restart-with-backoff-source
var httpClient = new HttpClient();

var restartSource = RestartSource.WithBackoff(() =>
{
// Create a source from a task
return Source.FromTask(
httpClient.GetAsync("http://example.com/eventstream") // Make a single request
)
.Select(c => c.Content.ReadAsStringAsync())
.Select(c => c.Result);
},
minBackoff: TimeSpan.FromSeconds(3),
maxBackoff: TimeSpan.FromSeconds(30),
randomFactor: 0.2 // adds 20% "noise" to vary the intervals slightly
);
#endregion

#region with-kill-switch
var killSwitch = restartSource
.ViaMaterialized(KillSwitches.Single<string>(), Keep.Right)
.ToMaterialized(Sink.ForEach<string>(evt => Console.WriteLine($"Got event: {evt}")), Keep.Left)
.Run(Materializer);

DoSomethingElse();

killSwitch.Shutdown();
#endregion
}
}
}
12 changes: 12 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,18 @@ namespace Akka.Streams.Dsl
{
public PartitionOutOfBoundsException(string message) { }
}
public class static RestartFlow
{
public static Akka.Streams.Dsl.Flow<TIn, TOut, Akka.NotUsed> WithBackoff<TIn, TOut, TMat>(System.Func<Akka.Streams.Dsl.Flow<TIn, TOut, TMat>> flowFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static RestartSink
{
public static Akka.Streams.Dsl.Sink<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Sink<T, TMat>> sinkFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static RestartSource
{
public static Akka.Streams.Dsl.Source<T, Akka.NotUsed> WithBackoff<T, TMat>(System.Func<Akka.Streams.Dsl.Source<T, TMat>> sourceFactory, System.TimeSpan minBackoff, System.TimeSpan maxBackoff, double randomFactor) { }
}
public class static ReverseOps
{
public static Akka.Streams.Dsl.GraphDsl.Builder<TMat> From<TIn, TOut, TMat>(this Akka.Streams.Dsl.GraphDsl.ReverseOps<TIn, TMat> ops, Akka.Streams.Outlet<TOut> outlet)
Expand Down
Loading

0 comments on commit b9af0c9

Please sign in to comment.