Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added RestartSource/Flow/Sink #3103

Merged
merged 9 commits into from
Sep 24, 2017
Merged

Conversation

alexvaluyskiy
Copy link
Contributor

@alexvaluyskiy alexvaluyskiy commented Sep 15, 2017

Implemented #3090

@alexvaluyskiy alexvaluyskiy changed the title Added RestartSource/Flow/Sink [WIP] Added RestartSource/Flow/Sink Sep 15, 2017
/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param>
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param>
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param>
public static Source<T, NotUsed> WithBackoff<T>(Func<Source<T, NotUsed>> sourceFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
Copy link
Contributor Author

@alexvaluyskiy alexvaluyskiy Sep 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sourceFactory should have a signature Creator[Source[T, _]], I don't know how to do it in C#

/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param>
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param>
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param>
public static Sink<T, NotUsed> WithBackoff<T>(Func<Sink<T, NotUsed>> sinkFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sinkFactory should have a signature Creator[Sink[T, _]], I don't know how to do it in C#

/// <param name="minBackoff">Minimum (initial) duration until the child actor will started again, if it is terminated</param>
/// <param name="maxBackoff">The exponential back-off is capped to this duration</param>
/// <param name="randomFactor">After calculation of the exponential back-off an additional random delay based on this factor is added, e.g. `0.2` adds up to `20%` delay. In order to skip this additional delay pass in `0`.</param>
public static Flow<TIn, TOut, NotUsed> WithBackoff<TIn, TOut>(Func<Flow<TIn, TOut, NotUsed>> flowFactory, TimeSpan minBackoff, TimeSpan maxBackoff, double randomFactor)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flowFactory should have a signature Creator[Flow[In, Out, _], I don't know how to do it in C#

public override void PreStart() => StartGraph();
}

internal sealed class Deadline
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a similar class in Akka.Remote, but it accepts DateTime, not TimeSpan

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Aaronontheweb What to do with this class?


// We should have reset, so the restart delay should be back to 200ms, ie we should definitely receive the
// next element within 300ms
probe.RequestNext().Should().Be("a"); // TODO: why I can't set timeout here?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't setup timeouts here

/// may be used to ensure boundedness.
/// Materializes into a <see cref="Task"/> of <see cref="Seq{TIn}"/> containing all the collected elements.
/// `Seq` is limited to <see cref="int.MaxValue"/> elements, this Sink will cancel the stream
/// after having received that many elements.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a XML doc for Sink.Seq

.MapMaterializedValue(task => task.ContinueWith(
t1 =>
{
if (t1.IsFaulted || t1.IsCanceled)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch

@@ -479,129 +485,123 @@ public void A_restart_with_backoff_flow_should_run_normally()
[Fact]
public void A_restart_with_backoff_flow_should_restart_on_cancellation()
{
this.AssertAllStagesStopped(() =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the problem with AssertAllStagesStopped

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all specs are designed in a way that all stages are stopped at this point so the spec fails.

@alexvaluyskiy alexvaluyskiy changed the title [WIP] Added RestartSource/Flow/Sink Added RestartSource/Flow/Sink Sep 19, 2017
internal sealed class RestartWithBackoffSource<T, TMat> : GraphStage<SourceShape<T>>
{
public Func<Source<T, TMat>> SourceFactory { get; }
public TimeSpan MinBackoff { get; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why public properties?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to access to those properties from GraphStageLogic. Anyway I can't see these properties because the class is internal

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the Logic class is inside the Source so you can access them even if they are private

Copy link
Contributor Author

@alexvaluyskiy alexvaluyskiy Sep 20, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is true. But what the difference? Scala converts all constructor's parameters (without additional val or val) to public fields. Why can't we do it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No real difference, was just curious if there is a reason because in the base logic they’re private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a manual How to convert a default Scala constructor to C#. If I am not right, you can change this manual
https://github.com/akkadotnet/akka.net/wiki/Scala-to-C%23-Conversion-Guide#class-constructors

/// <summary>
/// Shared logic for all restart with backoff logics.
/// </summary>
internal abstract class RestartWithBackoffLogic<S> : TimerGraphStageLogic where S : Shape
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could implement IInAndOutHandler and use SetHandler(Out/In, this) to save a few allocations, we’ve done that in many other stages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on microbenchmark set for Akka.Streams, and I will try to reduce some allocations in the future. I don;t like to do some unobvious performance improvements without any tests.

@Aaronontheweb
Copy link
Member

The error on Mono is persistent

@marcpiechura marcpiechura merged commit b9af0c9 into akkadotnet:dev Sep 24, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants