Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cluster.JoinAsync / Cluster.JoinSeedNodesAsync #3196

Merged
merged 9 commits into from
Jan 10, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ namespace Akka.Cluster
public void Down(Akka.Actor.Address address) { }
public static Akka.Cluster.Cluster Get(Akka.Actor.ActorSystem system) { }
public void Join(Akka.Actor.Address address) { }
public System.Threading.Tasks.Task JoinAsync(Akka.Actor.Address address, System.Threading.CancellationToken token = null) { }
public void JoinSeedNodes(System.Collections.Generic.IEnumerable<Akka.Actor.Address> seedNodes) { }
public System.Threading.Tasks.Task JoinSeedNodesAsync(System.Collections.Generic.IEnumerable<Akka.Actor.Address> seedNodes, System.Threading.CancellationToken token = null) { }
public void Leave(Akka.Actor.Address address) { }
public System.Threading.Tasks.Task LeaveAsync() { }
public System.Threading.Tasks.Task LeaveAsync(System.Threading.CancellationToken cancellationToken) { }
Expand Down Expand Up @@ -154,6 +156,10 @@ namespace Akka.Cluster
public ClusterExtension() { }
public override Akka.Cluster.Cluster CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
}
public class ClusterJoinFailedException : Akka.Actor.AkkaException
{
public ClusterJoinFailedException(string message) { }
}
public class ClusterScope : Akka.Actor.Scope
{
public static readonly Akka.Cluster.ClusterScope Instance;
Expand Down
154 changes: 145 additions & 9 deletions src/core/Akka.Cluster.Tests/ClusterSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Akka.Util.Internal;
using Xunit;
using FluentAssertions;
using Xunit.Abstractions;

namespace Akka.Cluster.Tests
{
Expand Down Expand Up @@ -45,8 +47,8 @@ public class ClusterSpec : AkkaSpec

internal ClusterReadView ClusterView { get { return _cluster.ReadView; } }

public ClusterSpec()
: base(Config)
public ClusterSpec(ITestOutputHelper output)
: base(Config, output)
{
_selfAddress = Sys.AsInstanceOf<ExtendedActorSystem>().Provider.DefaultAddress;
_cluster = Cluster.Get(Sys);
Expand Down Expand Up @@ -132,7 +134,7 @@ public void A_cluster_must_publish_member_removed_when_shutdown()
_cluster.Join(_selfAddress);
LeaderActions(); // Joining -> Up
callbackProbe.ExpectMsg("OnMemberUp"); // verify that callback hooks are registered


_cluster.Subscribe(TestActor, new[] { typeof(ClusterEvent.MemberRemoved) });
// first, is in response to the subscription
Expand Down Expand Up @@ -196,11 +198,11 @@ public void A_cluster_must_complete_LeaveAsync_task_upon_being_removed()
Cluster.Get(sys2).LeaveAsync().IsCompleted.Should().BeTrue();
}

//#if CORECLR
// [Fact(Skip = "Fails on .NET Core")]
//#else
// [Fact(Skip = "Fails flakily on .NET 4.5")]
//#endif
//#if CORECLR
// [Fact(Skip = "Fails on .NET Core")]
//#else
// [Fact(Skip = "Fails flakily on .NET 4.5")]
//#endif
[Fact]
public void A_cluster_must_return_completed_LeaveAsync_task_if_member_already_removed()
{
Expand Down Expand Up @@ -306,6 +308,140 @@ public void A_cluster_must_be_allowed_to_join_and_leave_with_local_address()
}
}

[Fact]
public void A_cluster_must_be_able_to_JoinAsync()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Horusiath need a spec here that tests for "what happens if this method is called when we're already connected to the cluster?"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will immediately complete with success (I've seen this in the impl), but I guess you're right. We probably should have tests for that.

{
var timeout = TimeSpan.FromSeconds(10);

try
{
_cluster.JoinAsync(_selfAddress).Wait(timeout).Should().BeTrue();
LeaderActions();
// Member should already be up
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) });
ExpectMsg<ClusterEvent.MemberUp>();

}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_must_be_able_to_prematurelly_cancel_JoinAsync()
{
var timeout = TimeSpan.FromSeconds(10);

try
{
var cancel = new CancellationToken(true);
var task = _cluster.JoinAsync(_selfAddress, cancel);

Assert.Throws<AggregateException>(() => task.Wait(timeout))
.Flatten()
.InnerException.Should().BeOfType<TaskCanceledException>();

task.IsCanceled.Should().BeTrue();
}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_JoinAsync_must_fail_if_could_not_connect_to_cluster()
{
var timeout = TimeSpan.FromSeconds(10);

try
{
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) });

var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/");
Assert.ThrowsAsync<ClusterJoinFailedException>(async () =>
{
await _cluster.JoinAsync(nonexisting);
LeaderActions();

ExpectMsg<ClusterEvent.MemberRemoved>();
}).Wait(timeout);

}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_must_be_able_to_join_async_to_seed_nodes()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Horusiath need a spec here that tests for "what happens if this method is called when we're already connected to the cluster?"

{
var timeout = TimeSpan.FromSeconds(10);

try
{
_cluster.JoinSeedNodesAsync(new[] { _selfAddress }).Wait(timeout).Should().BeTrue();
LeaderActions();
// Member should already be up
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) });
ExpectMsg<ClusterEvent.MemberUp>();

}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_JoinSeedNodesAsync_must_fail_if_could_not_connect_to_cluster()
{
var timeout = TimeSpan.FromSeconds(10);

try
{
_cluster.Subscribe(TestActor, ClusterEvent.InitialStateAsEvents, new[] { typeof(ClusterEvent.IMemberEvent) });

var nonexisting = Address.Parse($"akka.tcp://{_selfAddress.System}@127.0.0.1:9999/");
Assert.ThrowsAsync<ClusterJoinFailedException>(async () =>
{
await _cluster.JoinSeedNodesAsync(new[] { nonexisting });
LeaderActions();

ExpectMsg<ClusterEvent.MemberRemoved>();
}).Wait(timeout);

}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_must_be_able_to_prematurelly_cancel_join_async_seed_nodes()
{
var timeout = TimeSpan.FromSeconds(10);

try
{
var cancel = new CancellationToken(true);
var task = _cluster.JoinSeedNodesAsync(new[] { _selfAddress }, cancel);

Assert.Throws<AggregateException>(() => task.Wait(timeout))
.Flatten()
.InnerException.Should().BeOfType<TaskCanceledException>();

task.IsCanceled.Should().BeTrue();
}
finally
{
_cluster.Shutdown();
}
}

[Fact]
public void A_cluster_must_allow_to_resolve_RemotePathOf_any_actor()
{
Expand Down Expand Up @@ -368,7 +504,7 @@ public void A_cluster_must_terminate_ActorSystem_via_leave_CoordinatedShutdown()

probe.ExpectMsg<ClusterEvent.MemberLeft>();
probe.ExpectMsg<ClusterEvent.MemberExited>();
probe.ExpectMsg<ClusterEvent.MemberRemoved>();
probe.ExpectMsg<ClusterEvent.MemberRemoved>();
AwaitCondition(() => sys2.WhenTerminated.IsCompleted, TimeSpan.FromSeconds(10));
Cluster.Get(sys2).IsTerminated.Should().BeTrue();
}
Expand Down
64 changes: 64 additions & 0 deletions src/core/Akka.Cluster/Cluster.cs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,34 @@ public void Join(Address address)
ClusterCore.Tell(new ClusterUserAction.JoinTo(FillLocal(address)));
}

/// <summary>
/// Try to asynchronously join this cluster node specified by <paramref name="address"/>.
/// A <see cref="Join"/> command is sent to the node to join. Returned task will be completed
/// once current cluster node will be moved into <see cref="MemberStatus.Up"/> state,
/// or cancelled when provided <paramref name="token"/> cancellation triggers. Cancelling this
/// token doesn't prevent current node from joining the cluster, therefore a manuall
/// call to <see cref="Leave"/>/<see cref="LeaveAsync()"/> may still be required in order to
/// leave the cluster gracefully.
///
/// An actor system can only join a cluster once. Additional attempts will be ignored.
/// When it has successfully joined it must be restarted to be able to join another
/// cluster or to join the same cluster again.
/// </summary>
/// <param name="address">The address of the node we want to join.</param>
/// <param name="token">An optional cancellation token used to cancel returned task before it completes.</param>
/// <returns>Task which completes, once current cluster node reaches <see cref="MemberStatus.Up"/> state.</returns>
public Task JoinAsync(Address address, CancellationToken token = default(CancellationToken))
{
var completion = new TaskCompletionSource<NotUsed>();
this.RegisterOnMemberUp(() => completion.TrySetResult(NotUsed.Instance));
this.RegisterOnMemberRemoved(() => completion.TrySetException(
new ClusterJoinFailedException($"Node has not managed to join the cluster using provided address: {address}")));

Join(address);

return completion.Task.WithCancellation(token);
}

private Address FillLocal(Address address)
{
// local address might be used if grabbed from IActorRef.Path.Address
Expand Down Expand Up @@ -251,6 +279,32 @@ public void JoinSeedNodes(IEnumerable<Address> seedNodes)
new InternalClusterAction.JoinSeedNodes(seedNodes.Select(FillLocal).ToImmutableList()));
}

/// <summary>
/// Joins the specified seed nodes without defining them in config.
/// Especially useful from tests when Addresses are unknown before startup time.
/// Returns a task, which completes once current cluster node has successfully joined the cluster
/// or which cancels, when a cancellation <paramref name="token"/> has been cancelled. Cancelling this
/// token doesn't prevent current node from joining the cluster, therefore a manuall
/// call to <see cref="Leave"/>/<see cref="LeaveAsync()"/> may still be required in order to
/// leave the cluster gracefully.
///
/// An actor system can only join a cluster once. Additional attempts will be ignored.
/// When it has successfully joined it must be restarted to be able to join another
/// cluster or to join the same cluster again.
/// </summary>
/// <param name="seedNodes">TBD</param>
public Task JoinSeedNodesAsync(IEnumerable<Address> seedNodes, CancellationToken token = default(CancellationToken))
{
var completion = new TaskCompletionSource<NotUsed>();
this.RegisterOnMemberUp(() => completion.TrySetResult(NotUsed.Instance));
this.RegisterOnMemberRemoved(() => completion.TrySetException(
new ClusterJoinFailedException($"Node has not managed to join the cluster using provided seed node addresses: {string.Join(", ", seedNodes)}.")));

JoinSeedNodes(seedNodes);

return completion.Task.WithCancellation(token);
}

/// <summary>
/// Sends a command to issue state transition to LEAVING for the node specified by <paramref name="address"/>.
/// The member will go through the status changes <see cref="MemberStatus.Leaving"/> (not published to
Expand Down Expand Up @@ -528,5 +582,15 @@ internal void LogInfo(string template, object arg1, object arg2)
_log.Info("Cluster Node [{0}] - " + template, SelfAddress, arg1, arg2);
}
}

/// <summary>
/// Exception thrown, when <see cref="Cluster.JoinAsync"/> or <see cref="Cluster.JoinSeedNodesAsync"/> fails to succeed.
/// </summary>
public class ClusterJoinFailedException : AkkaException
{
public ClusterJoinFailedException(string message) : base(message)
{
}
}
}