-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added RestartSource/Flow/Sink #3103
Conversation
src/core/Akka.Streams/Dsl/Restart.cs
Outdated
/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param> | ||
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param> | ||
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param> | ||
public static Source<T, NotUsed> WithBackoff<T>(Func<Source<T, NotUsed>> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sourceFactory
should have a signature Creator[Source[T, _]]
, I don't know how to do it in C#
src/core/Akka.Streams/Dsl/Restart.cs
Outdated
/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param> | ||
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param> | ||
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param> | ||
public static Sink<T, NotUsed> WithBackoff<T>(Func<Sink<T, NotUsed>> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sinkFactory
should have a signature Creator[Sink[T, _]]
, I don't know how to do it in C#
src/core/Akka.Streams/Dsl/Restart.cs
Outdated
/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param> | ||
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param> | ||
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param> | ||
public static Flow<TIn, TOut, NotUsed> WithBackoff<TIn, TOut>(Func<Flow<TIn, TOut, NotUsed>> flowFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flowFactory
should have a signature Creator[Flow[In, Out, _]
, I don't know how to do it in C#
public override void PreStart() => StartGraph(); | ||
} | ||
|
||
internal sealed class Deadline |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a similar class in Akka.Remote, but it accepts DateTime
, not TimeSpan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Aaronontheweb What to do with this class?
|
||
// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the | ||
// next element within 300ms | ||
probe.RequestNext().Should().Be("a"); // TODO: why I can't set timeout here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't setup timeouts here
/// may be used to ensure boundedness. | ||
/// Materializes into a <see cref="Task"/> of <see cref="Seq{TIn}"/> containing all the collected elements. | ||
/// `Seq` is limited to <see cref="int.MaxValue"/> elements, this Sink will cancel the stream | ||
/// after having received that many elements. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a XML doc for Sink.Seq
.MapMaterializedValue(task => task.ContinueWith( | ||
t1 => | ||
{ | ||
if (t1.IsFaulted || t1.IsCanceled) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch
@@ -479,129 +485,123 @@ public void A_restart_with_backoff_flow_should_run_normally() | |||
[Fact] | |||
public void A_restart_with_backoff_flow_should_restart_on_cancellation() | |||
{ | |||
this.AssertAllStagesStopped(() => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was the problem with AssertAllStagesStopped
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not all specs are designed in a way that all stages are stopped at this point so the spec fails.
965a108
to
02f3e27
Compare
internal sealed class RestartWithBackoffSource<T, TMat> : GraphStage<SourceShape<T>> | ||
{ | ||
public Func<Source<T, TMat>> SourceFactory { get; } | ||
public TimeSpan MinBackoff { get; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why public properties?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to access to those properties from GraphStageLogic. Anyway I can't see these properties because the class is internal
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But the Logic class is inside the Source so you can access them even if they are private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is true. But what the difference? Scala converts all constructor's parameters (without additional val
or val
) to public fields. Why can't we do it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No real difference, was just curious if there is a reason because in the base logic they’re private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have a manual How to convert a default Scala constructor to C#
. If I am not right, you can change this manual
https://github.com/akkadotnet/akka.net/wiki/Scala-to-C%23-Conversion-Guide#class-constructors
/// <summary> | ||
/// Shared logic for all restart with backoff logics. | ||
/// </summary> | ||
internal abstract class RestartWithBackoffLogic<S> : TimerGraphStageLogic where S : Shape |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could implement IInAndOutHandler and use SetHandler(Out/In, this) to save a few allocations, we’ve done that in many other stages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm working on microbenchmark set for Akka.Streams, and I will try to reduce some allocations in the future. I don;t like to do some unobvious performance improvements without any tests.
The error on Mono is persistent |
2a24bde
to
c33a23c
Compare
Implemented #3090