-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cluster.JoinAsync / Cluster.JoinSeedNodesAsync #3196
Merged
Merged
Changes from all commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
f8ebbf7
Cluster.JoinAsync / Cluster.JoinSeedNodesAsync
Horusiath 2dd2f12
Cluster.JoinAsync: updated API approvals
Horusiath 1c15fdb
added Cluster.JoinAsync exceptions
Horusiath 5af5338
removed API approvals received.txt
Horusiath b7e767a
added test cases for failure scenarios
Horusiath 5f2a287
Merge branch 'dev' into join-async
Horusiath 1afcb4e
added double checks for success/failure scenarios
Horusiath 9000f75
Merge branch 'dev' into join-async
Horusiath 011e610
Merge branch 'dev' into join-async
Horusiath File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,12 +9,14 @@ | |
using System.Collections.Immutable; | ||
using System.Linq; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
using Akka.TestKit; | ||
using Akka.Util.Internal; | ||
using Xunit; | ||
using FluentAssertions; | ||
using Xunit.Abstractions; | ||
|
||
namespace Akka.Cluster.Tests | ||
{ | ||
|
@@ -45,8 +47,8 @@ public class ClusterSpec : AkkaSpec | |
|
||
internal ClusterReadView ClusterView { get { return _cluster.ReadView; } } | ||
|
||
public ClusterSpec() | ||
: base(Config) | ||
public ClusterSpec(ITestOutputHelper output) | ||
: base(Config, output) | ||
{ | ||
_selfAddress = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress; | ||
_cluster = Cluster.Get(Sys); | ||
|
@@ -132,7 +134,7 @@ public void A_cluster_must_publish_member_removed_when_shutdown() | |
_cluster.Join(_selfAddress); | ||
LeaderActions(); // Joining -> Up | ||
callbackProbe.ExpectMsg("OnMemberUp"); // verify that callback hooks are registered | ||
|
||
|
||
_cluster.Subscribe(TestActor, new[] { typeof(ClusterEvent.MemberRemoved) }); | ||
// first, is in response to the subscription | ||
|
@@ -196,11 +198,11 @@ public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed() | |
Cluster.Get(sys2).LeaveAsync().IsCompleted.Should().BeTrue(); | ||
} | ||
|
||
//#if CORECLR | ||
// [Fact(Skip = "Fails on .NET Core")] | ||
//#else | ||
// [Fact(Skip = "Fails flakily on .NET 4.5")] | ||
//#endif | ||
//#if CORECLR | ||
// [Fact(Skip = "Fails on .NET Core")] | ||
//#else | ||
// [Fact(Skip = "Fails flakily on .NET 4.5")] | ||
//#endif | ||
[Fact] | ||
public void A_cluster_must_return_completed_LeaveAsync_task_if_member_already_removed() | ||
{ | ||
|
@@ -306,6 +308,160 @@ public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address() | |
} | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_must_be_able_to_JoinAsync() | ||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
_cluster.JoinAsync(_selfAddress).Wait(timeout).Should().BeTrue(); | ||
LeaderActions(); | ||
// Member should already be up | ||
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); | ||
ExpectMsg<ClusterEvent.MemberUp>(); | ||
|
||
// join second time - response should be immediate success | ||
_cluster.JoinAsync(_selfAddress).Wait(TimeSpan.FromMilliseconds(100)).Should().BeTrue(); | ||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
|
||
// JoinAsync should fail after cluster has been shutdown - a manual actor system restart is required | ||
Assert.ThrowsAsync<ClusterJoinFailedException>(async () => | ||
{ | ||
await _cluster.JoinAsync(_selfAddress); | ||
LeaderActions(); | ||
ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
}).Wait(timeout); | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_must_be_able_to_prematurelly_cancel_JoinAsync() | ||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
var cancel = new CancellationToken(true); | ||
var task = _cluster.JoinAsync(_selfAddress, cancel); | ||
|
||
Assert.Throws<AggregateException>(() => task.Wait(timeout)) | ||
.Flatten() | ||
.InnerException.Should().BeOfType<TaskCanceledException>(); | ||
|
||
task.IsCanceled.Should().BeTrue(); | ||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster() | ||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); | ||
|
||
var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/"); | ||
Assert.ThrowsAsync<ClusterJoinFailedException>(async () => | ||
{ | ||
await _cluster.JoinAsync(nonexisting); | ||
LeaderActions(); | ||
|
||
ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
}).Wait(timeout); | ||
|
||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_must_be_able_to_join_async_to_seed_nodes() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Horusiath need a spec here that tests for "what happens if this method is called when we're already connected to the cluster?" |
||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
_cluster.JoinSeedNodesAsync(new[] { _selfAddress }).Wait(timeout).Should().BeTrue(); | ||
LeaderActions(); | ||
// Member should already be up | ||
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); | ||
ExpectMsg<ClusterEvent.MemberUp>(); | ||
|
||
// join second time - response should be immediate success | ||
_cluster.JoinSeedNodesAsync(new[] { _selfAddress }).Wait(TimeSpan.FromMilliseconds(100)).Should().BeTrue(); | ||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
|
||
// JoinSeedNodesAsync should fail after cluster has been shutdown - a manual actor system restart is required | ||
Assert.ThrowsAsync<ClusterJoinFailedException>(async () => | ||
{ | ||
await _cluster.JoinSeedNodesAsync(new[] { _selfAddress }); | ||
LeaderActions(); | ||
ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
}).Wait(timeout); | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to_cluster() | ||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) }); | ||
|
||
var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/"); | ||
Assert.ThrowsAsync<ClusterJoinFailedException>(async () => | ||
{ | ||
await _cluster.JoinSeedNodesAsync(new[] { nonexisting }); | ||
LeaderActions(); | ||
|
||
ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
}).Wait(timeout); | ||
|
||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_must_be_able_to_prematurelly_cancel_join_async_seed_nodes() | ||
{ | ||
var timeout = TimeSpan.FromSeconds(10); | ||
|
||
try | ||
{ | ||
var cancel = new CancellationToken(true); | ||
var task = _cluster.JoinSeedNodesAsync(new[] { _selfAddress }, cancel); | ||
|
||
Assert.Throws<AggregateException>(() => task.Wait(timeout)) | ||
.Flatten() | ||
.InnerException.Should().BeOfType<TaskCanceledException>(); | ||
|
||
task.IsCanceled.Should().BeTrue(); | ||
} | ||
finally | ||
{ | ||
_cluster.Shutdown(); | ||
} | ||
} | ||
|
||
[Fact] | ||
public void A_cluster_must_allow_to_resolve_RemotePathOf_any_actor() | ||
{ | ||
|
@@ -368,7 +524,7 @@ public void A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShutdown() | |
|
||
probe.ExpectMsg<ClusterEvent.MemberLeft>(); | ||
probe.ExpectMsg<ClusterEvent.MemberExited>(); | ||
probe.ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
probe.ExpectMsg<ClusterEvent.MemberRemoved>(); | ||
AwaitCondition(() => sys2.WhenTerminated.IsCompleted, TimeSpan.FromSeconds(10)); | ||
Cluster.Get(sys2).IsTerminated.Should().BeTrue(); | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Horusiath need a spec here that tests for "what happens if this method is called when we're already connected to the cluster?"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will immediately complete with success (I've seen this in the impl), but I guess you're right. We probably should have tests for that.