Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClusterClient initial contact discovery feature #7261

Merged
merged 10 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
//-----------------------------------------------------------------------
// <copyright file="ClusterClientHandoverSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Actor;
using Akka.Cluster.TestKit;
using Akka.Cluster.Tools.Client;
using Akka.Cluster.Tools.PublishSubscribe;
using Akka.Configuration;
using Akka.Discovery;
using Akka.Discovery.Config;
using Akka.MultiNode.TestAdapter;
using Akka.Remote.TestKit;
using Akka.TestKit.TestActors;
using FluentAssertions;

namespace Akka.Cluster.Tools.Tests.MultiNode.Client
{
public sealed class ClusterClientDiscoverySpecConfig : MultiNodeConfig
{
public RoleName Client { get; }
public RoleName First { get; }
public RoleName Second { get; }
public RoleName Third { get; }

public ClusterClientDiscoverySpecConfig()
{
Client = Role("client");
First = Role("first");
Second = Role("second");
Third = Role("third");

CommonConfig = ConfigurationFactory.ParseString("""
akka.loglevel = INFO

akka.remote.dot-netty.tcp.hostname = localhost
akka.actor.provider = cluster
akka.remote.log-remote-lifecycle-events = off
akka.cluster.client {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we disabling heartbeats on purpose here? 1d is an eternity in the context of this spec.

heartbeat-interval = 1d
acceptable-heartbeat-pause = 1d
reconnect-timeout = 3s
refresh-contacts-interval = 1d
}
akka.test.filter-leeway = 10s
""")
.WithFallback(ClusterClientReceptionist.DefaultConfig())
.WithFallback(DistributedPubSub.DefaultConfig())
.WithFallback(MultiNodeClusterSpec.ClusterConfig());

NodeConfig(new[]{ Client },
new []{
ConfigurationFactory.ParseString("""
akka {
cluster.client {
heartbeat-interval = 1s
acceptable-heartbeat-pause = 2s
use-initial-contacts-discovery = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New setting, got it

reconnect-timeout = 4s
verbose-logging = true
discovery
{
service-name = test-cluster
discovery-timeout = 10s
}
}


discovery
{
method = config
config.services.test-cluster.endpoints = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this gets populated later using the new internal methods you added for dynamically updating config-based discovery

}
}
""")});
TestTransport = true;
}
}

public class ClusterClientDiscoverySpec : MultiNodeClusterSpec
{
private readonly ClusterClientDiscoverySpecConfig _config;
private ConfigServiceDiscovery _discoveryService;

public ClusterClientDiscoverySpec() : this(new ClusterClientDiscoverySpecConfig()) { }

protected ClusterClientDiscoverySpec(ClusterClientDiscoverySpecConfig config)
: base(config, typeof(ClusterClientDiscoverySpec))
{
_config = config;
}

protected override int InitialParticipantsValueFactory => 3;

private void Join(RoleName from, RoleName to)
{
RunOn(() =>
{
Cluster.Join(Node(to).Address);
ClusterClientReceptionist.Get(Sys);
}, from);
EnterBarrier(from.Name + "-joined");
}

private IActorRef _clusterClient = null;

[MultiNodeFact]
public void ClusterClientDiscoverySpecs()
{
ClusterClient_must_startup_cluster_with_single_node();
ClusterClient_must_establish_connection_to_first_node();
ClusterClient_must_down_existing_cluster();
ClusterClient_second_node_must_form_a_new_cluster();
ClusterClient_must_re_establish_on_cluster_restart();
ClusterClient_must_simulate_a_cluster_forced_shutdown();
ClusterClient_third_node_formed_a_cluster();
ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown();
}

private void ClusterClient_must_startup_cluster_with_single_node()
{
Within(TimeSpan.FromSeconds(30), () =>
{
Join(_config.First, _config.First);
RunOn(() =>
{
var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
}, _config.First);
EnterBarrier("cluster-started");

RunOn(() =>
{
_discoveryService =
(ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config");
var address = GetAddress(_config.First);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Populates the discovery list here


var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(1);
}, _config.Client);
EnterBarrier("discovery-entry-added");
});
}

private void ClusterClient_must_establish_connection_to_first_node()
{
RunOn(() =>
{
_clusterClient = Sys.ActorOf(ClusterClient.Props(ClusterClientSettings.Create(Sys)), "client1");

Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.First).Address);
}, RemainingOrDefault);
});

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity:true));
ExpectMsg<string>().Should().Be("hello");
}, _config.Client);
EnterBarrier("established");
}

private void ClusterClient_must_down_existing_cluster()
{
RunOn(() =>
{
Cluster.Get(Sys).Leave(Node(_config.First).Address);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire cluster goes down here, gracefully

}, _config.First);

EnterBarrier("cluster-downed");

RunOn(() =>
{
var address = GetAddress(_config.First);
_discoveryService.TryRemoveEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(0);
}, _config.Client);
EnterBarrier("discovery-entry-removed");

}

private void ClusterClient_second_node_must_form_a_new_cluster()
{
Join(_config.Second, _config.Second);
RunOn(() =>
{
var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
}, _config.Second);

EnterBarrier("cluster-restarted");

RunOn(() =>
{
var address = GetAddress(_config.Second);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(1);
}, _config.Client);
EnterBarrier("discovery-entry-updated");
}

private void ClusterClient_must_re_establish_on_cluster_restart()
{
RunOn(() =>
{
Within(TimeSpan.FromSeconds(5), () =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Second).Address);
}, RemainingOrDefault);
});

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true));
ExpectMsg<string>().Should().Be("hello");

}, _config.Client);
EnterBarrier("re-establish-successful");
}

private void ClusterClient_must_simulate_a_cluster_forced_shutdown()
{
RunOn(() =>
{
// simulate a hard shutdown
TestConductor.Exit(_config.Second, 0).Wait();
}, _config.Client);
EnterBarrier("hard-shutdown-and-discovery-entry-updated");
}

private void ClusterClient_third_node_formed_a_cluster()
{
Join(_config.Third, _config.Third);
RunOn(() =>
{
var service = Sys.ActorOf(EchoActor.Props(this), "testService");
ClusterClientReceptionist.Get(Sys).RegisterService(service);
AwaitMembersUp(1);
}, _config.Third);

EnterBarrier("cluster-restarted");

RunOn(() =>
{
var address = GetAddress(_config.Third);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
Aaronontheweb marked this conversation as resolved.
Show resolved Hide resolved

var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result;
resolved.Addresses.Count.Should().Be(2);
}, _config.Client);
EnterBarrier("discovery-entry-updated");
}

private void ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown()
{
RunOn(() =>
{
Within(TimeSpan.FromSeconds(20), () =>
{
AwaitAssert(() =>
{
_clusterClient.Tell(GetContactPoints.Instance, TestActor);
var contacts = ExpectMsg<ContactPoints>(TimeSpan.FromSeconds(1)).ContactPointsList;
contacts.Count.Should().Be(1);
contacts.First().Address.Should().Be(Node(_config.Third).Address);
}, TimeSpan.FromSeconds(20));
});

_clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true));
ExpectMsg<string>().Should().Be("hello");

}, _config.Client);
EnterBarrier("re-establish-successful");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<ProjectReference Include="..\..\..\core\Akka.Discovery\Akka.Discovery.csproj" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,9 @@ public static Props Props(ClusterClientSettings settings)
if (settings == null)
throw new ArgumentNullException(nameof(settings));

return Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local);
return settings.UseInitialContactDiscovery
? Actor.Props.Create(() => new ClusterClientDiscovery(settings)).WithDeploy(Deploy.Local)
: Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local);
}

private ILoggingAdapter _log = Context.GetLogger();
Expand Down
Loading
Loading