Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ClusterClient initial contact discovery feature #7261

Merged
merged 10 commits into from
Jun 24, 2024

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Jun 23, 2024

Fixes #7243

This PR is superior compared to #7260, no underlying logic inside ClusterClient actor were changed at all.

Changes

  • Add wrapper discovery actor
  • Add HOCON settings
  • Add new discovery settings
  • Add unit test

Checklist

For significant changes, please ensure that the following have been completed (delete if not relevant):

Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self-review

#nullable enable
namespace Akka.Cluster.Tools.Client;

public class ClusterClientDiscovery: UntypedActor, IWithUnboundedStash, IWithTimers
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All new discovery logic are isolated into this new actor

return new RootActorPath(address) / "system" / _discoverySettings.ReceptionistName;
}

private static async Task<ResolveResult> ResolveContact(Contact contact, TimeSpan timeout, CancellationToken ct)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method mimics ActorSelection.ResolveOne() but instead of throwing on failure, it wraps the result in an envelope instead.

Self.Tell(DiscoverTick.Instance);
}

private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method converts ServiceDiscovery.ResolvedTarget to remote ActorPath

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

_targetActorSystemName = _discoverySettings.ActorSystemName!;
}

_transportProtocol = ((ExtendedActorSystem)Context.System).Provider.DefaultAddress.Protocol;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Best case guess on what transport protocol we're using right now. Should handle custom protocols, such as the one used in Remote.TestKit

Comment on lines +157 to +162
case DiscoverTick:
if(_verboseLogging && _log.IsDebugEnabled)
_log.Debug("Discovering initial contacts");

_serviceDiscovery!.Lookup(_lookup, _discoveryTimeout)
.PipeTo(Self, Self, failure: cause => new DiscoveryFailure(cause));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actual Akka.Discovery is done here.

Comment on lines +251 to +259
if (terminated.ActorRef.Equals(clusterClient))
{
if(_verboseLogging && _log.IsInfoEnabled)
_log.Info("Cluster client failed to reconnect to all receptionists, rediscovering.");

// Kickoff discovery lookup
Self.Tell(DiscoverTick.Instance);
Become(Discovering);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If cluster client dies, return to Discovering state

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this proxying guarantees that the caller's code doesn't get disrupted if we can to kill / recreate the underlying ClusterClient - LGTM.

Comment on lines +98 to +111
use-initial-contacts-discovery = false

discovery
{
method = <method>
actor-system-name = null
receptionist-name = receptionist
service-name = null
port-name = null
discovery-retry-interval = 1s
discovery-timeout = 60s
}

verbose-logging = false
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New discovery settings.

If method is null, empty, whitespace, or "" (not set), use the discovery method declared in "akka.discovery.method". If that also isn't set, use "config" method as default fallback. Warn the user when these happened.

If actor-system-name is null or whitespace, use the current actor system name instead. Warn the user when these happened.

}

[InternalStableApi]
public bool TryRemoveEndpoint(string serviceName, ResolvedTarget target)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Discovery.Config method for MNTR testing

}

[InternalStableApi]
public bool TryAddEndpoint(string serviceName, ResolvedTarget target)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Discovery.Config method for MNTR testing

Comment on lines +217 to +228
public ClusterClientSettings(
IImmutableSet<ActorPath> initialContacts,
TimeSpan establishingGetContactsInterval,
TimeSpan refreshContactsInterval,
TimeSpan heartbeatInterval,
TimeSpan acceptableHeartbeatPause,
int bufferSize,
bool useLegacySerialization,
bool useInitialContactsDiscovery,
ClusterClientDiscoverySettings? discoverySettings = null,
TimeSpan? reconnectTimeout = null,
bool verboseLogging = false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New ClusterClientSettings ctor, backward compatible.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

akka.remote.dot-netty.tcp.hostname = localhost
akka.actor.provider = cluster
akka.remote.log-remote-lifecycle-events = off
akka.cluster.client {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we disabling heartbeats on purpose here? 1d is an eternity in the context of this spec.

cluster.client {
heartbeat-interval = 1s
acceptable-heartbeat-pause = 2s
use-initial-contacts-discovery = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New setting, got it

discovery
{
method = config
config.services.test-cluster.endpoints = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this gets populated later using the new internal methods you added for dynamically updating config-based discovery

_discoveryService =
(ConfigServiceDiscovery)Discovery.Discovery.Get(Sys).LoadServiceDiscovery("config");
var address = GetAddress(_config.First);
_discoveryService.TryAddEndpoint("test-cluster", new ServiceDiscovery.ResolvedTarget(address.Host, address.Port));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Populates the discovery list here

{
RunOn(() =>
{
Cluster.Get(Sys).Leave(Node(_config.First).Address);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire cluster goes down here, gracefully

Self.Tell(DiscoverTick.Instance);
}

private ActorPath ResolvedTargetToReceptionistActorPath(ServiceDiscovery.ResolvedTarget target)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +251 to +259
if (terminated.ActorRef.Equals(clusterClient))
{
if(_verboseLogging && _log.IsInfoEnabled)
_log.Info("Cluster client failed to reconnect to all receptionists, rediscovering.");

// Kickoff discovery lookup
Self.Tell(DiscoverTick.Instance);
Become(Discovering);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this proxying guarantees that the caller's code doesn't get disrupted if we can to kill / recreate the underlying ClusterClient - LGTM.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@Aaronontheweb Aaronontheweb merged commit 8fd8c62 into akkadotnet:dev Jun 24, 2024
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Allow Akka.Discovery to be used with Akka.Cluster.Tools.ClusterClient
2 participants