diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index e17b26a58ce..7426f505ba9 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -2059,6 +2059,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Aggregate(this Akka.Streams.Dsl.Source flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.Source AggregateAsync(this Akka.Streams.Dsl.Source flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } public static Akka.Streams.Dsl.Source BackpressureTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 107bc594f50..c3a9e8a63fe 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -2058,6 +2058,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Aggregate(this Akka.Streams.Dsl.Source flow, TOut2 zero, System.Func fold) { } public static Akka.Streams.Dsl.Source AggregateAsync(this Akka.Streams.Dsl.Source flow, TOut2 zero, System.Func> fold) { } public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that) { } + public static Akka.Streams.Dsl.Source AlsoTo(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat> that, bool propagateFailure) { } public static Akka.Streams.Dsl.Source AlsoToMaterialized(this Akka.Streams.Dsl.Source flow, Akka.Streams.IGraph, TMat2> that, System.Func materializerFunction) { } public static Akka.Streams.Dsl.SourceWithContext AsSourceWithContext(this Akka.Streams.Dsl.Source flow, System.Func fn) { } public static Akka.Streams.Dsl.Source BackpressureTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowAlsoToSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowAlsoToSpec.cs new file mode 100644 index 00000000000..f9f51e32281 --- /dev/null +++ b/src/core/Akka.Streams.Tests/Dsl/FlowAlsoToSpec.cs @@ -0,0 +1,105 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System.Linq; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation; +using Akka.Streams.TestKit; +using Akka.TestKit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests.Dsl; + +public class FlowAlsoToSpec: AkkaSpec +{ + private ActorMaterializer Materializer { get; } + + public FlowAlsoToSpec(ITestOutputHelper output) : base(output) + { + Materializer = ActorMaterializer.Create(Sys); + } + + [Fact(DisplayName = "AlsoTo with no failure propagation should not cancel parent stream")] + public async Task AlsoToNoFailurePropagationTest() + { + await this.AssertAllStagesStoppedAsync(async () => { + var downstream = this.CreateSubscriberProbe(); + var alsoDownstream = this.CreateSubscriberProbe(); + + var tapFlow = Flow.Create() + .Select(i => i == 3 ? throw new TestException("equals 3") : i) + .To(Sink.FromSubscriber(alsoDownstream)); + + Source.From(Enumerable.Range(1, 5)) + .AlsoTo(tapFlow) + .RunWith(Sink.FromSubscriber(downstream), Materializer); + + await downstream.ExpectSubscriptionAsync(); + await alsoDownstream.ExpectSubscriptionAsync(); + await downstream.RequestAsync(3); + await alsoDownstream.RequestAsync(3); + + await downstream.ExpectNextAsync(1); + await alsoDownstream.ExpectNextAsync(1); + await downstream.ExpectNextAsync(2); + await alsoDownstream.ExpectNextAsync(2); + await downstream.ExpectNextAsync(3); + + // AlsoTo flow errored + var ex = await alsoDownstream.ExpectErrorAsync(); + ex.Should().BeOfType(); + ex.Message.Should().Be("equals 3"); + + // Parent stream should still work till complete + await downstream.RequestAsync(3); + await downstream.ExpectNextAsync(4); + await downstream.ExpectNextAsync(5); + await downstream.ExpectCompleteAsync(); + }, Materializer); + } + + [Fact(DisplayName = "AlsoTo with failure propagation should cancel parent stream")] + public async Task AlsoToFailurePropagationTest() + { + await this.AssertAllStagesStoppedAsync(async () => { + var downstream = this.CreateSubscriberProbe(); + var alsoDownstream = this.CreateSubscriberProbe(); + + var tapFlow = Flow.Create() + .Select(i => i == 3 ? throw new TestException("equals 3") : i) + .To(Sink.FromSubscriber(alsoDownstream)); + + Source.From(Enumerable.Range(1, 5)) + .AlsoTo(tapFlow, true) + .RunWith(Sink.FromSubscriber(downstream), Materializer); + + await downstream.ExpectSubscriptionAsync(); + await alsoDownstream.ExpectSubscriptionAsync(); + await downstream.RequestAsync(4); + await alsoDownstream.RequestAsync(4); + + await downstream.ExpectNextAsync(1); + await alsoDownstream.ExpectNextAsync(1); + await downstream.ExpectNextAsync(2); + await alsoDownstream.ExpectNextAsync(2); + + // AlsoTo flow errored + var ex = await alsoDownstream.ExpectErrorAsync(); + ex.Should().BeOfType(); + ex.Message.Should().Be("equals 3"); + + // Parent stream should receive the last element and also error out + await downstream.ExpectNextAsync(3); + var ex2 = await downstream.ExpectErrorAsync(); + ex2.Should().Be(ex); + }, Materializer); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 8175e40c0b1..9724eba414d 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -2519,7 +2519,7 @@ public static IFlow OrElseMaterialized(this IFl /// Attaches the given to this , meaning that elements that passes /// through will also be sent to the . /// - /// @see + /// @see /// /// It is recommended to use the internally optimized and combiners /// where appropriate instead of manually writing functions that pass through one of the values. @@ -2528,7 +2528,23 @@ public static IFlow AlsoToMaterialized( this IFlow flow, IGraph, TMat2> that, Func materializerFunction) { - return flow.ViaMaterialized(AlsoToGraph(that), materializerFunction); + return flow.ViaMaterialized(AlsoToGraph(that, false), materializerFunction); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// @see + /// + /// It is recommended to use the internally optimized and combiners + /// where appropriate instead of manually writing functions that pass through one of the values. + /// + public static IFlow AlsoToMaterialized( + this IFlow flow, IGraph, TMat2> that, + Func materializerFunction, bool propagateFailure) + { + return flow.ViaMaterialized(AlsoToGraph(that, propagateFailure), materializerFunction); } /// @@ -2550,14 +2566,37 @@ public static IFlow AlsoToMaterialized( /// TBD public static IFlow AlsoTo(this IFlow flow, IGraph, TMat> that) { - return flow.Via(AlsoToGraph(that)); + return flow.Via(AlsoToGraph(that, false)); + } + + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// Emits when element is available and demand exists both from the Sink and the downstream. + /// + /// Backpressures when downstream or Sink backpressures + /// + /// Completes when upstream completes + /// + /// Cancels when downstream cancels + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static IFlow AlsoTo(this IFlow flow, IGraph, TMat> that, bool propagateFailure) + { + return flow.Via(AlsoToGraph(that, propagateFailure)); } - private static IGraph, TMat> AlsoToGraph(IGraph, TMat> that) + private static IGraph, TMat> AlsoToGraph(IGraph, TMat> that, bool propagateFailure) { return GraphDsl.Create(that, (b, r) => { - var broadcast = b.Add(new Broadcast(2)); + var broadcast = b.Add(new Broadcast(2, propagateFailure)); b.From(broadcast.Out(1)).To(r); return new FlowShape(broadcast.In, broadcast.Out(0)); }); diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 7ceed16caae..e294a0eb733 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -1748,6 +1748,29 @@ public static Source AlsoTo(this Source flow return (Source)InternalFlowOperations.AlsoTo(flow, that); } + /// + /// Attaches the given to this , meaning that elements that passes + /// through will also be sent to the . + /// + /// Emits when element is available and demand exists both from the Sink and the downstream. + /// + /// Backpressures when downstream or Sink backpressures + /// + /// Completes when upstream completes + /// + /// Cancels when downstream cancels + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// Propagate downstream failures and cancels parent stream + /// TBD + public static Source AlsoTo(this Source flow, IGraph, TMat> that, bool propagateFailure) + { + return (Source)InternalFlowOperations.AlsoTo(flow, that, propagateFailure); + } + /// /// This is a simplified version of that takes only a simple procedure. /// Elements will be passed into this "side channel" delegate, and any of its results will be ignored.