-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Akka.Remote EndpointWriter backoff bugfix #1777
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ | |
using System; | ||
using System.Collections.Concurrent; | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Configuration; | ||
|
@@ -65,17 +66,23 @@ public IActorRef Controller | |
public async Task<INode> StartController(int participants, RoleName name, INode controllerPort) | ||
{ | ||
if(_controller != null) throw new IllegalStateException("TestConductorServer was already started"); | ||
_controller = _system.ActorOf(new Props(typeof (Controller), new object[] {participants, controllerPort}), | ||
"controller"); | ||
_controller = _system.ActorOf(Props.Create(() => new Controller(participants, controllerPort)), | ||
"controller"); | ||
//TODO: Need to review this async stuff | ||
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance).ConfigureAwait(false); | ||
var node = await _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout).ConfigureAwait(false); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made a lot of the same changes everywhere - every |
||
await StartClient(name, node).ConfigureAwait(false); | ||
return node; | ||
} | ||
|
||
/// <summary> | ||
/// Obtain the port to which the controller’s socket is actually bound. This | ||
/// will deviate from the configuration in `akka.testconductor.port` in case | ||
/// that was given as zero. | ||
/// </summary> | ||
/// <returns>The address of the controller's socket endpoint</returns> | ||
public Task<INode> SockAddr() | ||
{ | ||
return _controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance); | ||
return Controller.Ask<INode>(TestKit.Controller.GetSockAddr.Instance, Settings.QueryTimeout); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -104,7 +111,7 @@ public Task<Done> Throttle(RoleName node, RoleName target, ThrottleTransportAdap | |
float rateMBit) | ||
{ | ||
RequireTestConductorTransport(); | ||
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit)); | ||
return Controller.Ask<Done>(new Throttle(node, target, direction, rateMBit), Settings.QueryTimeout); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -128,9 +135,10 @@ public Task<Done> Blackhole(RoleName node, RoleName target, ThrottleTransportAda | |
|
||
private void RequireTestConductorTransport() | ||
{ | ||
// Verifies that the Throttle and FailureInjector TransportAdapters are active | ||
if(!Transport.DefaultAddress.Protocol.Contains(".trttl.gremlin.")) | ||
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + | ||
"(trttl, gremlin) by specifying `testTransport(on = true)` in your MultiNodeConfig."); | ||
"(trttl, gremlin) by specifying `TestTransport(on = true)` in your MultiNodeConfig."); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -160,7 +168,7 @@ public Task<Done> PassThrough(RoleName node, RoleName target, ThrottleTransportA | |
/// <returns></returns> | ||
public Task<Done> Disconnect(RoleName node, RoleName target) | ||
{ | ||
return Controller.Ask<Done>(new Disconnect(node, target, false)); | ||
return Controller.Ask<Done>(new Disconnect(node, target, false), Settings.QueryTimeout); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -173,7 +181,7 @@ public Task<Done> Disconnect(RoleName node, RoleName target) | |
/// <returns></returns> | ||
public Task<Done> Abort(RoleName node, RoleName target) | ||
{ | ||
return Controller.Ask<Done>(new Disconnect(node, target, true)); | ||
return Controller.Ask<Done>(new Disconnect(node, target, true), Settings.QueryTimeout); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -187,7 +195,7 @@ public Task<Done> Exit(RoleName node, int exitValue) | |
{ | ||
// the recover is needed to handle ClientDisconnectedException exception, | ||
// which is normal during shutdown | ||
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue))).ContinueWith(t => | ||
return Controller.Ask(new Terminate(node, new Right<bool, int>(exitValue)), Settings.QueryTimeout).ContinueWith(t => | ||
{ | ||
if(t.Result is Done) return Done.Instance; | ||
var failure = t.Result as FSMBase.Failure; | ||
|
@@ -209,7 +217,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false) | |
{ | ||
// the recover is needed to handle ClientDisconnectedException exception, | ||
// which is normal during shutdown | ||
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort))).ContinueWith(t => | ||
return Controller.Ask(new Terminate(node, new Left<bool, int>(abort)), Settings.QueryTimeout).ContinueWith(t => | ||
{ | ||
if (t.Result is Done) return Done.Instance; | ||
var failure = t.Result as FSMBase.Failure; | ||
|
@@ -224,7 +232,7 @@ public Task<Done> Shutdown(RoleName node, bool abort = false) | |
/// </summary> | ||
public Task<IEnumerable<RoleName>> GetNodes() | ||
{ | ||
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance); | ||
return Controller.Ask<IEnumerable<RoleName>>(TestKit.Controller.GetNodes.Instance, Settings.QueryTimeout); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -237,7 +245,7 @@ public Task<IEnumerable<RoleName>> GetNodes() | |
/// <returns></returns> | ||
public Task<Done> RemoveNode(RoleName node) | ||
{ | ||
return Controller.Ask<Done>(new Remove(node)); | ||
return Controller.Ask<Done>(new Remove(node), Settings.QueryTimeout); | ||
} | ||
} | ||
|
||
|
@@ -262,7 +270,8 @@ public ConductorHandler(IActorRef controller, ILoggingAdapter log) | |
public async void OnConnect(INode remoteAddress, IConnection responseChannel) | ||
{ | ||
_log.Debug("connection from {0}", responseChannel.RemoteHost); | ||
//TODO: Seems wrong to create new RemoteConnection here | ||
|
||
// Duration of this Ask operation needs to be infinite | ||
var fsm = await _controller.Ask<IActorRef>(new Controller.CreateServerFSM(new RemoteConnection(responseChannel, this)), TimeSpan.FromMilliseconds(Int32.MaxValue)); | ||
_clients.AddOrUpdate(responseChannel, fsm, (connection, @ref) => fsm); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,7 +43,7 @@ akka { | |
pool-size-factor = 1.0 | ||
|
||
# Max number of threads to cap factor-based number to | ||
pool-size-max = 2 | ||
pool-size-max = 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The internal Helios connection used in the MNTR for things like the shared synchronization barriers now run on one thread - probably doesn't impact much since messaging guarantees aren't as important here but I thought it best to be consistent. |
||
} | ||
|
||
# (I&O) Used to configure the number of I/O worker threads on client sockets | ||
|
@@ -58,7 +58,7 @@ akka { | |
pool-size-factor = 1.0 | ||
|
||
# Max number of threads to cap factor-based number to | ||
pool-size-max = 2 | ||
pool-size-max = 1 | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -74,12 +74,15 @@ public void Encode(IConnection connection, object message, out List<IByteBuf> en | |
.SetOp(BarrierOp.Fail))) | ||
.With<ThrottleMsg>( | ||
throttle => | ||
{ | ||
w.SetFailure( | ||
InjectFailure.CreateBuilder() | ||
.SetFailure(TCP.FailType.Throttle) | ||
.SetAddress(Address2Proto(throttle.Target)) | ||
.SetFailure(TCP.FailType.Throttle) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixes some bugs with using throttling inside the MNTR - we weren't encoding all of the fields into the protobuf message correctly before. |
||
.SetDirection(Direction2Proto(throttle.Direction)) | ||
.SetRateMBit(throttle.RateMBit))) | ||
.SetRateMBit(throttle.RateMBit)); | ||
}) | ||
.With<DisconnectMsg>( | ||
disconnect => | ||
w.SetFailure( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,10 +52,9 @@ public IActorRef Client | |
public Task<Done> StartClient(RoleName name, INode controllerAddr) | ||
{ | ||
if(_client != null) throw new IllegalStateException("TestConductorClient already started"); | ||
_client = | ||
_system.ActorOf(new Props(typeof (ClientFSM), | ||
new object[] {name, controllerAddr}), "TestConductorClient"); | ||
|
||
_client = | ||
_system.ActorOf(Props.Create(() => new ClientFSM(name, controllerAddr)), "TestConductorClient"); | ||
|
||
//TODO: IRequiresMessageQueue | ||
var a = _system.ActorOf(Props.Create<WaitForClientFSMToConnect>()); | ||
|
||
|
@@ -132,22 +131,26 @@ public void Enter(TimeSpan timeout, ImmutableList<string> names) | |
try | ||
{ | ||
var askTimeout = barrierTimeout + Settings.QueryTimeout; | ||
//TODO: Wait? | ||
// Need to force barrier to wait here, so we can pass along a "fail barrier" message in the event | ||
// of a failed operation | ||
_client.Ask(new ToServer<EnterBarrier>(new EnterBarrier(name, barrierTimeout)), askTimeout).Wait(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is a wait here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The wait is correct - have to wait for the barrier to ACK before we can proceed. Has a timeout though, so it'll only wait up to 3-5 seconds IIRC. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wasn't sure if this was in an async method or not |
||
} | ||
catch (OperationCanceledException) | ||
catch (AggregateException) | ||
{ | ||
_client.Tell(new ToServer<FailBarrier>(new FailBarrier(name))); | ||
throw new TimeoutException("Client timed out while waiting for barrier " + name); | ||
} | ||
catch (OperationCanceledException) | ||
{ | ||
_system.Log.Debug("OperationCanceledException was thrown instead of AggregateException"); | ||
} | ||
_system.Log.Debug("passed barrier {0}", name); | ||
} | ||
} | ||
|
||
public Task<Address> GetAddressFor(RoleName name) | ||
{ | ||
//TODO: QueryTimeout implicit? | ||
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name))); | ||
return _client.Ask<Address>(new ToServer<GetAddress>(new GetAddress(name)), Settings.QueryTimeout); | ||
} | ||
} | ||
|
||
|
@@ -363,7 +366,7 @@ public void InitFSM() | |
_log.Info("disconnected from TestConductor"); | ||
throw new ConnectionFailure("disconnect"); | ||
} | ||
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null && @event.StateData.RunningOp == null) | ||
if(@event.FsmEvent is ToServer<Done> && @event.StateData.Channel != null) | ||
{ | ||
@event.StateData.Channel.Write(Done.Instance); | ||
return Stay(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this to make it easier to debug multi-node tests when launching them directly from visual studio.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You could also use
System.Diagnostics.Debugger.Break()