Skip to content

Commit

Permalink
Merge branch 'dev' into askFix
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Dec 27, 2017
2 parents 0abbff8 + 78125b1 commit 51ca933
Show file tree
Hide file tree
Showing 28 changed files with 613 additions and 373 deletions.
18 changes: 10 additions & 8 deletions docs/examples/Tutorials/Tutorial4/DeviceGroupQuery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class DeviceGroupQuery : UntypedActor
{
private ICancelable queryTimeoutTimer;

#region query-state
public DeviceGroupQuery(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout)
{
ActorToDeviceId = actorToDeviceId;
Expand All @@ -26,6 +27,8 @@ public DeviceGroupQuery(Dictionary<IActorRef, string> actorToDeviceId, long requ
Timeout = timeout;

queryTimeoutTimer = Context.System.Scheduler.ScheduleTellOnceCancelable(timeout, Self, CollectionTimeout.Instance, Self);

Become(WaitingForReplies(new Dictionary<string, ITemperatureReading>(), new HashSet<IActorRef>(ActorToDeviceId.Keys)));
}

protected override void PreStart()
Expand All @@ -48,12 +51,6 @@ protected override void PostStop()
public IActorRef Requester { get; }
public TimeSpan Timeout { get; }

#region query-state
protected override void OnReceive(object message)
{
Become(WaitingForReplies(new Dictionary<string, ITemperatureReading>(), new HashSet<IActorRef>(ActorToDeviceId.Keys)));
}

public UntypedReceive WaitingForReplies(
Dictionary<string, ITemperatureReading> repliesSoFar,
HashSet<IActorRef> stillWaiting)
Expand Down Expand Up @@ -106,7 +103,7 @@ public void ReceivedResponse(

repliesSoFar.Add(deviceId, reading);

if (repliesSoFar.Count == 0)
if (stillWaiting.Count == 0)
{
Requester.Tell(new RespondAllTemperatures(RequestId, repliesSoFar));
Context.Stop(Self);
Expand All @@ -116,7 +113,12 @@ public void ReceivedResponse(
Context.Become(WaitingForReplies(repliesSoFar, stillWaiting));
}
}
#endregion
#endregion

protected override void OnReceive(object message)
{

}

public static Props Props(Dictionary<IActorRef, string> actorToDeviceId, long requestId, IActorRef requester, TimeSpan timeout) =>
Akka.Actor.Props.Create(() => new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout));
Expand Down
141 changes: 67 additions & 74 deletions src/Akka.sln

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
{
if (IsMatchingRole(up.Member)) _nodes.Add(up.Member.Address);
});
Receive<ClusterEvent.MemberWeaklyUp>(weaklyUp =>
{
if (IsMatchingRole(weaklyUp.Member)) _nodes.Add(weaklyUp.Member.Address);
});
Receive<ClusterEvent.MemberLeft>(left =>
{
if (IsMatchingRole(left.Member))
Expand Down
9 changes: 9 additions & 0 deletions src/contrib/cluster/Akka.DistributedData/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ private void NormalReceive()
Receive<Subscribe>(s => ReceiveSubscribe(s.Key, s.Subscriber));
Receive<Unsubscribe>(u => ReceiveUnsubscribe(u.Key, u.Subscriber));
Receive<Terminated>(t => ReceiveTerminated(t.ActorRef));
Receive<ClusterEvent.MemberWeaklyUp>(m => ReceiveMemberWeaklyUp(m.Member));
Receive<ClusterEvent.MemberUp>(m => ReceiveMemberUp(m.Member));
Receive<ClusterEvent.MemberRemoved>(m => ReceiveMemberRemoved(m.Member));
Receive<ClusterEvent.IMemberEvent>(_ => { });
Expand Down Expand Up @@ -1193,6 +1194,14 @@ private void ReceiveTerminated(IActorRef terminated)
}
}

private void ReceiveMemberWeaklyUp(Member m)
{
if (MatchingRole(m) && m.Address != _selfAddress)
{
_weaklyUpNodes = _weaklyUpNodes.Add(m.Address);
}
}

private void ReceiveMemberUp(Member m)
{
if (MatchingRole(m) && m.Address != _selfAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ namespace Akka.Cluster
{
public MemberUp(Akka.Cluster.Member member) { }
}
public sealed class MemberWeaklyUp : Akka.Cluster.ClusterEvent.MemberStatusChange
{
public MemberWeaklyUp(Akka.Cluster.Member member) { }
}
public abstract class ReachabilityEvent : Akka.Cluster.ClusterEvent.IClusterDomainEvent, Akka.Cluster.ClusterEvent.IReachabilityEvent
{
protected ReachabilityEvent(Akka.Cluster.Member member) { }
Expand Down Expand Up @@ -159,6 +163,7 @@ namespace Akka.Cluster
public sealed class ClusterSettings
{
public ClusterSettings(Akka.Configuration.Config config, string systemName) { }
public bool AllowWeaklyUpMembers { get; }
public System.Nullable<System.TimeSpan> AutoDownUnreachableAfter { get; }
public System.Type DowningProviderType { get; }
[System.ObsoleteAttribute("Use Cluster.DowningProvider.DownRemovalMargin [1.1.2]")]
Expand Down Expand Up @@ -224,6 +229,7 @@ namespace Akka.Cluster
Exiting = 3,
Down = 4,
Removed = 5,
WeaklyUp = 6,
}
public sealed class NoDowning : Akka.Cluster.IDowningProvider
{
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/ConvergenceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public ConvergenceSpecConfig(bool failureDetectorPuppet)
CommonConfig = ConfigurationFactory.ParseString(@"akka.cluster.publish-stats-interval = 25s")
.WithFallback(MultiNodeLoggingConfig.LoggingConfig)
.WithFallback(DebugConfig(true))
.WithFallback(@"akka.cluster.failure-detector.threshold = 4")
.WithFallback(@"
akka.cluster.failure-detector.threshold = 4
akka.cluster.allow-weakly-up-members = off")
.WithFallback(MultiNodeClusterSpec.ClusterConfig(failureDetectorPuppet));
}
}
Expand Down
161 changes: 161 additions & 0 deletions src/core/Akka.Cluster.Tests.MultiNode/MemberWeaklyUpSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//-----------------------------------------------------------------------
// <copyright file="MemberWeaklyUpSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2016 Akka.NET project <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Immutable;
using System.Linq;
using Akka.Cluster.TestKit;
using Akka.Configuration;
using Akka.Remote.TestKit;
using Akka.Remote.Transport;
using Akka.TestKit;

namespace Akka.Cluster.Tests.MultiNode
{
public class MemberWeaklyUpConfig : MultiNodeConfig
{
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }
public RoleName Fourth { get; }
public RoleName Fifth { get; }

public MemberWeaklyUpConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");
Fourth = Role("fourth");
Fifth = Role("fifth");

CommonConfig = DebugConfig(on: false)
.WithFallback(ConfigurationFactory.ParseString(@"
akka.remote.retry-gate-closed-for = 3s
akka.cluster.allow-weakly-up-members = on"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

TestTransport = true;
}
}

public class MemberWeaklyUpSpec : MultiNodeClusterSpec
{
private readonly MemberWeaklyUpConfig _config;
private readonly ImmutableArray<RoleName> _side1;
private readonly ImmutableArray<RoleName> _side2;

public MemberWeaklyUpSpec() : this(new MemberWeaklyUpConfig())
{
}

private MemberWeaklyUpSpec(MemberWeaklyUpConfig config) : base(config, typeof(MemberWeaklyUpSpec))
{
_config = config;
_side1 = ImmutableArray.CreateRange(new[] { config.First, config.Second });
_side2 = ImmutableArray.CreateRange(new[] { config.Third, config.Fourth, config.Fifth });
MuteMarkingAsUnreachable();
}

[MultiNodeFact]
public void Spec()
{
A_cluster_of_3_members_should_reach_initial_convergence();
A_cluster_of_3_members_should_detect_network_partition_and_mark_nodes_on_the_other_side_as_unreachable();
A_cluster_of_3_members_should_accept_joining_on_each_side_and_set_status_to_WeaklyUp();
A_cluster_of_3_members_should_change_status_to_Up_after_healed_network_partition();
}

public void A_cluster_of_3_members_should_reach_initial_convergence()
{
AwaitClusterUp(_config.First, _config.Third, _config.Fourth);
EnterBarrier("after-1");
}

public void A_cluster_of_3_members_should_detect_network_partition_and_mark_nodes_on_the_other_side_as_unreachable()
{
Within(TimeSpan.FromSeconds(20), () =>
{
RunOn(() =>
{
// split the cluster in two parts (first, second) / (third, fourth, fifth)
foreach (var role1 in _side1)
foreach (var role2 in _side2)
TestConductor.Blackhole(role1, role2, ThrottleTransportAdapter.Direction.Both).Wait(TimeSpan.FromSeconds(3));
}, _config.First);
EnterBarrier("after-split");
RunOn(() =>
{
AwaitAssert(() =>
ClusterView.UnreachableMembers
.Select(m => m.Address).ToImmutableHashSet()
.ShouldBe(ImmutableHashSet.CreateRange(new[] { GetAddress(_config.Third), GetAddress(_config.Fourth) })));
}, _config.First);
RunOn(() =>
{
AwaitAssert(() =>
ClusterView.UnreachableMembers
.Select(m => m.Address).ToImmutableHashSet()
.ShouldBe(ImmutableHashSet.CreateRange(new[] { GetAddress(_config.First) })));
}, _config.Third, _config.Fourth);
EnterBarrier("after-2");
});
}

public void A_cluster_of_3_members_should_accept_joining_on_each_side_and_set_status_to_WeaklyUp()
{
Within(TimeSpan.FromSeconds(20), () =>
{
RunOn(() => Cluster.Get(Sys).Join(GetAddress(_config.First)), _config.Second);
RunOn(() => Cluster.Get(Sys).Join(GetAddress(_config.Fourth)), _config.Fifth);
EnterBarrier("joined");
RunOn(() => AwaitAssert(() =>
{
ClusterView.Members.Count.ShouldBe(4);
ClusterView.Members.Any(m => m.Address == GetAddress(_config.Second) && m.Status == MemberStatus.WeaklyUp).ShouldBe(true);
}), _side1.ToArray());
RunOn(() => AwaitAssert(() =>
{
ClusterView.Members.Count.ShouldBe(4);
ClusterView.Members.Any(m => m.Address == GetAddress(_config.Fifth) && m.Status == MemberStatus.WeaklyUp).ShouldBe(true);
}), _side2.ToArray());
EnterBarrier("after-3");
});
}

public void A_cluster_of_3_members_should_change_status_to_Up_after_healed_network_partition()
{
Within(TimeSpan.FromSeconds(20), () =>
{
RunOn(() =>
{
foreach (var role1 in _side1)
foreach (var role2 in _side2)
{
TestConductor.PassThrough(role1, role2, ThrottleTransportAdapter.Direction.Both).Wait(TimeSpan.FromSeconds(3));
}
}, _config.First);
EnterBarrier("after-passThrough");
AwaitAllReachable();
AwaitMembersUp(5);
EnterBarrier("after-4");
});
}
}
}
51 changes: 51 additions & 0 deletions src/core/Akka.Cluster.Tests.MultiNode/MinMembersBeforeUpSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

namespace Akka.Cluster.Tests.MultiNode
{
#region Member.Up

public class MinMembersBeforeUpSpecConfig : MultiNodeConfig
{
public readonly RoleName First;
Expand Down Expand Up @@ -83,6 +85,55 @@ public void Cluster_leader_must_wait_with_moving_members_to_up_until_minimum_num
}
}

#endregion

#region Member.WeaklyUp

public class MinMembersBeforeUpWithWeaklyUpSpecConfig : MultiNodeConfig
{
public readonly RoleName First;
public readonly RoleName Second;
public readonly RoleName Third;

public MinMembersBeforeUpWithWeaklyUpSpecConfig()
{
First = Role("first");
Second = Role("second");
Third = Role("third");

CommonConfig = ConfigurationFactory.ParseString(@"
akka.cluster.min-nr-of-members = 3
akka.cluster.allow-weakly-up-members = on
").WithFallback(MultiNodeClusterSpec.ClusterConfigWithFailureDetectorPuppet());
}
}
public class MinMembersBeforeUpWithWeaklyUpNode1 : MinMembersBeforeUpWithWeaklyUpSpec { }
public class MinMembersBeforeUpWithWeaklyUpNode2 : MinMembersBeforeUpWithWeaklyUpSpec { }
public class MinMembersBeforeUpWithWeaklyUpNode3 : MinMembersBeforeUpWithWeaklyUpSpec { }

public abstract class MinMembersBeforeUpWithWeaklyUpSpec : MinMembersBeforeUpBase
{
protected MinMembersBeforeUpWithWeaklyUpSpec() : this(new MinMembersBeforeUpWithWeaklyUpSpecConfig())
{
}

protected MinMembersBeforeUpWithWeaklyUpSpec(MinMembersBeforeUpWithWeaklyUpSpecConfig config)
: base(config, typeof(MinMembersBeforeUpWithWeaklyUpSpec))
{
First = config.First;
Second = config.Second;
Third = config.Third;
}

[MultiNodeFact]
public void Cluster_leader_must_wait_with_moving_members_to_up_until_minimum_number_of_members_have_joined_with_WeaklyUp_enabled()
{
TestWaitMovingMembersToUp();
}
}

#endregion

public class MinMembersOfRoleBeforeUpSpec : MinMembersBeforeUpBase
{
public MinMembersOfRoleBeforeUpSpec() : this(new MinMembersOfRoleBeforeUpSpecConfig())
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Cluster.Tests.MultiNode/RestartNode3Spec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ public RestartNode3SpecConfig()
Third = Role("third");

CommonConfig = DebugConfig(false)
.WithFallback(ConfigurationFactory.ParseString("akka.cluster.auto-down-unreachable-after = off"))
.WithFallback(ConfigurationFactory.ParseString(@"
akka.cluster.auto-down-unreachable-after = off
akka.cluster.allow-weakly-up-members = off"))
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

TestTransport = true;
Expand Down
Loading

0 comments on commit 51ca933

Please sign in to comment.