From a1b5a276d75d71f85e591811cea6d5052cb03359 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 18:30:04 +0700 Subject: [PATCH 1/9] Add cluster client initial contact discovery feature --- .../Akka.Cluster.Tools.csproj | 1 + .../Client/ClusterClient.cs | 4 +- .../Client/ClusterClientDiscovery.cs | 247 ++++++++++++++++++ .../Client/ClusterClientDiscoverySettings.cs | 41 +++ .../Client/ClusterClientSettings.cs | 62 ++++- .../Akka.Cluster.Tools/Client/reference.conf | 13 + 6 files changed, 365 insertions(+), 3 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs create mode 100644 src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj index 2d9908dcc6c..83e10ee8fc6 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj +++ b/src/contrib/cluster/Akka.Cluster.Tools/Akka.Cluster.Tools.csproj @@ -15,6 +15,7 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive + diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs index 074ee6ddd6f..42d05eaf562 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClient.cs @@ -274,7 +274,9 @@ public static Props Props(ClusterClientSettings settings) if (settings == null) throw new ArgumentNullException(nameof(settings)); - return Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local); + return settings.UseInitialContactDiscovery + ? Actor.Props.Create(() => new ClusterClientDiscovery(settings)).WithDeploy(Deploy.Local) + : Actor.Props.Create(() => new ClusterClient(settings)).WithDeploy(Deploy.Local); } private ILoggingAdapter _log = Context.GetLogger(); diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs new file mode 100644 index 00000000000..a57adee9683 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -0,0 +1,247 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Discovery; +using Akka.Event; + +#nullable enable +namespace Akka.Cluster.Tools.Client; + +public class ClusterClientDiscovery: ActorBase, IWithUnboundedStash +{ + #region Discovery messages + + internal sealed class DiscoverTick: IEquatable + { + public static readonly DiscoverTick Instance = new(); + + private DiscoverTick() { } + public bool Equals(DiscoverTick? other) => other is not null; + public override bool Equals(object? obj) => ReferenceEquals(this, obj) || obj is DiscoverTick; + public override int GetHashCode() => 0; + } + + private sealed record Contact(ActorPath Path, ActorSelection Selection); + private sealed record DiscoveryFailure(Exception Cause); + private sealed record ResolveResult(Contact Contact, IActorRef? Subject); + + #endregion + + private readonly TimeSpan _defaultReconnectTimeout = TimeSpan.FromSeconds(10); + private readonly ILoggingAdapter _log = Context.GetLogger(); + private readonly ClusterClientSettings _settings; + + private readonly ClusterClientDiscoverySettings _discoverySettings; + private readonly ServiceDiscovery? _serviceDiscovery; + private readonly Lookup? _lookup; + private readonly TimeSpan _discoveryTimeout; + private ICancelable? _discoveryCancelable; + private readonly string _targetActorSystemName; + private readonly string _transportProtocol; + + public ClusterClientDiscovery(ClusterClientSettings settings) + { + _settings = settings; + _discoverySettings = settings.DiscoverySettings; + + if(_settings.InitialContacts.Count > 0) + _log.Warning("Initial contacts is being ignored because ClusterClient contacts discovery is being used"); + + var discoveryMethod = _discoverySettings.DiscoveryMethod; + if(string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") + discoveryMethod = Context.System.Settings.Config.GetString("akka.discovery.method"); + if (string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") + { + _log.Warning( + "No default initial contacts discovery implementation configured in both\n" + + "`akka.cluster.client.discovery.method` and `akka.discovery.method`.\n" + + "Make sure to configure this setting to your preferred implementation such as 'config'\n" + + "in your application.conf (from the akka-discovery module). Falling back to default config\n" + + "based discovery method"); + discoveryMethod = "config"; + } + + if (_settings.ReconnectTimeout is null) + { + _log.Warning( + "No reconnect timeout were configured in `akka.cluster.client.reconnect-timeout`,\n" + + "this setting is required when using cluster client initial contact discovery feature.\n" + + "Falling back to default value ({0}) instead.", _defaultReconnectTimeout); + _settings = _settings.WithReconnectTimeout(_defaultReconnectTimeout); + } + + if (string.IsNullOrWhiteSpace(_discoverySettings.ActorSystemName)) + { + _log.Warning( + "No target ActorSystem name configured in `akka.cluster.client.discovery.actor-system-name`,\n" + + "falling back to this ActorSystem name ({0}) instead.", Context.System.Name); + } + _targetActorSystemName = string.IsNullOrWhiteSpace(_discoverySettings.ActorSystemName) + ? Context.System.Name : _discoverySettings.ActorSystemName; + _transportProtocol = ((ExtendedActorSystem)Context.System).Provider.DefaultAddress.Protocol; + + _lookup = new Lookup(_discoverySettings.ServiceName, _discoverySettings.PortName); + _serviceDiscovery = Discovery.Discovery.Get(Context.System) + .LoadServiceDiscovery(discoveryMethod); + _discoveryTimeout = _discoverySettings.DiscoveryTimeout; + + Rediscover(); + } + + public IStash Stash { get; set; } = null!; + + protected override bool Receive(object message) + { + throw new NotImplementedException("Should never reach this code"); + } + + protected override void PostStop() + { + _discoveryCancelable?.Cancel(); + _discoveryCancelable = null; + base.PostStop(); + } + + private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target) + { + var networkAddress = string.IsNullOrWhiteSpace(target.Host) ? target.Address.ToString() : target.Host; + var address = new Address(_transportProtocol, _targetActorSystemName, networkAddress, target.Port); + return new RootActorPath(address) / "system" / _discoverySettings.ReceptionistName; + } + + private static async Task ResolveContact(Contact contact, TimeSpan timeout, CancellationToken ct) + { + try + { + var identity = await contact.Selection.Ask(new Identify(null), timeout, ct); + return new ResolveResult(contact, identity.Subject); + } + catch (Exception) + { + return new ResolveResult(contact, null); + } + } + + private void Rediscover() + { + Become(Discovering); + _serviceDiscovery!.Lookup(_lookup, _discoveryTimeout) + .PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause)); + } + + private bool Discovering(object message) + { + switch (message) + { + case DiscoverTick: + Rediscover(); + return true; + + case ServiceDiscovery.Resolved resolved: + { + _discoveryCancelable?.Cancel(); + _discoveryCancelable = null; + + if (resolved.Addresses.Count == 0) + { + // discovery didn't find any contacts, retry discovery + _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( + delay: _settings.DiscoverySettings.DiscoveryRetryInterval, + receiver: Self, + message: DiscoverTick.Instance, + sender: Self); + return true; + } + + var contacts = resolved.Addresses.Select(address => { + var path = ResolvedTargetToReceptionistActorPath(address); + return new Contact(path, Context.ActorSelection(path)); + }).ToImmutableHashSet(); + + VerifyContacts().PipeTo(Self, Self); + + return true; + + async Task VerifyContacts() + { + var tasks = contacts.Select(c => ResolveContact(c, TimeSpan.FromSeconds(1), default)); + return await Task.WhenAll(tasks); + } + } + + case ResolveResult[] resolved: + { + var contacts = resolved.Where(r => r.Subject is not null).Select(r => r.Contact).ToArray(); + if (contacts.Length == 0) + { + _log.Warning("Cluster.Client contact point resolution phase failed, will try again."); + _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( + delay: _settings.DiscoverySettings.DiscoveryRetryInterval, + receiver: Self, + message: DiscoverTick.Instance, + sender: Self); + } + else + { + Become(Active(contacts)); + } + + return true; + } + + case DiscoveryFailure fail: + _log.Warning(fail.Cause, "Cluster.Client contact point service discovery phase failed, will try again."); + _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( + delay: _settings.DiscoverySettings.DiscoveryRetryInterval, + receiver: Self, + message: DiscoverTick.Instance, + sender: Self); + return true; + + default: + Stash.Stash(); + return true; + } + } + + private Receive Active(Contact[] contacts) + { + var currentSettings = _settings.WithInitialContacts(contacts.Select(c => c.Path).ToImmutableHashSet()); + var clusterClient = Context.System.ActorOf(ClusterClient.Props(currentSettings), "clusterClient"); + Context.Watch(clusterClient); + Stash.UnstashAll(); + + return message => + { + switch (message) + { + case Terminated terminated: + if (terminated.ActorRef.Equals(clusterClient)) + { + Become(Discovering); + } + else + { + clusterClient.Forward(message); + } + break; + + default: + clusterClient.Forward(message); + break; + } + + return true; + }; + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs new file mode 100644 index 00000000000..d26ccad9cb4 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscoverySettings.cs @@ -0,0 +1,41 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2024 Lightbend Inc. +// Copyright (C) 2013-2024 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using Akka.Configuration; + +namespace Akka.Cluster.Tools.Client; + +#nullable enable +public sealed record ClusterClientDiscoverySettings( + string? DiscoveryMethod, + string? ActorSystemName, + string? ServiceName, + string ReceptionistName, + string? PortName, + TimeSpan DiscoveryRetryInterval, + TimeSpan DiscoveryTimeout) +{ + public static readonly ClusterClientDiscoverySettings Empty = new ("", null, null, "receptionist", null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(60)); + + public static ClusterClientDiscoverySettings Create(Config clusterClientConfig) + { + var config = clusterClientConfig.GetConfig("discovery"); + if (config is null) + return Empty; + + return new ClusterClientDiscoverySettings( + config.GetString("method"), + config.GetString("actor-system-name"), + config.GetString("service-name"), + config.GetString("receptionist-name", "receptionist"), + config.GetString("port-name"), + config.GetTimeSpan("discovery-retry-interval", TimeSpan.FromSeconds(1)), + config.GetTimeSpan("discovery-timeout", TimeSpan.FromSeconds(60)) + ); + } +} \ No newline at end of file diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs index 6e74648b760..a76832cbc99 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs @@ -14,6 +14,7 @@ using Akka.Configuration; using Akka.Remote; +#nullable enable namespace Akka.Cluster.Tools.Client { /// @@ -56,7 +57,7 @@ public static ClusterClientSettings Create(Config config) var initialContacts = config.GetStringList("initial-contacts", new string[] { }).Select(ActorPath.Parse).ToImmutableSortedSet(); var useReconnect = config.GetString("reconnect-timeout", "").ToLowerInvariant(); - TimeSpan? reconnectTimeout = + var reconnectTimeout = useReconnect.Equals("off") || useReconnect.Equals("false") || useReconnect.Equals("no") ? @@ -70,6 +71,8 @@ public static ClusterClientSettings Create(Config config) config.GetTimeSpan("acceptable-heartbeat-pause"), config.GetInt("buffer-size"), config.GetBoolean("use-legacy-serialization"), + config.GetBoolean("use-initial-contacts-discovery"), + ClusterClientDiscoverySettings.Create(config), reconnectTimeout); } @@ -122,6 +125,10 @@ public static ClusterClientSettings Create(Config config) /// public bool UseLegacySerialization { get; } + public bool UseInitialContactDiscovery { get; } + + public ClusterClientDiscoverySettings DiscoverySettings { get; } + /// /// TBD /// @@ -166,6 +173,7 @@ public ClusterClientSettings( /// TBD /// TBD /// TBD + [Obsolete("Use constructor with useInitialContactsDiscovery and discoverySettings argument instead. Since 1.5.25")] public ClusterClientSettings( IImmutableSet initialContacts, TimeSpan establishingGetContactsInterval, @@ -175,6 +183,45 @@ public ClusterClientSettings( int bufferSize, bool useLegacySerialization, TimeSpan? reconnectTimeout = null) + : this( + initialContacts: initialContacts, + establishingGetContactsInterval: establishingGetContactsInterval, + refreshContactsInterval: refreshContactsInterval, + heartbeatInterval: heartbeatInterval, + acceptableHeartbeatPause: acceptableHeartbeatPause, + bufferSize: bufferSize, + useLegacySerialization: useLegacySerialization, + useInitialContactsDiscovery: false, + discoverySettings: null, + reconnectTimeout: reconnectTimeout) + { + } + + /// + /// TBD + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public ClusterClientSettings( + IImmutableSet initialContacts, + TimeSpan establishingGetContactsInterval, + TimeSpan refreshContactsInterval, + TimeSpan heartbeatInterval, + TimeSpan acceptableHeartbeatPause, + int bufferSize, + bool useLegacySerialization, + bool useInitialContactsDiscovery, + ClusterClientDiscoverySettings? discoverySettings = null, + TimeSpan? reconnectTimeout = null) { if (bufferSize is < 0 or > 10000) { @@ -189,6 +236,8 @@ public ClusterClientSettings( BufferSize = bufferSize; ReconnectTimeout = reconnectTimeout; UseLegacySerialization = useLegacySerialization; + UseInitialContactDiscovery = useInitialContactsDiscovery; + DiscoverySettings = discoverySettings ?? ClusterClientDiscoverySettings.Empty; } /// @@ -260,14 +309,21 @@ public ClusterClientSettings WithReconnectTimeout(TimeSpan? reconnectTimeout) public ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) => Copy(useLegacySerialization: useLegacySerialization); + public ClusterClientSettings WithInitialContactsDiscovery( + bool useInitialContactsDiscovery, + ClusterClientDiscoverySettings? discoverySettings = null) + => Copy(useInitialContactsDiscovery: useInitialContactsDiscovery, discoverySettings: discoverySettings); + private ClusterClientSettings Copy( - IImmutableSet initialContacts = null, + IImmutableSet? initialContacts = null, TimeSpan? establishingGetContactsInterval = null, TimeSpan? refreshContactsInterval = null, TimeSpan? heartbeatInterval = null, TimeSpan? acceptableHeartbeatPause = null, int? bufferSize = null, bool? useLegacySerialization = null, + bool? useInitialContactsDiscovery = null, + ClusterClientDiscoverySettings? discoverySettings = null, TimeSpan? reconnectTimeout = null) { return new ClusterClientSettings( @@ -278,6 +334,8 @@ private ClusterClientSettings Copy( acceptableHeartbeatPause ?? AcceptableHeartbeatPause, bufferSize ?? BufferSize, useLegacySerialization ?? UseLegacySerialization, + useInitialContactsDiscovery ?? UseInitialContactDiscovery, + discoverySettings ?? DiscoverySettings, reconnectTimeout ?? ReconnectTimeout); } } diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf index bf6c91489fe..ed15c6bf2da 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf @@ -94,6 +94,19 @@ akka.cluster.client { # Turning this setting to off or false will cause the ClusterClient messages # to be serialized using the ClusterClientMessageSerializer and not the default Object serializer. use-legacy-serialization = on + + use-initial-contacts-discovery = false + + discovery + { + method = + actor-system-name = null + receptionist-name = receptionist + service-name = null + port-name = null + discovery-retry-interval = 1s + discovery-timeout = 60s + } } # //#cluster-client-config From 6dac608d9711f3afdabf50b08291a0e5f5286952 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 18:33:03 +0700 Subject: [PATCH 2/9] Add unit test --- .../ClusterClientDiscoverySpec.cs | 295 ++++++++++++++++++ .../Config/ConfigServiceDiscovery.cs | 64 +++- 2 files changed, 350 insertions(+), 9 deletions(-) create mode 100644 src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs new file mode 100644 index 00000000000..032a511fdf6 --- /dev/null +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs @@ -0,0 +1,295 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2023 Lightbend Inc. +// Copyright (C) 2013-2023 .NET Foundation +// +//----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Actor; +using Akka.Cluster.TestKit; +using Akka.Cluster.Tools.Client; +using Akka.Cluster.Tools.PublishSubscribe; +using Akka.Configuration; +using Akka.Discovery; +using Akka.Discovery.Config; +using Akka.MultiNode.TestAdapter; +using Akka.Remote.TestKit; +using Akka.TestKit.TestActors; +using FluentAssertions; + +namespace Akka.Cluster.Tools.Tests.MultiNode.Client +{ + public sealed class ClusterClientDiscoverySpecConfig : MultiNodeConfig + { + public RoleName Client { get; } + public RoleName First { get; } + public RoleName Second { get; } + public RoleName Third { get; } + + public ClusterClientDiscoverySpecConfig() + { + Client = Role("client"); + First = Role("first"); + Second = Role("second"); + Third = Role("third"); + + CommonConfig = ConfigurationFactory.ParseString(""" +akka.loglevel = INFO + +akka.remote.dot-netty.tcp.hostname = localhost +akka.actor.provider = cluster +akka.remote.log-remote-lifecycle-events = off +akka.cluster.client { + heartbeat-interval = 1d + acceptable-heartbeat-pause = 1d + reconnect-timeout = 3s + refresh-contacts-interval = 1d +} +akka.test.filter-leeway = 10s +""") + .WithFallback(ClusterClientReceptionist.DefaultConfig()) + .WithFallback(DistributedPubSub.DefaultConfig()) + .WithFallback(MultiNodeClusterSpec.ClusterConfig()); + + NodeConfig(new[]{ Client }, + new []{ + ConfigurationFactory.ParseString(""" +akka { + cluster.client { + heartbeat-interval = 1s + acceptable-heartbeat-pause = 2s + use-initial-contacts-discovery = true + discovery + { + service-name = test-cluster + discovery-timeout = 10s + } + } + + + discovery + { + method = config + config.services.test-cluster.endpoints = [] + } +} +""")}); + TestTransport = true; + } + } + + public class ClusterClientDiscoverySpec : MultiNodeClusterSpec + { + private readonly ClusterClientDiscoverySpecConfig _config; + private ConfigServiceDiscovery _discoveryService; + + public ClusterClientDiscoverySpec() : this(new ClusterClientDiscoverySpecConfig()) { } + + protected ClusterClientDiscoverySpec(ClusterClientDiscoverySpecConfig config) + : base(config, typeof(ClusterClientDiscoverySpec)) + { + _config = config; + } + + protected override int InitialParticipantsValueFactory => 3; + + private void Join(RoleName from, RoleName to) + { + RunOn(() => + { + Cluster.Join(Node(to).Address); + ClusterClientReceptionist.Get(Sys); + }, from); + EnterBarrier(from.Name + "-joined"); + } + + private IActorRef _clusterClient = null; + + [MultiNodeFact] + public void ClusterClientDiscoverySpecs() + { + ClusterClient_must_startup_cluster_with_single_node(); + ClusterClient_must_establish_connection_to_first_node(); + ClusterClient_must_down_existing_cluster(); + ClusterClient_second_node_must_form_a_new_cluster(); + ClusterClient_must_re_establish_on_cluster_restart(); + ClusterClient_must_simulate_a_cluster_forced_shutdown(); + ClusterClient_third_node_formed_a_cluster(); + ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown(); + } + + private void ClusterClient_must_startup_cluster_with_single_node() + { + Within(TimeSpan.FromSeconds(30), () => + { + Join(_config.First, _config.First); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.First); + EnterBarrier("cluster-started"); + + RunOn(() => + { + _discoveryService = + (ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config"); + var address = GetAddress(_config.First); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(1); + }, _config.Client); + EnterBarrier("discovery-entry-added"); + }); + } + + private void ClusterClient_must_establish_connection_to_first_node() + { + RunOn(() => + { + _clusterClient = Sys.ActorOf(ClusterClient.Props(ClusterClientSettings.Create(Sys)), "client1"); + + Within(TimeSpan.FromSeconds(5), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.First).Address); + }, RemainingOrDefault); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity:true)); + ExpectMsg().Should().Be("hello"); + }, _config.Client); + EnterBarrier("established"); + } + + private void ClusterClient_must_down_existing_cluster() + { + RunOn(() => + { + Cluster.Get(Sys).Leave(Node(_config.First).Address); + }, _config.First); + + EnterBarrier("cluster-downed"); + + RunOn(() => + { + var address = GetAddress(_config.First); + _discoveryService.TryRemoveEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(0); + }, _config.Client); + EnterBarrier("discovery-entry-removed"); + + } + + private void ClusterClient_second_node_must_form_a_new_cluster() + { + Join(_config.Second, _config.Second); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.Second); + + EnterBarrier("cluster-restarted"); + + RunOn(() => + { + var address = GetAddress(_config.Second); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(1); + }, _config.Client); + EnterBarrier("discovery-entry-updated"); + } + + private void ClusterClient_must_re_establish_on_cluster_restart() + { + RunOn(() => + { + Within(TimeSpan.FromSeconds(5), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.Second).Address); + }, RemainingOrDefault); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true)); + ExpectMsg().Should().Be("hello"); + + }, _config.Client); + EnterBarrier("re-establish-successful"); + } + + private void ClusterClient_must_simulate_a_cluster_forced_shutdown() + { + RunOn(() => + { + // simulate a hard shutdown + TestConductor.Exit(_config.Second, 0).Wait(); + }, _config.Client); + EnterBarrier("hard-shutdown-and-discovery-entry-updated"); + } + + private void ClusterClient_third_node_formed_a_cluster() + { + Join(_config.Third, _config.Third); + RunOn(() => + { + var service = Sys.ActorOf(EchoActor.Props(this), "testService"); + ClusterClientReceptionist.Get(Sys).RegisterService(service); + AwaitMembersUp(1); + }, _config.Third); + + EnterBarrier("cluster-restarted"); + + RunOn(() => + { + var address = GetAddress(_config.Third); + _discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port)); + + var resolved = _discoveryService.Lookup(new Lookup("test-cluster"), TimeSpan.FromSeconds(1)).Result; + resolved.Addresses.Count.Should().Be(2); + }, _config.Client); + EnterBarrier("discovery-entry-updated"); + } + + private void ClusterClient_must_re_establish_on_cluster_restart_after_hard_shutdown() + { + RunOn(() => + { + Within(TimeSpan.FromSeconds(20), () => + { + AwaitAssert(() => + { + _clusterClient.Tell(GetContactPoints.Instance, TestActor); + var contacts = ExpectMsg(TimeSpan.FromSeconds(1)).ContactPointsList; + contacts.Count.Should().Be(1); + contacts.First().Address.Should().Be(Node(_config.Third).Address); + }, TimeSpan.FromSeconds(20)); + }); + + _clusterClient.Tell(new ClusterClient.Send("/user/testService", "hello", localAffinity: true)); + ExpectMsg().Should().Be("hello"); + + }, _config.Client); + EnterBarrier("re-establish-successful"); + } + + } +} diff --git a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs index 86bba8d376c..7ff03909c4c 100644 --- a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs +++ b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; +using System.Collections.Immutable; using System.Linq; using System.Threading.Tasks; using Akka.Actor; @@ -39,11 +40,12 @@ public static class ConfigServicesParser [InternalApi] public class ConfigServiceDiscovery : ServiceDiscovery { - private readonly Dictionary _resolvedServices; + private readonly ILoggingAdapter _log; + private ImmutableDictionary _resolvedServices; public ConfigServiceDiscovery(ExtendedActorSystem system) { - var log = Logging.GetLogger(system, nameof(ConfigServiceDiscovery)); + _log = Logging.GetLogger(system, nameof(ConfigServiceDiscovery)); var config = system.Settings.Config.GetConfig("akka.discovery.config") ?? throw new ArgumentException( @@ -52,35 +54,79 @@ public ConfigServiceDiscovery(ExtendedActorSystem system) var servicePath = config.GetString("services-path"); if (string.IsNullOrWhiteSpace(servicePath)) { - log.Warning( + _log.Warning( "The config path [akka.discovery.config] must contain field `service-path` that points to a " + "configuration path that contains an array of node services for Discovery to contact."); - _resolvedServices = new Dictionary(); + _resolvedServices = ImmutableDictionary.Empty; } else { var services = system.Settings.Config.GetConfig(servicePath); if (services == null) { - log.Warning( + _log.Warning( "You are trying to use config based discovery service and the settings path described in\n" + $"`akka.discovery.config.services-path` does not exists. Make sure that [{servicePath}] path \n" + "exists and to fill this setting with pre-defined node addresses to make sure that a cluster \n" + "can be formed"); - _resolvedServices = new Dictionary(); + _resolvedServices = ImmutableDictionary.Empty; } else { - _resolvedServices = ConfigServicesParser.Parse(services); + _resolvedServices = ConfigServicesParser.Parse(services).ToImmutableDictionary(); if(_resolvedServices.Count == 0) - log.Warning( + _log.Warning( $"You are trying to use config based discovery service and the settings path [{servicePath}]\n" + "described `akka.discovery.config.services-path` is empty. Make sure to fill this setting \n" + "with pre-defined node addresses to make sure that a cluster can be formed."); } } - log.Debug($"Config discovery serving: {string.Join(", ", _resolvedServices.Values)}"); + _log.Debug($"Config discovery serving: {string.Join(", ", _resolvedServices.Values)}"); + } + + public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target) + { + if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) + { + _log.Info($"Could not find service {serviceName}, adding a new service. Available services: {string.Join(", ", _resolvedServices.Keys)}"); + resolved = new Resolved(serviceName); + _resolvedServices = _resolvedServices.SetItem(serviceName, resolved); + } + + if (!resolved.Addresses.Contains(target)) + { + _log.Info($"ResolvedTarget was not in service {serviceName}, nothing to remove."); + return false; + } + + var newResolved = new Resolved(serviceName, resolved.Addresses.Remove(target)); + _resolvedServices = _resolvedServices.SetItem(serviceName, newResolved); + + _log.Debug($"ResolvedTarget {target} has been removed from service {serviceName}"); + return true; + } + + public bool TryAddEndpoint(string serviceName, ResolvedTarget target) + { + if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) + { + _log.Info($"Could not find service {serviceName}, adding a new service. Available services: {string.Join(", ", _resolvedServices.Keys)}"); + resolved = new Resolved(serviceName); + _resolvedServices = _resolvedServices.SetItem(serviceName, resolved); + } + + if (resolved.Addresses.Contains(target)) + { + _log.Info($"ResolvedTarget is already in service {serviceName}, nothing to add."); + return false; + } + + var newResolved = new Resolved(serviceName, resolved.Addresses.Add(target)); + _resolvedServices = _resolvedServices.SetItem(serviceName, newResolved); + + _log.Debug($"ResolvedTarget {target} has been added to service {serviceName}"); + return true; } public override Task Lookup(Lookup lookup, TimeSpan resolveTimeout) From f5f11b2bea90ab21640e5e3311eee17dc1beeaa3 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 20:26:27 +0700 Subject: [PATCH 3/9] Fix timers, add verbose logging flag --- .../ClusterClientDiscoverySpec.cs | 2 + .../Client/ClusterClientDiscovery.cs | 99 +++++++++++++------ .../Client/ClusterClientSettings.cs | 9 +- .../Akka.Cluster.Tools/Client/reference.conf | 2 + 4 files changed, 78 insertions(+), 34 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs index 032a511fdf6..819009d9458 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools.Tests.MultiNode/ClusterClient/ClusterClientDiscoverySpec.cs @@ -61,6 +61,8 @@ public ClusterClientDiscoverySpecConfig() heartbeat-interval = 1s acceptable-heartbeat-pause = 2s use-initial-contacts-discovery = true + reconnect-timeout = 4s + verbose-logging = true discovery { service-name = test-cluster diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs index a57adee9683..e62e445ca8b 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -17,7 +17,7 @@ #nullable enable namespace Akka.Cluster.Tools.Client; -public class ClusterClientDiscovery: ActorBase, IWithUnboundedStash +public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash, IWithTimers { #region Discovery messages @@ -45,9 +45,12 @@ private sealed record ResolveResult(Contact Contact, IActorRef? Subject); private readonly ServiceDiscovery? _serviceDiscovery; private readonly Lookup? _lookup; private readonly TimeSpan _discoveryTimeout; - private ICancelable? _discoveryCancelable; + private readonly TimeSpan _discoveryRetryInterval; private readonly string _targetActorSystemName; + private readonly string _receptionistName; private readonly string _transportProtocol; + + private readonly bool _verboseLogging; public ClusterClientDiscovery(ClusterClientSettings settings) { @@ -59,7 +62,13 @@ public ClusterClientDiscovery(ClusterClientSettings settings) var discoveryMethod = _discoverySettings.DiscoveryMethod; if(string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") + { + _log.Info( + "No default initial contacts discovery implementation configured in\n" + + "`akka.cluster.client.discovery.method`. Trying to Fall back to default\n" + + "discovery method declared in `akka.discovery.method`"); discoveryMethod = Context.System.Settings.Config.GetString("akka.discovery.method"); + } if (string.IsNullOrWhiteSpace(discoveryMethod) || discoveryMethod == "") { _log.Warning( @@ -85,31 +94,40 @@ public ClusterClientDiscovery(ClusterClientSettings settings) _log.Warning( "No target ActorSystem name configured in `akka.cluster.client.discovery.actor-system-name`,\n" + "falling back to this ActorSystem name ({0}) instead.", Context.System.Name); + _targetActorSystemName = Context.System.Name; + } + else + { + _targetActorSystemName = _discoverySettings.ActorSystemName!; } - _targetActorSystemName = string.IsNullOrWhiteSpace(_discoverySettings.ActorSystemName) - ? Context.System.Name : _discoverySettings.ActorSystemName; + _transportProtocol = ((ExtendedActorSystem)Context.System).Provider.DefaultAddress.Protocol; + _receptionistName = settings.DiscoverySettings.ReceptionistName; _lookup = new Lookup(_discoverySettings.ServiceName, _discoverySettings.PortName); - _serviceDiscovery = Discovery.Discovery.Get(Context.System) - .LoadServiceDiscovery(discoveryMethod); + _serviceDiscovery = Discovery.Discovery.Get(Context.System).LoadServiceDiscovery(discoveryMethod); + _discoveryRetryInterval = _settings.DiscoverySettings.DiscoveryRetryInterval; _discoveryTimeout = _discoverySettings.DiscoveryTimeout; + + _verboseLogging = _settings.VerboseLogging; - Rediscover(); + Become(Discovering); } public IStash Stash { get; set; } = null!; + public ITimerScheduler Timers { get; set; } = null!; - protected override bool Receive(object message) + protected override void OnReceive(object message) { throw new NotImplementedException("Should never reach this code"); } - protected override void PostStop() + protected override void PreStart() { - _discoveryCancelable?.Cancel(); - _discoveryCancelable = null; - base.PostStop(); + base.PreStart(); + + // Kickoff discovery lookup + Self.Tell(DiscoverTick.Instance); } private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target) @@ -144,22 +162,23 @@ private bool Discovering(object message) switch (message) { case DiscoverTick: + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Discovering initial contacts"); + Rediscover(); return true; case ServiceDiscovery.Resolved resolved: { - _discoveryCancelable?.Cancel(); - _discoveryCancelable = null; + Timers.CancelAll(); if (resolved.Addresses.Count == 0) { + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("No initial contact were discovered. Will try again."); + // discovery didn't find any contacts, retry discovery - _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( - delay: _settings.DiscoverySettings.DiscoveryRetryInterval, - receiver: Self, - message: DiscoverTick.Instance, - sender: Self); + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); return true; } @@ -168,6 +187,9 @@ private bool Discovering(object message) return new Contact(path, Context.ActorSelection(path)); }).ToImmutableHashSet(); + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Initial contacts are discovered at [{0}], verifying existence.", string.Join(", ", contacts.Select(c => c.Path))); + VerifyContacts().PipeTo(Self, Self); return true; @@ -184,15 +206,16 @@ async Task VerifyContacts() var contacts = resolved.Where(r => r.Subject is not null).Select(r => r.Contact).ToArray(); if (contacts.Length == 0) { - _log.Warning("Cluster.Client contact point resolution phase failed, will try again."); - _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( - delay: _settings.DiscoverySettings.DiscoveryRetryInterval, - receiver: Self, - message: DiscoverTick.Instance, - sender: Self); + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("Cluster client contact point resolution phase failed, will try again."); + + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); } else { + if(_log.IsInfoEnabled) + _log.Info("Cluster client initial contacts are verified at [{0}], starting cluster client actor.", string.Join(", ", contacts.Select(c => c.Path))); + Become(Active(contacts)); } @@ -200,12 +223,10 @@ async Task VerifyContacts() } case DiscoveryFailure fail: - _log.Warning(fail.Cause, "Cluster.Client contact point service discovery phase failed, will try again."); - _discoveryCancelable = Context.System.Scheduler.ScheduleTellOnceCancelable( - delay: _settings.DiscoverySettings.DiscoveryRetryInterval, - receiver: Self, - message: DiscoverTick.Instance, - sender: Self); + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info(fail.Cause, "Cluster client contact point service discovery phase failed, will try again."); + + Timers.StartSingleTimer(DiscoverTick.Instance, DiscoverTick.Instance, _discoveryRetryInterval); return true; default: @@ -216,8 +237,17 @@ async Task VerifyContacts() private Receive Active(Contact[] contacts) { + if(_verboseLogging && _log.IsDebugEnabled) + _log.Debug("Entering active state"); + + Timers.CancelAll(); + + // Setup cluster client initial contacts var currentSettings = _settings.WithInitialContacts(contacts.Select(c => c.Path).ToImmutableHashSet()); - var clusterClient = Context.System.ActorOf(ClusterClient.Props(currentSettings), "clusterClient"); + + var clusterClient = Context.System.ActorOf( + Props.Create(() => new ClusterClient(currentSettings)).WithDeploy(Deploy.Local), + $"cluster-client-{_targetActorSystemName}-{_receptionistName}-contact-discovery"); Context.Watch(clusterClient); Stash.UnstashAll(); @@ -228,6 +258,11 @@ private Receive Active(Contact[] contacts) case Terminated terminated: if (terminated.ActorRef.Equals(clusterClient)) { + if(_verboseLogging && _log.IsInfoEnabled) + _log.Info("Cluster client failed to reconnect to all receptionists, rediscovering."); + + // Kickoff discovery lookup + Self.Tell(DiscoverTick.Instance); Become(Discovering); } else diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs index a76832cbc99..c1d70896446 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientSettings.cs @@ -73,7 +73,8 @@ public static ClusterClientSettings Create(Config config) config.GetBoolean("use-legacy-serialization"), config.GetBoolean("use-initial-contacts-discovery"), ClusterClientDiscoverySettings.Create(config), - reconnectTimeout); + reconnectTimeout, + config.GetBoolean("verbose-logging")); } /// @@ -129,6 +130,8 @@ public static ClusterClientSettings Create(Config config) public ClusterClientDiscoverySettings DiscoverySettings { get; } + public bool VerboseLogging { get; } + /// /// TBD /// @@ -221,7 +224,8 @@ public ClusterClientSettings( bool useLegacySerialization, bool useInitialContactsDiscovery, ClusterClientDiscoverySettings? discoverySettings = null, - TimeSpan? reconnectTimeout = null) + TimeSpan? reconnectTimeout = null, + bool verboseLogging = false) { if (bufferSize is < 0 or > 10000) { @@ -238,6 +242,7 @@ public ClusterClientSettings( UseLegacySerialization = useLegacySerialization; UseInitialContactDiscovery = useInitialContactsDiscovery; DiscoverySettings = discoverySettings ?? ClusterClientDiscoverySettings.Empty; + VerboseLogging = verboseLogging; } /// diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf index ed15c6bf2da..33af33d46db 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/reference.conf @@ -107,6 +107,8 @@ akka.cluster.client { discovery-retry-interval = 1s discovery-timeout = 60s } + + verbose-logging = false } # //#cluster-client-config From 0ee26e2dd435102897e7c2160e01aa75d923460c Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 20:36:12 +0700 Subject: [PATCH 4/9] Update API Approval list --- ...ec.ApproveClusterTools.DotNet.verified.txt | 33 +++++++++++++++++++ ...ISpec.ApproveClusterTools.Net.verified.txt | 33 +++++++++++++++++++ ...ISpec.ApproveDiscovery.DotNet.verified.txt | 2 ++ ...eAPISpec.ApproveDiscovery.Net.verified.txt | 2 ++ 4 files changed, 70 insertions(+) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt index e8b13f02aac..bfbf5aad29c 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.DotNet.verified.txt @@ -41,6 +41,31 @@ namespace Akka.Cluster.Tools.Client public override int GetHashCode() { } } } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } + public Akka.Actor.IStash Stash { get; set; } + public Akka.Actor.ITimerScheduler Timers { get; set; } + protected override void OnReceive(object message) { } + protected override void PreStart() { } + } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public sealed class ClusterClientDiscoverySettings : System.IEquatable + { + [System.Runtime.CompilerServices.NullableAttribute(1)] + public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty; + public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { } + public string ActorSystemName { get; set; } + public string DiscoveryMethod { get; set; } + public System.TimeSpan DiscoveryRetryInterval { get; set; } + public System.TimeSpan DiscoveryTimeout { get; set; } + public string PortName { get; set; } + [System.Runtime.CompilerServices.NullableAttribute(1)] + public string ReceptionistName { get; set; } + public string ServiceName { get; set; } + public static Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Create(Akka.Configuration.Config clusterClientConfig) { } + } public sealed class ClusterClientReceptionist : Akka.Actor.IExtension { public ClusterClientReceptionist(Akka.Actor.ExtendedActorSystem system) { } @@ -58,25 +83,33 @@ namespace Akka.Cluster.Tools.Client public ClusterClientReceptionistExtensionProvider() { } public override Akka.Cluster.Tools.Client.ClusterClientReceptionist CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterClientSettings : Akka.Actor.INoSerializationVerificationNeeded { [System.ObsoleteAttribute("Use constructor with useLegacySerialization argument instead. Since 1.5.15")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, System.Nullable reconnectTimeout = null) { } + [System.ObsoleteAttribute("Use constructor with useInitialContactsDiscovery and discoverySettings argument i" + + "nstead. Since 1.5.25")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, System.Nullable reconnectTimeout = null) { } + public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null, System.Nullable reconnectTimeout = null, bool verboseLogging = False) { } public System.TimeSpan AcceptableHeartbeatPause { get; } public int BufferSize { get; } + public Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings DiscoverySettings { get; } public System.TimeSpan EstablishingGetContactsInterval { get; } public System.TimeSpan HeartbeatInterval { get; } public System.Collections.Immutable.IImmutableSet InitialContacts { get; } public System.Nullable ReconnectTimeout { get; } public System.TimeSpan RefreshContactsInterval { get; } + public bool UseInitialContactDiscovery { get; } public bool UseLegacySerialization { get; } + public bool VerboseLogging { get; } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Configuration.Config config) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithBufferSize(int bufferSize) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithEstablishingGetContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithHeartbeatInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContacts(System.Collections.Immutable.IImmutableSet initialContacts) { } + public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContactsDiscovery(bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithReconnectTimeout(System.Nullable reconnectTimeout) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithRefreshContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt index a25e701e6aa..284c0d053f8 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveClusterTools.Net.verified.txt @@ -41,6 +41,31 @@ namespace Akka.Cluster.Tools.Client public override int GetHashCode() { } } } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public class ClusterClientDiscovery : Akka.Actor.UntypedActor, Akka.Actor.IActorStash, Akka.Actor.IWithTimers, Akka.Actor.IWithUnboundedStash, Akka.Actor.IWithUnrestrictedStash, Akka.Dispatch.IRequiresMessageQueue + { + public ClusterClientDiscovery(Akka.Cluster.Tools.Client.ClusterClientSettings settings) { } + public Akka.Actor.IStash Stash { get; set; } + public Akka.Actor.ITimerScheduler Timers { get; set; } + protected override void OnReceive(object message) { } + protected override void PreStart() { } + } + [System.Runtime.CompilerServices.NullableAttribute(0)] + public sealed class ClusterClientDiscoverySettings : System.IEquatable + { + [System.Runtime.CompilerServices.NullableAttribute(1)] + public static readonly Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Empty; + public ClusterClientDiscoverySettings(string DiscoveryMethod, string ActorSystemName, string ServiceName, [System.Runtime.CompilerServices.NullableAttribute(1)] string ReceptionistName, string PortName, System.TimeSpan DiscoveryRetryInterval, System.TimeSpan DiscoveryTimeout) { } + public string ActorSystemName { get; set; } + public string DiscoveryMethod { get; set; } + public System.TimeSpan DiscoveryRetryInterval { get; set; } + public System.TimeSpan DiscoveryTimeout { get; set; } + public string PortName { get; set; } + [System.Runtime.CompilerServices.NullableAttribute(1)] + public string ReceptionistName { get; set; } + public string ServiceName { get; set; } + public static Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings Create(Akka.Configuration.Config clusterClientConfig) { } + } public sealed class ClusterClientReceptionist : Akka.Actor.IExtension { public ClusterClientReceptionist(Akka.Actor.ExtendedActorSystem system) { } @@ -58,25 +83,33 @@ namespace Akka.Cluster.Tools.Client public ClusterClientReceptionistExtensionProvider() { } public override Akka.Cluster.Tools.Client.ClusterClientReceptionist CreateExtension(Akka.Actor.ExtendedActorSystem system) { } } + [System.Runtime.CompilerServices.NullableAttribute(0)] public sealed class ClusterClientSettings : Akka.Actor.INoSerializationVerificationNeeded { [System.ObsoleteAttribute("Use constructor with useLegacySerialization argument instead. Since 1.5.15")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, System.Nullable reconnectTimeout = null) { } + [System.ObsoleteAttribute("Use constructor with useInitialContactsDiscovery and discoverySettings argument i" + + "nstead. Since 1.5.25")] public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, System.Nullable reconnectTimeout = null) { } + public ClusterClientSettings(System.Collections.Immutable.IImmutableSet initialContacts, System.TimeSpan establishingGetContactsInterval, System.TimeSpan refreshContactsInterval, System.TimeSpan heartbeatInterval, System.TimeSpan acceptableHeartbeatPause, int bufferSize, bool useLegacySerialization, bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null, System.Nullable reconnectTimeout = null, bool verboseLogging = False) { } public System.TimeSpan AcceptableHeartbeatPause { get; } public int BufferSize { get; } + public Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings DiscoverySettings { get; } public System.TimeSpan EstablishingGetContactsInterval { get; } public System.TimeSpan HeartbeatInterval { get; } public System.Collections.Immutable.IImmutableSet InitialContacts { get; } public System.Nullable ReconnectTimeout { get; } public System.TimeSpan RefreshContactsInterval { get; } + public bool UseInitialContactDiscovery { get; } public bool UseLegacySerialization { get; } + public bool VerboseLogging { get; } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Actor.ActorSystem system) { } public static Akka.Cluster.Tools.Client.ClusterClientSettings Create(Akka.Configuration.Config config) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithBufferSize(int bufferSize) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithEstablishingGetContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithHeartbeatInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContacts(System.Collections.Immutable.IImmutableSet initialContacts) { } + public Akka.Cluster.Tools.Client.ClusterClientSettings WithInitialContactsDiscovery(bool useInitialContactsDiscovery, [System.Runtime.CompilerServices.NullableAttribute(2)] Akka.Cluster.Tools.Client.ClusterClientDiscoverySettings discoverySettings = null) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithReconnectTimeout(System.Nullable reconnectTimeout) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithRefreshContactsInterval(System.TimeSpan value) { } public Akka.Cluster.Tools.Client.ClusterClientSettings WithUseLegacySerialization(bool useLegacySerialization) { } diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt index db85958f822..a413533fe1d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt @@ -22,6 +22,8 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()] public class static ConfigServicesParser diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt index 2811d821db1..714ab62104d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt @@ -22,6 +22,8 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()] public class static ConfigServicesParser From 73aff229f8b2a532441ec4e555752747390ebac4 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 20:44:06 +0700 Subject: [PATCH 5/9] Simplify logic --- .../Client/ClusterClientDiscovery.cs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs index e62e445ca8b..21876293415 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -150,13 +150,6 @@ private static async Task ResolveContact(Contact contact, TimeSpa } } - private void Rediscover() - { - Become(Discovering); - _serviceDiscovery!.Lookup(_lookup, _discoveryTimeout) - .PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause)); - } - private bool Discovering(object message) { switch (message) @@ -165,7 +158,8 @@ private bool Discovering(object message) if(_verboseLogging && _log.IsDebugEnabled) _log.Debug("Discovering initial contacts"); - Rediscover(); + _serviceDiscovery!.Lookup(_lookup, _discoveryTimeout) + .PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause)); return true; case ServiceDiscovery.Resolved resolved: From 5fbd5c1a4b1f419e583f4583658e0ea4f820ef14 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 20:52:12 +0700 Subject: [PATCH 6/9] Simplify cluster client actor name --- .../cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs index 21876293415..c8b55dbcb3a 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -241,7 +241,7 @@ private Receive Active(Contact[] contacts) var clusterClient = Context.System.ActorOf( Props.Create(() => new ClusterClient(currentSettings)).WithDeploy(Deploy.Local), - $"cluster-client-{_targetActorSystemName}-{_receptionistName}-contact-discovery"); + $"cluster-client-{_targetActorSystemName}-{_receptionistName}"); Context.Watch(clusterClient); Stash.UnstashAll(); From 590695ffba0d7fd284015cc595dda91e79749e06 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 20:54:02 +0700 Subject: [PATCH 7/9] Remove cluster client name --- .../Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs index c8b55dbcb3a..688f89b43d3 100644 --- a/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs +++ b/src/contrib/cluster/Akka.Cluster.Tools/Client/ClusterClientDiscovery.cs @@ -239,9 +239,7 @@ private Receive Active(Contact[] contacts) // Setup cluster client initial contacts var currentSettings = _settings.WithInitialContacts(contacts.Select(c => c.Path).ToImmutableHashSet()); - var clusterClient = Context.System.ActorOf( - Props.Create(() => new ClusterClient(currentSettings)).WithDeploy(Deploy.Local), - $"cluster-client-{_targetActorSystemName}-{_receptionistName}"); + var clusterClient = Context.System.ActorOf(Props.Create(() => new ClusterClient(currentSettings)).WithDeploy(Deploy.Local)); Context.Watch(clusterClient); Stash.UnstashAll(); From 04702f3bfc413ca4929e34d43580a295458c53a5 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Sun, 23 Jun 2024 21:01:12 +0700 Subject: [PATCH 8/9] Mark new Discovery.Config methods as InternalStableApi --- src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs index 7ff03909c4c..22f3cf6e50f 100644 --- a/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs +++ b/src/core/Akka.Discovery/Config/ConfigServiceDiscovery.cs @@ -85,6 +85,7 @@ public ConfigServiceDiscovery(ExtendedActorSystem system) _log.Debug($"Config discovery serving: {string.Join(", ", _resolvedServices.Values)}"); } + [InternalStableApi] public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target) { if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) @@ -107,6 +108,7 @@ public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target) return true; } + [InternalStableApi] public bool TryAddEndpoint(string serviceName, ResolvedTarget target) { if (!_resolvedServices.TryGetValue(serviceName, out var resolved)) From 332e5c85ca101b21b4a5419b26906d4df979d712 Mon Sep 17 00:00:00 2001 From: Gregorius Soedharmo Date: Mon, 24 Jun 2024 21:07:12 +0700 Subject: [PATCH 9/9] Update API Approval list --- .../verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt | 2 ++ .../verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt index a413533fe1d..c9fc1b4eaa4 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.DotNet.verified.txt @@ -22,7 +22,9 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + [Akka.Annotations.InternalStableApiAttribute()] public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + [Akka.Annotations.InternalStableApiAttribute()] public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt index 714ab62104d..830caa28560 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveDiscovery.Net.verified.txt @@ -22,7 +22,9 @@ namespace Akka.Discovery.Config { public ConfigServiceDiscovery(Akka.Actor.ExtendedActorSystem system) { } public override System.Threading.Tasks.Task Lookup(Akka.Discovery.Lookup lookup, System.TimeSpan resolveTimeout) { } + [Akka.Annotations.InternalStableApiAttribute()] public bool TryAddEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } + [Akka.Annotations.InternalStableApiAttribute()] public bool TryRemoveEndpoint(string serviceName, Akka.Discovery.ServiceDiscovery.ResolvedTarget target) { } } [Akka.Annotations.InternalApiAttribute()]