Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Remote EndpointWriter backoff bugfix #1777

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,7 @@ namespace Akka.Configuration
{
public class Config
{
public static readonly Akka.Configuration.Config Empty;
public Config() { }
public Config(Akka.Configuration.Hocon.HoconRoot root) { }
public Config(Akka.Configuration.Config source, Akka.Configuration.Config fallback) { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
<Error Condition="!Exists('..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\xunit.core.2.0.0\build\portable-net45+win+wpa81+wp80+monotouch+monoandroid+Xamarin.iOS\xunit.core.props'))" />
</Target>
<PropertyGroup>
<PreBuildEvent>
</PreBuildEvent>
</PropertyGroup>
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
Expand Down
5 changes: 5 additions & 0 deletions src/core/Akka.MultiNodeTestRunner/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ static void Main(string[] args)
//Block until all Sinks have been terminated.
TestRunSystem.WhenTerminated.Wait(TimeSpan.FromMinutes(1));

if (Debugger.IsAttached)
{
Console.ReadLine(); //block when debugging
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this to make it easier to debug multi-node tests when launching them directly from Visual Studio.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could also use System.Diagnostics.Debugger.Break()

}

//Return the proper exit code
Environment.Exit(ExitCodeContainer.ExitCode);
}
Expand Down
35 changes: 22 additions & 13 deletions src/core/Akka.Remote.TestKit/Conductor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
Expand Down Expand Up @@ -65,17 +66,23 @@ public IActorRef Controller
public async Task<INode> StartController(int participants, RoleName name, INode controllerPort)
{
if(_controller != null) throw new IllegalStateException("TestConductorServer was already started");
_controller = _system.ActorOf(new Props(typeof (Controller), new object[] {participants, controllerPort}),
"controller");
_controller = _system.ActorOf(Props.Create(() => new Controller(participants, controllerPort)),
"controller");
//TODO: Need to review this async stuff
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance).ConfigureAwait(false);
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout).ConfigureAwait(false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made the same change in a lot of places - every Ask operation used by the internals of the MultiNodeTestkit (such as the Player, Controller, Conductor, etc.) now has a configurable timeout.

await StartClient(name, node).ConfigureAwait(false);
return node;
}

/// <summary>
/// Obtain the port to which the controller’s socket is actually bound. This
/// will deviate from the configuration in `akka.testconductor.port` in case
/// that was given as zero.
/// </summary>
/// <returns>The address of the controller's socket endpoint</returns>
public Task<INode> SockAddr()
{
return _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance);
return Controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout);
}

/// <summary>
Expand Down Expand Up @@ -104,7 +111,7 @@ public Task<Done> Throttle(RoleName node, RoleName target, ThrottleTransportAdap
float rateMBit)
{
RequireTestConductorTransport();
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit));
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -128,9 +135,10 @@ public Task<Done> Blackhole(RoleName node, RoleName target, ThrottleTransportAda

private void RequireTestConductorTransport()
{
// Verifies that the Throttle and FailureInjector TransportAdapters are active
if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig.");
"(trttl, gremlin) by specifying `TestTransport(on = true)` in your MultiNodeConfig.");
}

/// <summary>
Expand Down Expand Up @@ -160,7 +168,7 @@ public Task<Done> PassThrough(RoleName node, RoleName target, ThrottleTransportA
/// <returns></returns>
public Task<Done> Disconnect(RoleName node, RoleName target)
{
return Controller.Ask<Done>(new Disconnect(node, target, false));
return Controller.Ask<Done>(new Disconnect(node, target, false), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -173,7 +181,7 @@ public Task<Done> Disconnect(RoleName node, RoleName target)
/// <returns></returns>
public Task<Done> Abort(RoleName node, RoleName target)
{
return Controller.Ask<Done>(new Disconnect(node, target, true));
return Controller.Ask<Done>(new Disconnect(node, target, true), Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -187,7 +195,7 @@ public Task<Done> Exit(RoleName node, int exitValue)
{
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue))).ContinueWith(t =>
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue)), Settings.QueryTimeout).ContinueWith(t =>
{
if(t.Result is Done) return Done.Instance;
var failure = t.Result as FSMBase.Failure;
Expand All @@ -209,7 +217,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false)
{
// the recover is needed to handle ClientDisconnectedException exception,
// which is normal during shutdown
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort))).ContinueWith(t =>
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort)), Settings.QueryTimeout).ContinueWith(t =>
{
if (t.Result is Done) return Done.Instance;
var failure = t.Result as FSMBase.Failure;
Expand All @@ -224,7 +232,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false)
/// </summary>
public Task<IEnumerable<RoleName>> GetNodes()
{
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance);
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance, Settings.QueryTimeout);
}

/// <summary>
Expand All @@ -237,7 +245,7 @@ public Task<IEnumerable<RoleName>> GetNodes()
/// <returns></returns>
public Task<Done> RemoveNode(RoleName node)
{
return Controller.Ask<Done>(new Remove(node));
return Controller.Ask<Done>(new Remove(node), Settings.QueryTimeout);
}
}

Expand All @@ -262,7 +270,8 @@ public ConductorHandler(IActorRef controller, ILoggingAdapter log)
public async void OnConnect(INode remoteAddress, IConnection responseChannel)
{
_log.Debug("connection from {0}", responseChannel.RemoteHost);
//TODO: Seems wrong to create new RemoteConnection here

// Duration of this Ask operation needs to be infinite
var fsm = await _controller.Ask<IActorRef>(new Controller.CreateServerFSM(new RemoteConnection(responseChannel, this)), TimeSpan.FromMilliseconds(Int32.MaxValue));
_clients.AddOrUpdate(responseChannel, fsm, (connection, @ref) => fsm);
}
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Remote.TestKit/Internals/Reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ akka {
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 2
pool-size-max = 1
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internal Helios connection used in the MNTR for things like the shared synchronization barriers now runs on one thread. This probably doesn't impact much, since messaging guarantees aren't as important here, but I thought it best to be consistent.

}

# (I&O) Used to configure the number of I/O worker threads on client sockets
Expand All @@ -58,7 +58,7 @@ akka {
pool-size-factor = 1.0

# Max number of threads to cap factor-based number to
pool-size-max = 2
pool-size-max = 1
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/core/Akka.Remote.TestKit/MsgEncoder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ public void Encode(IConnection connection, object message, out List<IByteBuf> en
.SetOp(BarrierOp.Fail)))
.With<ThrottleMsg>(
throttle =>
{
w.SetFailure(
InjectFailure.CreateBuilder()
.SetFailure(TCP.FailType.Throttle)
.SetAddress(Address2Proto(throttle.Target))
.SetFailure(TCP.FailType.Throttle)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixes some bugs with using throttling inside the MNTR - we weren't encoding all of the fields into the protobuf message correctly before.

.SetDirection(Direction2Proto(throttle.Direction))
.SetRateMBit(throttle.RateMBit)))
.SetRateMBit(throttle.RateMBit));
})
.With<DisconnectMsg>(
disconnect =>
w.SetFailure(
Expand Down
6 changes: 5 additions & 1 deletion src/core/Akka.Remote.TestKit/MultiNodeSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@ namespace Akka.Remote.TestKit
/// </summary>
public abstract class MultiNodeConfig
{
Config _commonConf = null;
// allows us to avoid NullReferenceExceptions if we make this empty rather than null
// so that way if a MultiNodeConfig doesn't explicitly set CommonConfig to some value
// it will remain safe by default
Config _commonConf = Akka.Configuration.Config.Empty;

ImmutableDictionary<RoleName, Config> _nodeConf = ImmutableDictionary.Create<RoleName, Config>();
ImmutableList<RoleName> _roles = ImmutableList.Create<RoleName>();
ImmutableDictionary<RoleName, ImmutableList<string>> _deployments = ImmutableDictionary.Create<RoleName, ImmutableList<string>>();
Expand Down
21 changes: 12 additions & 9 deletions src/core/Akka.Remote.TestKit/Player.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ public IActorRef Client
public Task<Done> StartClient(RoleName name, INode controllerAddr)
{
if(_client != null) throw new IllegalStateException("TestConductorClient already started");
_client =
_system.ActorOf(new Props(typeof (ClientFSM),
new object[] {name, controllerAddr}), "TestConductorClient");

_client =
_system.ActorOf(Props.Create(() => new ClientFSM(name, controllerAddr)), "TestConductorClient");

//TODO: IRequiresMessageQueue
var a = _system.ActorOf(Props.Create<WaitForClientFSMToConnect>());

Expand Down Expand Up @@ -132,22 +131,26 @@ public void Enter(TimeSpan timeout, ImmutableList<string> names)
try
{
var askTimeout = barrierTimeout + Settings.QueryTimeout;
//TODO: Wait?
// Need to force barrier to wait here, so we can pass along a "fail barrier" message in the event
// of a failed operation
_client.Ask(new ToServer<EnterBarrier>(new EnterBarrier(name, barrierTimeout)), askTimeout).Wait();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a wait here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The wait is correct - have to wait for the barrier to ACK before we can proceed. Has a timeout though, so it'll only wait up to 3-5 seconds IIRC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't sure if this was in an async method or not

}
catch (OperationCanceledException)
catch (AggregateException)
{
_client.Tell(new ToServer<FailBarrier>(new FailBarrier(name)));
throw new TimeoutException("Client timed out while waiting for barrier " + name);
}
catch (OperationCanceledException)
{
_system.Log.Debug("OperationCanceledException was thrown instead of AggregateException");
}
_system.Log.Debug("passed barrier {0}", name);
}
}

public Task<Address> GetAddressFor(RoleName name)
{
//TODO: QueryTimeout implicit?
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name)));
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name)), Settings.QueryTimeout);
}
}

Expand Down Expand Up @@ -363,7 +366,7 @@ public void InitFSM()
_log.Info("disconnected from TestConductor");
throw new ConnectionFailure("disconnect");
}
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null && @event.StateData.RunningOp == null)
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null)
{
@event.StateData.Channel.Write(Done.Instance);
return Stay();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
<Compile Include="RemoteNodeShutdownAndComesBackSpec.cs" />
<Compile Include="RemoteRoundRobinSpec.cs" />
<Compile Include="RemoteRandomSpec.cs" />
<Compile Include="TestConductor\TestConductorSpec.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\contrib\testkits\Akka.TestKit.Xunit2\Akka.TestKit.Xunit2.csproj">
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote.Tests.MultiNode/RemoteRandomSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ protected override void OnReceive(object message)
}
}

[MultiNodeFact]
[MultiNodeFact()]
public void RemoteRandomSpecs()
{
A_remote_random_pool_must_be_locally_instantiated_on_a_remote_node_and_be_able_to_communicate_through_its_remote_actor_ref();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public override int Resize(IEnumerable<Routee> currentRoutees)
}
}

[MultiNodeFact]
[MultiNodeFact()]
public void RemoteRoundRobinSpecs()
{
A_remote_round_robin_must_be_locally_instantiated_on_a_remote_node_and_be_able_to_communicate_through_its_remote_actor_ref();
Expand Down
Loading