Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AlsoTo downstream failure propagation support #7301

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2059,6 +2059,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut2, TMat> Aggregate<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> fold) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> AggregateAsync<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> fold) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> AlsoTo<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> AlsoTo<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that, bool propagateFailure) { }
public static Akka.Streams.Dsl.Source<TOut, TMat3> AlsoToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat> AsSourceWithContext<TOut, TCtx, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TCtx> fn) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> BackpressureTimeout<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan timeout) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2058,6 +2058,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut2, TMat> Aggregate<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> fold) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> AggregateAsync<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> fold) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> AlsoTo<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> AlsoTo<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that, bool propagateFailure) { }
public static Akka.Streams.Dsl.Source<TOut, TMat3> AlsoToMaterialized<TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat> AsSourceWithContext<TOut, TCtx, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<TOut, TCtx> fn) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> BackpressureTimeout<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan timeout) { }
Expand Down
105 changes: 105 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowAlsoToSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// -----------------------------------------------------------------------
// <copyright file="FlowAlsoToSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System.Linq;
using System.Threading.Tasks;
using Akka.Pattern;
using Akka.Streams.Dsl;
using Akka.Streams.Implementation;
using Akka.Streams.TestKit;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl;

public class FlowAlsoToSpec: AkkaSpec
{
private ActorMaterializer Materializer { get; }

public FlowAlsoToSpec(ITestOutputHelper output) : base(output)
{
Materializer = ActorMaterializer.Create(Sys);
}

[Fact(DisplayName = "AlsoTo with no failure propagation should not cancel parent stream")]
public async Task AlsoToNoFailurePropagationTest()
{
await this.AssertAllStagesStoppedAsync(async () => {
var downstream = this.CreateSubscriberProbe<int>();
var alsoDownstream = this.CreateSubscriberProbe<int>();

var tapFlow = Flow.Create<int>()
.Select(i => i == 3 ? throw new TestException("equals 3") : i)
.To(Sink.FromSubscriber(alsoDownstream));

Source.From(Enumerable.Range(1, 5))
.AlsoTo(tapFlow)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.ExpectSubscriptionAsync();
await alsoDownstream.ExpectSubscriptionAsync();
await downstream.RequestAsync(3);
await alsoDownstream.RequestAsync(3);

await downstream.ExpectNextAsync(1);
await alsoDownstream.ExpectNextAsync(1);
await downstream.ExpectNextAsync(2);
await alsoDownstream.ExpectNextAsync(2);
await downstream.ExpectNextAsync(3);

// AlsoTo flow errored
var ex = await alsoDownstream.ExpectErrorAsync();
ex.Should().BeOfType<TestException>();
ex.Message.Should().Be("equals 3");

// Parent stream should still work till complete
await downstream.RequestAsync(3);
await downstream.ExpectNextAsync(4);
await downstream.ExpectNextAsync(5);
await downstream.ExpectCompleteAsync();
}, Materializer);
}

[Fact(DisplayName = "AlsoTo with failure propagation should cancel parent stream")]
public async Task AlsoToFailurePropagationTest()
{
await this.AssertAllStagesStoppedAsync(async () => {
var downstream = this.CreateSubscriberProbe<int>();
var alsoDownstream = this.CreateSubscriberProbe<int>();

var tapFlow = Flow.Create<int>()
.Select(i => i == 3 ? throw new TestException("equals 3") : i)
.To(Sink.FromSubscriber(alsoDownstream));

Source.From(Enumerable.Range(1, 5))
.AlsoTo(tapFlow, true)
.RunWith(Sink.FromSubscriber(downstream), Materializer);

await downstream.ExpectSubscriptionAsync();
await alsoDownstream.ExpectSubscriptionAsync();
await downstream.RequestAsync(4);
await alsoDownstream.RequestAsync(4);

await downstream.ExpectNextAsync(1);
await alsoDownstream.ExpectNextAsync(1);
await downstream.ExpectNextAsync(2);
await alsoDownstream.ExpectNextAsync(2);

// AlsoTo flow errored
var ex = await alsoDownstream.ExpectErrorAsync();
ex.Should().BeOfType<TestException>();
ex.Message.Should().Be("equals 3");

// Parent stream should receive the last element and also error out
await downstream.ExpectNextAsync(3);
var ex2 = await downstream.ExpectErrorAsync();
ex2.Should().Be(ex);
}, Materializer);
}
}
49 changes: 44 additions & 5 deletions src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2519,7 +2519,7 @@ public static IFlow<T, TMat3> OrElseMaterialized<T, TMat, TMat2, TMat3>(this IFl
/// Attaches the given <seealso cref="Sink{TIn,TMat}"/> to this <see cref="IFlow{TOut,TMat}"/>, meaning that elements that passes
/// through will also be sent to the <seealso cref="Sink{TIn,TMat}"/>.
///
/// @see <seealso cref="AlsoTo{TOut,TMat}"/>
/// @see <seealso cref="InternalFlowOperations.AlsoTo{TOut,TMat}(IFlow{TOut, TMat}, IGraph{SinkShape{TOut}, TMat})"/>
///
/// It is recommended to use the internally optimized <seealso cref="Keep.Left{TLeft,TRight}"/> and <seealso cref="Keep.Right{TLeft,TRight}"/> combiners
/// where appropriate instead of manually writing functions that pass through one of the values.
Expand All @@ -2528,7 +2528,23 @@ public static IFlow<TOut, TMat3> AlsoToMaterialized<TOut, TMat, TMat2, TMat3>(
this IFlow<TOut, TMat> flow, IGraph<SinkShape<TOut>, TMat2> that,
Func<TMat, TMat2, TMat3> materializerFunction)
{
return flow.ViaMaterialized(AlsoToGraph(that), materializerFunction);
return flow.ViaMaterialized(AlsoToGraph(that, false), materializerFunction);
}

/// <summary>
/// Attaches the given <seealso cref="Sink{TIn,TMat}"/> to this <see cref="IFlow{TOut,TMat}"/>, meaning that elements that passes
/// through will also be sent to the <seealso cref="Sink{TIn,TMat}"/>.
///
/// @see <seealso cref="InternalFlowOperations.AlsoTo{TOut,TMat}(IFlow{TOut, TMat}, IGraph{SinkShape{TOut}, TMat}, bool)"/>
///
/// It is recommended to use the internally optimized <seealso cref="Keep.Left{TLeft,TRight}"/> and <seealso cref="Keep.Right{TLeft,TRight}"/> combiners
/// where appropriate instead of manually writing functions that pass through one of the values.
/// </summary>
public static IFlow<TOut, TMat3> AlsoToMaterialized<TOut, TMat, TMat2, TMat3>(
this IFlow<TOut, TMat> flow, IGraph<SinkShape<TOut>, TMat2> that,
Func<TMat, TMat2, TMat3> materializerFunction, bool propagateFailure)
{
return flow.ViaMaterialized(AlsoToGraph(that, propagateFailure), materializerFunction);
}

/// <summary>
Expand All @@ -2550,14 +2566,37 @@ public static IFlow<TOut, TMat3> AlsoToMaterialized<TOut, TMat, TMat2, TMat3>(
/// <returns>TBD</returns>
public static IFlow<TOut, TMat> AlsoTo<TOut, TMat>(this IFlow<TOut, TMat> flow, IGraph<SinkShape<TOut>, TMat> that)
{
return flow.Via(AlsoToGraph(that));
return flow.Via(AlsoToGraph(that, false));
}

/// <summary>
/// Attaches the given <seealso cref="Sink{TIn,TMat}"/> to this <see cref="IFlow{TOut,TMat}"/>, meaning that elements that passes
/// through will also be sent to the <seealso cref="Sink{TIn,TMat}"/>.
///
/// Emits when element is available and demand exists both from the Sink and the downstream.
///
/// Backpressures when downstream or Sink backpressures
///
/// Completes when upstream completes
///
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="that">TBD</param>
/// <param name="propagateFailure">Propagate downstream failures and cancels parent stream</param>
/// <returns>TBD</returns>
public static IFlow<TOut, TMat> AlsoTo<TOut, TMat>(this IFlow<TOut, TMat> flow, IGraph<SinkShape<TOut>, TMat> that, bool propagateFailure)
{
return flow.Via(AlsoToGraph(that, propagateFailure));
}

private static IGraph<FlowShape<TOut, TOut>, TMat> AlsoToGraph<TOut, TMat>(IGraph<SinkShape<TOut>, TMat> that)
private static IGraph<FlowShape<TOut, TOut>, TMat> AlsoToGraph<TOut, TMat>(IGraph<SinkShape<TOut>, TMat> that, bool propagateFailure)
{
return GraphDsl.Create(that, (b, r) =>
{
var broadcast = b.Add(new Broadcast<TOut>(2));
var broadcast = b.Add(new Broadcast<TOut>(2, propagateFailure));
b.From(broadcast.Out(1)).To(r);
return new FlowShape<TOut, TOut>(broadcast.In, broadcast.Out(0));
});
Expand Down
23 changes: 23 additions & 0 deletions src/core/Akka.Streams/Dsl/SourceOperations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1748,6 +1748,29 @@ public static Source<TOut, TMat> AlsoTo<TOut, TMat>(this Source<TOut, TMat> flow
return (Source<TOut, TMat>)InternalFlowOperations.AlsoTo(flow, that);
}

/// <summary>
/// Attaches the given <seealso cref="Sink{TIn,TMat}"/> to this <see cref="IFlow{TOut,TMat}"/>, meaning that elements that passes
/// through will also be sent to the <seealso cref="Sink{TIn,TMat}"/>.
///
/// Emits when element is available and demand exists both from the Sink and the downstream.
///
/// Backpressures when downstream or Sink backpressures
///
/// Completes when upstream completes
///
/// Cancels when downstream cancels
/// </summary>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="that">TBD</param>
/// <param name="propagateFailure">Propagate downstream failures and cancels parent stream</param>
/// <returns>TBD</returns>
public static Source<TOut, TMat> AlsoTo<TOut, TMat>(this Source<TOut, TMat> flow, IGraph<SinkShape<TOut>, TMat> that, bool propagateFailure)
{
return (Source<TOut, TMat>)InternalFlowOperations.AlsoTo(flow, that, propagateFailure);
}

/// <summary>
/// This is a simplified version of <seealso cref="WireTap{T}"/> that takes only a simple procedure.
/// Elements will be passed into this "side channel" delegate, and any of its results will be ignored.
Expand Down