Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replaced Receive(Func<T, Task> handler) by ReceiveAsync(...) #1747

Merged
merged 1 commit into from
Mar 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/benchmark/PingPong/ClientAsyncActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public ClientAsyncActor(IActorRef actor, long repeat, TaskCompletionSource<bool>
{
var received = 0L;
var sent = 0L;
Receive<Messages.Msg>(async m =>
ReceiveAsync<Messages.Msg>(async m =>
{
received++;
if (sent < repeat)
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1318,6 +1318,7 @@ namespace Akka.Actor
protected void Become(System.Action configure, bool discardOld = True) { }
protected void BecomeStacked(System.Action configure) { }
protected virtual void OnReceive(object message) { }
[System.ObsoleteAttribute("Use ReceiveAsync instead. This method will be removed in future versions")]
protected void Receive<T>(System.Func<T, System.Threading.Tasks.Task> handler) { }
protected void Receive<T>(System.Action<T> handler, System.Predicate<T> shouldHandle = null) { }
protected void Receive<T>(System.Predicate<T> shouldHandle, System.Action<T> handler) { }
Expand All @@ -1326,6 +1327,11 @@ namespace Akka.Actor
protected void Receive<T>(System.Func<T, bool> handler) { }
protected void Receive(System.Type messageType, System.Func<object, bool> handler) { }
protected void ReceiveAny(System.Action<object> handler) { }
protected void ReceiveAnyAsync(System.Func<object, System.Threading.Tasks.Task> handler) { }
protected void ReceiveAsync<T>(System.Func<T, System.Threading.Tasks.Task> handler, System.Predicate<T> shouldHandle = null) { }
protected void ReceiveAsync<T>(System.Predicate<T> shouldHandle, System.Func<T, System.Threading.Tasks.Task> handler) { }
protected void ReceiveAsync(System.Type messageType, System.Func<object, System.Threading.Tasks.Task> handler, System.Predicate<object> shouldHandle = null) { }
protected void ReceiveAsync(System.Type messageType, System.Predicate<object> shouldHandle, System.Func<object, System.Threading.Tasks.Task> handler) { }
}
public class ReceiveTimeout : Akka.Actor.IPossiblyHarmful
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ private void SetReceive()
_currentSpecRunActor.Forward(message);
});
Receive<BeginNewSpec>(spec => ReceiveBeginSpecRun(spec));
Receive<EndSpec>(spec => ReceiveEndSpecRun(spec));
ReceiveAsync<EndSpec>(spec => ReceiveEndSpecRun(spec));
Receive<RequestTestRunState>(state => Sender.Tell(TestRunData.Copy(TestRunPassed(TestRunData))));
Receive<SubscribeFactCompletionMessages>(messages => AddSubscriber(messages));
Receive<UnsubscribeFactCompletionMessages>(messages => RemoveSubscriber(messages));
Receive<EndTestRun>(async run =>
ReceiveAsync<EndTestRun>(async run =>
{
//clean up the current spec, if it hasn't been done already
if (_currentSpecRunActor != null)
Expand Down
23 changes: 13 additions & 10 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -470,18 +470,21 @@ private void Receiving()
* those results will then be piped back to Remoting, who waits for the results of
* listen.AddressPromise.
* */
Receive<Listen>(listen => Listens.ContinueWith<INoSerializationVerificationNeeded>(listens =>
Receive<Listen>(listen =>
{
if (listens.IsFaulted)
Listens.ContinueWith<INoSerializationVerificationNeeded>(listens =>
{
return new ListensFailure(listen.AddressesPromise, listens.Exception);
}
else
{
return new ListensResult(listen.AddressesPromise, listens.Result);
}
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)
.PipeTo(Self));
if (listens.IsFaulted)
{
return new ListensFailure(listen.AddressesPromise, listens.Exception);
}
else
{
return new ListensResult(listen.AddressesPromise, listens.Result);
}
}, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default)
.PipeTo(Self);
});

Receive<ListensResult>(listens =>
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Tests/Actor/ActorCellSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class DummyAsyncActor : ReceiveActor
{
public DummyAsyncActor(AutoResetEvent autoResetEvent)
{
Receive<string>(async m =>
ReceiveAsync<string>(async m =>
{
await Task.Delay(500);
autoResetEvent.Set();
Expand Down
55 changes: 45 additions & 10 deletions src/core/Akka.Tests/Dispatch/AsyncAwaitSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public ReceiveTimeoutAsyncActor()
{
_replyTo.Tell("GotIt");
});
Receive<string>(async s =>
ReceiveAsync<string>(async s =>
{
_replyTo = Sender;

Expand All @@ -36,7 +36,7 @@ class AsyncActor : ReceiveActor
{
public AsyncActor()
{
Receive<string>(async s =>
ReceiveAsync<string>(async s =>
{
await Task.Yield();
await Task.Delay(TimeSpan.FromMilliseconds(100));
Expand All @@ -57,7 +57,7 @@ public SuspendActor()
{
state = 1;
});
Receive<string>(async m_ =>
ReceiveAsync<string>(async m_ =>
{
Self.Tell("change");
await Task.Delay(TimeSpan.FromSeconds(1));
Expand All @@ -75,7 +75,7 @@ public class AsyncAwaitActor : ReceiveActor
{
public AsyncAwaitActor()
{
Receive<string>(async _ =>
ReceiveAsync<string>(async _ =>
{
var sender = Sender;
var self = Self;
Expand All @@ -85,6 +85,24 @@ public AsyncAwaitActor()
Assert.Same(self, Self);
Sender.Tell("done");
});

ReceiveAsync<int>(async msg =>
{
await Task.Yield();
Sender.Tell("handled");
}, i => i > 10);

ReceiveAsync(typeof(double), async msg =>
{
await Task.Yield();
Sender.Tell("handled");
});

ReceiveAnyAsync(async msg =>
{
await Task.Yield();
Sender.Tell("receiveany");
});
}
}

Expand Down Expand Up @@ -112,7 +130,7 @@ public class Asker : ReceiveActor
{
public Asker(IActorRef other)
{
Receive<string>(async _ =>
ReceiveAsync<string>(async _ =>
{
var sender = Sender;
var self = Self;
Expand Down Expand Up @@ -189,7 +207,7 @@ public class AsyncExceptionActor : ReceiveActor
public AsyncExceptionActor(IActorRef callback)
{
_callback = callback;
Receive<string>(async _ =>
ReceiveAsync<string>(async _ =>
{
await Task.Yield();
ThrowException();
Expand Down Expand Up @@ -374,7 +392,7 @@ public class AsyncExceptionCatcherActor : ReceiveActor

public AsyncExceptionCatcherActor()
{
Receive<string>(async m =>
ReceiveAsync<string>(async m =>
{
_lastMessage = m;
try
Expand Down Expand Up @@ -410,7 +428,7 @@ public class AsyncFailingActor : ReceiveActor
{
public AsyncFailingActor()
{
Receive<string>(async m =>
ReceiveAsync<string>(async m =>
{
ThrowException();
});
Expand Down Expand Up @@ -443,7 +461,7 @@ public class AsyncPipeToDelayActor : ReceiveActor
{
public AsyncPipeToDelayActor()
{
Receive<string>(async msg =>
ReceiveAsync<string>(async msg =>
{
Task.Run(() =>
{
Expand All @@ -460,7 +478,7 @@ public class AsyncReentrantActor : ReceiveActor
{
public AsyncReentrantActor()
{
Receive<string>(async msg =>
ReceiveAsync<string>(async msg =>
{
var sender = Sender;
Task.Run(() =>
Expand Down Expand Up @@ -492,6 +510,23 @@ public void Actor_PipeTo_should_not_be_delayed_by_async_receive()
actor.Tell("hello");
ExpectMsg<string>(m => "hello".Equals(m), TimeSpan.FromMilliseconds(1000));
}

[Fact]
public async Task Actor_receiveasync_overloads_should_work()
{
var actor = Sys.ActorOf<AsyncAwaitActor>();

actor.Tell(11);
ExpectMsg<string>(m => "handled".Equals(m), TimeSpan.FromMilliseconds(1000));

actor.Tell(9);
ExpectMsg<string>(m => "receiveany".Equals(m), TimeSpan.FromMilliseconds(1000));

actor.Tell(1.0);
ExpectMsg<string>(m => "handled".Equals(m), TimeSpan.FromMilliseconds(1000));


}
}
}

2 changes: 1 addition & 1 deletion src/core/Akka.Tests/Routing/TailChoppingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TailChopTestActor : ReceiveActor

public TailChopTestActor(int sleepTime)
{
Receive<string>(async command =>
ReceiveAsync<string>(async command =>
{
switch (command)
{
Expand Down
92 changes: 87 additions & 5 deletions src/core/Akka/Actor/ReceiveActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,96 @@ private PartialAction<object> CreateNewHandler(Action configure)
return newHandler;
}

protected void Receive<T>(Func<T,Task> handler)
[Obsolete("Use ReceiveAsync instead. This method will be removed in future versions")]
protected void Receive<T>(Func<T, Task> handler)
{
EnsureMayConfigureMessageHandlers();
_matchHandlerBuilders.Peek().Match<T>( m =>
ReceiveAsync(handler);
}

private Action<T> WrapAsyncHandler<T>(Func<T, Task> asyncHandler)
{
return m =>
{
Func<Task> wrap = () => handler(m);
Func<Task> wrap = () => asyncHandler(m);
ActorTaskScheduler.RunTask(wrap);
});
};
}

/// <summary>
/// Registers an asynchronous handler for incoming messages of the specified type <typeparamref name="T"/>.
/// If <paramref name="shouldHandle"/>!=<c>null</c> then it must return true before a message is passed to <paramref name="handler"/>.
/// <remarks>The actor will be suspended until the task returned by <paramref name="handler"/> completes.</remarks>
/// <remarks>This method may only be called when constructing the actor or from <see cref="Become(System.Action)"/> or <see cref="BecomeStacked"/>.</remarks>
/// <remarks>Note that handlers registered prior to this may have handled the message already.
/// In that case, this handler will not be invoked.</remarks>
/// </summary>
/// <typeparam name="T">The type of the message</typeparam>
/// <param name="handler">The message handler that is invoked for incoming messages of the specified type <typeparamref name="T"/></param>
/// <param name="shouldHandle">When not <c>null</c> it is used to determine if the message matches.</param>
protected void ReceiveAsync<T>(Func<T,Task> handler, Predicate<T> shouldHandle = null)
{
Receive(WrapAsyncHandler(handler), shouldHandle);
}

/// <summary>
/// Registers an asynchronous handler for incoming messages of the specified type <typeparamref name="T"/>.
/// If <paramref name="shouldHandle"/>!=<c>null</c> then it must return true before a message is passed to <paramref name="handler"/>.
/// <remarks>The actor will be suspended until the task returned by <paramref name="handler"/> completes.</remarks>
/// <remarks>This method may only be called when constructing the actor or from <see cref="Become(System.Action)"/> or <see cref="BecomeStacked"/>.</remarks>
/// <remarks>Note that handlers registered prior to this may have handled the message already.
/// In that case, this handler will not be invoked.</remarks>
/// </summary>
/// <typeparam name="T">The type of the message</typeparam>
/// <param name="shouldHandle">When not <c>null</c> it is used to determine if the message matches.</param>
/// <param name="handler">The message handler that is invoked for incoming messages of the specified type <typeparamref name="T"/></param>
protected void ReceiveAsync<T>(Predicate<T> shouldHandle, Func<T, Task> handler)
{
Receive(WrapAsyncHandler(handler), shouldHandle);
}

/// <summary>
/// Registers an asynchronous handler for incoming messages of the specified <paramref name="messageType"/>.
/// If <paramref name="shouldHandle"/>!=<c>null</c> then it must return true before a message is passed to <paramref name="handler"/>.
/// <remarks>The actor will be suspended until the task returned by <paramref name="handler"/> completes.</remarks>
/// <remarks>This method may only be called when constructing the actor or from <see cref="Become(Action)"/> or <see cref="BecomeStacked"/>.</remarks>
/// <remarks>Note that handlers registered prior to this may have handled the message already.
/// In that case, this handler will not be invoked.</remarks>
/// </summary>
/// <param name="messageType">The type of the message</param>
/// <param name="handler">The message handler that is invoked for incoming messages of the specified <paramref name="messageType"/></param>
/// <param name="shouldHandle">When not <c>null</c> it is used to determine if the message matches.</param>
protected void ReceiveAsync(Type messageType, Func<object, Task> handler, Predicate<object> shouldHandle = null)
{
Receive(messageType, WrapAsyncHandler(handler), shouldHandle);
}

/// <summary>
/// Registers an asynchronous handler for incoming messages of the specified <paramref name="messageType"/>.
/// If <paramref name="shouldHandle"/>!=<c>null</c> then it must return true before a message is passed to <paramref name="handler"/>.
/// <remarks>The actor will be suspended until the task returned by <paramref name="handler"/> completes.</remarks>
/// <remarks>This method may only be called when constructing the actor or from <see cref="Become(Action)"/> or <see cref="BecomeStacked"/>.</remarks>
/// <remarks>Note that handlers registered prior to this may have handled the message already.
/// In that case, this handler will not be invoked.</remarks>
/// </summary>
/// <param name="messageType">The type of the message</param>
/// <param name="shouldHandle">When not <c>null</c> it is used to determine if the message matches.</param>
/// <param name="handler">The message handler that is invoked for incoming messages of the specified <paramref name="messageType"/></param>
protected void ReceiveAsync(Type messageType, Predicate<object> shouldHandle, Func<object, Task> handler)
{
Receive(messageType, WrapAsyncHandler(handler), shouldHandle);
}

/// <summary>
/// Registers an asynchronous handler for incoming messages of any type.
/// <remarks>The actor will be suspended until the task returned by <paramref name="handler"/> completes.</remarks>
/// <remarks>This method may only be called when constructing the actor or from <see cref="Become(Action)"/> or <see cref="BecomeStacked"/>.</remarks>
/// <remarks>Note that handlers registered prior to this may have handled the message already.
/// In that case, this handler will not be invoked.</remarks>
/// </summary>
/// <param name="handler">The message handler that is invoked for all</param>
protected void ReceiveAnyAsync(Func<object, Task> handler)
{
ReceiveAny(WrapAsyncHandler(handler));
}

/// <summary>
Expand Down