Skip to content

Commit

Permalink
close #1586
Browse files Browse the repository at this point in the history
brings Akka.Remote up to code with latest JVM stable
converted all Akka.Remote system actors to ReceiveActor
fixed race condition in Akka.Remote.Tests.Performance
  • Loading branch information
Aaronontheweb committed Dec 31, 2015
1 parent 2eacc39 commit 193a62e
Show file tree
Hide file tree
Showing 20 changed files with 994 additions and 883 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@
<ItemGroup>
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<Service Include="{82A7F48D-3B50-4B1E-B82E-3ADA8210C358}" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ private class BenchmarkActorRef : MinimalActorRef
{
private readonly Counter _counter;

public BenchmarkActorRef(Counter counter)
public BenchmarkActorRef(Counter counter, RemoteActorRefProvider provider)
{
_counter = counter;
Provider = provider;
Path = new RootActorPath(Provider.DefaultAddress) / "user" / "tempRef";
}

protected override void TellInternal(object message, IActorRef sender)
{
_counter.Increment();
}

public override ActorPath Path { get { return null; } }
public override IActorRefProvider Provider { get { return null; } }
public override ActorPath Path { get; }

public override IActorRefProvider Provider { get; }
}

private static readonly Config RemoteHocon = ConfigurationFactory.ParseString(@"
Expand Down Expand Up @@ -76,7 +79,7 @@ public void Setup(BenchmarkContext context)
_inboundMessageDispatcherCounter = context.GetCounter(MessageDispatcherThroughputCounterName);
_message = SerializedMessage.CreateBuilder().SetSerializerId(0).SetMessage(ByteString.CopyFromUtf8("foo")).Build();
_dispatcher = new DefaultMessageDispatcher(_actorSystem, RARP.For(_actorSystem).Provider, _actorSystem.Log);
_targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter);
_targetActorRef = new BenchmarkActorRef(_inboundMessageDispatcherCounter, RARP.For(_actorSystem).Provider);
}

[PerfBenchmark(Description = "Tests the performance of the Default", RunMode = RunMode.Throughput, NumberOfIterations = 13, TestMode = TestMode.Measurement)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ public void Setup(BenchmarkContext context)
var system1EchoActorPath = new RootActorPath(system1Address) / "user" / "echo";
var system2RemoteActorPath = new RootActorPath(system2Address) / "user" / "benchmark";

_remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result;
// set the timeout high here to avoid timeouts
// TL;DR; - on slow machines it can take longer than 2 seconds to form the association, do the handshake, and reply back
// using the in-memory transport.
_remoteReceiver = System1.ActorSelection(system2RemoteActorPath).ResolveOne(TimeSpan.FromSeconds(30)).Result;
_remoteEcho =
System2.ActorSelection(system1EchoActorPath).ResolveOne(TimeSpan.FromSeconds(2)).Result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,23 @@ public override string CreateRegistryKey()
public override Config CreateActorSystemConfig(string actorSystemName, string ipOrHostname, int port, string registryKey = null)
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""
remote {
log-remote-lifecycle-events = off
enabled-transports = [
""akka.remote.test"",
]
test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
akka {
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""
remote {
log-remote-lifecycle-events = off
enabled-transports = [
""akka.remote.test"",
]
test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
}
}
}
}
");

port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,27 @@ public override Config CreateActorSystemConfig(string actorSystemName, string ip
{
var baseConfig = ConfigurationFactory.ParseString(@"
akka {
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""
remote {
log-remote-lifecycle-events = off
enabled-transports = [
""akka.remote.test"",
]
test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
loglevel = ""WARNING""
stdout-loglevel = ""WARNING""
actor.provider = ""Akka.Remote.RemoteActorRefProvider,Akka.Remote""
remote {
log-received-messages = off
log-sent-messages = off
log-remote-lifecycle-events = off
enabled-transports = [
""akka.remote.test"",
]
test {
transport-class = ""Akka.Remote.Transport.TestTransport,Akka.Remote""
applied-adapters = []
maximum-payload-bytes = 128000b
scheme-identifier = test
}
}
}
}
");

port = 10; //BUG: setting the port to 0 causes the DefaultAddress to report the port as -1
Expand Down
24 changes: 12 additions & 12 deletions src/core/Akka.Remote.Tests/EndpointRegistrySpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void EndpointRegistry_must_be_able_to_register_a_writeable_endpoint_and_p
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA));
Assert.Equal(actorA, reg.RegisterWritableEndpoint(address1, actorA,null,null));

Assert.IsType<EndpointManager.Pass>(reg.WritableEndpointWithPolicyFor(address1));
Assert.Equal(actorA, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);
Expand All @@ -52,8 +52,8 @@ public void EndpointRegistry_must_be_able_to_register_a_readonly_endpoint()
var reg = new EndpointRegistry();
Assert.Null(reg.ReadOnlyEndpointFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 0));
Assert.Equal(Tuple.Create(actorA, 0), reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
Assert.False(reg.IsWritable(actorA));
Assert.True(reg.IsReadOnly(actorA));
Expand All @@ -67,10 +67,10 @@ public void EndpointRegistry_must_be_able_to_register_writable_and_readonly_endp
Assert.Null(reg.ReadOnlyEndpointFor(address1));
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));

Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB));
Assert.Equal(actorA, reg.RegisterReadOnlyEndpoint(address1, actorA, 1));
Assert.Equal(actorB, reg.RegisterWritableEndpoint(address1, actorB, null,null));

Assert.Equal(actorA, reg.ReadOnlyEndpointFor(address1));
Assert.Equal(Tuple.Create(actorA,1), reg.ReadOnlyEndpointFor(address1));
Assert.Equal(actorB, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Pass>().Endpoint);

Assert.False(reg.IsWritable(actorA));
Expand All @@ -85,7 +85,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
{
var reg = new EndpointRegistry();
Assert.Null(reg.WritableEndpointWithPolicyFor(address1));
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
Assert.Equal(deadline, reg.WritableEndpointWithPolicyFor(address1).AsInstanceOf<EndpointManager.Gated>().TimeOfRelease);
Expand All @@ -97,7 +97,7 @@ public void EndpointRegistry_must_be_able_to_register_Gated_policy_for_an_addres
public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed()
{
var reg = new EndpointRegistry();
reg.RegisterReadOnlyEndpoint(address1, actorA);
reg.RegisterReadOnlyEndpoint(address1, actorA, 2);
reg.MarkAsFailed(actorA, Deadline.Now);
Assert.Null(reg.ReadOnlyEndpointFor(address1));
}
Expand All @@ -106,8 +106,8 @@ public void EndpointRegistry_must_remove_readonly_endpoints_if_marked_as_failed(
public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address2, actorB);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
var deadline = Deadline.Now;
reg.MarkAsFailed(actorA, deadline);
reg.MarkAsQuarantined(address2, 42, deadline);
Expand All @@ -124,8 +124,8 @@ public void EndpointRegistry_must_keep_tombstones_when_removing_an_endpoint()
public void EndpointRegistry_should_prune_outdated_Gated_directives_properly()
{
var reg = new EndpointRegistry();
reg.RegisterWritableEndpoint(address1, actorA);
reg.RegisterWritableEndpoint(address2, actorB);
reg.RegisterWritableEndpoint(address1, actorA, null, null);
reg.RegisterWritableEndpoint(address2, actorB, null, null);
reg.MarkAsFailed(actorA, Deadline.Now);
var farIntheFuture = Deadline.Now + TimeSpan.FromSeconds(60);
reg.MarkAsFailed(actorB, farIntheFuture);
Expand Down
9 changes: 7 additions & 2 deletions src/core/Akka.Remote.Tests/RemoteConfigSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.FlushWait);
Assert.Equal(TimeSpan.FromSeconds(10), remoteSettings.StartupTimeout);
Assert.Equal(TimeSpan.FromSeconds(5), remoteSettings.RetryGateClosedFor);
//Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher); //TODO: add RemoteDispatcher support
Assert.Equal("akka.remote.default-remote-dispatcher", remoteSettings.Dispatcher);
Assert.True(remoteSettings.UsePassiveConnections);
Assert.Equal(TimeSpan.FromMilliseconds(50), remoteSettings.BackoffPeriod);
Assert.Equal(TimeSpan.FromSeconds(0.3d), remoteSettings.SysMsgAckTimeout);
Assert.Equal(TimeSpan.FromSeconds(2), remoteSettings.SysResendTimeout);
Assert.Equal(1000, remoteSettings.SysMsgBufferSize);
Assert.Equal(20000, remoteSettings.SysMsgBufferSize);
Assert.Equal(TimeSpan.FromMinutes(3), remoteSettings.InitialSysMsgDeliveryTimeout);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineDuration);
Assert.Equal(TimeSpan.FromDays(5), remoteSettings.QuarantineSilentSystemTimeout);
Assert.Equal(TimeSpan.FromSeconds(30), remoteSettings.CommandAckTimeout);
Assert.Equal(1, remoteSettings.Transports.Length);
Assert.Equal(typeof(HeliosTcpTransport), Type.GetType(remoteSettings.Transports.Head().TransportClass));
Expand All @@ -58,6 +59,10 @@ public void Remoting_should_contain_correct_configuration_values_in_ReferenceCon
Assert.Equal(TimeSpan.FromMilliseconds(100), remoteSettings.WatchFailureDetectorConfig.GetTimeSpan("min-std-deviation"));

//TODO add adapter support

// TODO add WatchFailureDetectorConfig assertions

remoteSettings.Config.GetString("akka.remote.log-frame-size-exceeding").ShouldBe("off");
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ public static Config ThrottlerTransportAdapterSpecConfig
{
return ConfigurationFactory.ParseString(@"
akka {
akka.test.single-expect-default = 6s #to help overcome issues with gated connections
loglevel = ""DEBUG""
stdout-loglevel = ""DEBUG""
test.single-expect-default = 6s #to help overcome issues with gated connections
actor.provider = ""Akka.Remote.RemoteActorRefProvider, Akka.Remote""
remote.helios.tcp.hostname = ""localhost""
remote.log-remote-lifecycle-events = off
Expand Down Expand Up @@ -90,7 +92,7 @@ public Lost(string msg)
Msg = msg;
}

public string Msg { get; private set; }
public string Msg { get; }

public bool Equals(Lost other)
{
Expand All @@ -108,7 +110,12 @@ public override bool Equals(object obj)

public override int GetHashCode()
{
return (Msg != null ? Msg.GetHashCode() : 0);
return Msg?.GetHashCode() ?? 0;
}

public override string ToString()
{
return GetType() + ": " + Msg;
}
}
}
Expand Down Expand Up @@ -200,7 +207,7 @@ public void ThrottlerTransportAdapter_must_survive_blackholing()
here.Tell(new ThrottlingTester.Lost("BlackHole 2"));
ExpectNoMsg(TimeSpan.FromSeconds(1));
Disassociate().ShouldBeTrue();
ExpectNoMsg(TimeSpan.FromSeconds(3));
ExpectNoMsg(TimeSpan.FromSeconds(1));

Throttle(ThrottleTransportAdapter.Direction.Both, Unthrottled.Instance).ShouldBeTrue();

Expand Down
25 changes: 24 additions & 1 deletion src/core/Akka.Remote/Configuration/Remote.conf
Original file line number Diff line number Diff line change
Expand Up @@ -237,10 +237,25 @@ akka {
# the affected systems after lifting the quarantine is undefined.
prune-quarantine-marker-after = 5 d

# If system messages have been exchanged between two systems (i.e. remote death
# watch or remote deployment has been used) a remote system will be marked as
# quarantined after the two system has no active association, and no
# communication happens during the time configured here.
# The only purpose of this setting is to avoid storing system message redelivery
# data (sequence number state, etc.) for an undefined amount of time leading to long
# term memory leak. Instead, if a system has been gone for this period,
# or more exactly
# - there is no association between the two systems (TCP connection, if TCP transport is used)
# - neither side has been attempting to communicate with the other
# - there are no pending system messages to deliver
# for the amount of time configured here, the remote system will be quarantined and all state
# associated with it will be dropped.
quarantine-after-silence = 5 d

# This setting defines the maximum number of unacknowledged system messages
# allowed for a remote system. If this limit is reached the remote system is
# declared to be dead and its UID marked as tainted.
system-message-buffer-size = 1000
system-message-buffer-size = 20000

# This setting defines the maximum idle time after an individual
# acknowledgement for system messages is sent. System message delivery
Expand All @@ -257,6 +272,14 @@ akka {
# resent.
resend-interval = 2 s

# Maximum number of unacknowledged system messages that will be resent
# each 'resend-interval'. If you watch many (> 1000) remote actors you can
# increase this value to for example 600, but a too large limit (e.g. 10000)
# may flood the connection and might cause false failure detection to trigger.
# Test such a configuration by watching all actors at the same time and stop
# all watched actors at the same time.
resend-limit = 200

# WARNING: this setting should not be not changed unless all of its consequences
# are properly understood which assumes experience with remoting internals
# or expert advice.
Expand Down
Loading

0 comments on commit 193a62e

Please sign in to comment.