Fixing premature pruning of Topics #3322

Merged: 4 commits, Feb 15, 2018

joshgarnett
Contributor

The DistributedPubSubMediator wasn't checking whether the TopicActor had actually terminated before pruning it from the bucket. This can cause problems if a TopicActor is re-subscribed to before it has stopped. Handling of the Subscribe message only checks Context.Child; it does not check whether the bucket entry is still valid. So it was possible to get into a state where subscribes and unsubscribes were succeeding, but any publishes to the topic were being dropped on the floor.

I've also switched from null to ActorRefs.Nobody. Previously, if a Topic actor had terminated and a publish for that topic was received before the DistributedPubSubMediator did a prune, the publish would throw an exception.

@joshgarnett
Contributor Author

My guess is that whoever did the initial port misunderstood how Scala pattern matching works. From https://github.com/akka/akka/blob/master/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala#L835

  def prune(): Unit = {
    registry foreach {
      case (owner, bucket) ⇒
        val oldRemoved = bucket.content.collect {
          case (key, ValueHolder(version, None)) if (bucket.version - version > removedTimeToLiveMillis) ⇒ key
        }
        if (oldRemoved.nonEmpty)
          registry += owner → bucket.copy(content = bucket.content -- oldRemoved)
    }
  }

case (key, ValueHolder(version, None)) only matches entries whose ValueHolder.Ref is None. If that value is not None, the version comparison is never performed and the topic is not pruned from the bucket.
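
For comparison, here is a minimal C# sketch of what that Scala match implies for a port: an entry is a pruning candidate only when its reference is absent and its version is older than the time-to-live. The ValueHolder class and the PruneSketch helper below are simplified stand-ins written for this illustration, not the Akka.NET types, and a plain string stands in for the actor reference.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-ins for illustration only; not the Akka.NET types.
public sealed class ValueHolder
{
    public ValueHolder(long version, string actorRef)
    {
        Version = version;
        Ref = actorRef; // null plays the role of Scala's None
    }

    public long Version { get; }
    public string Ref { get; }
}

public static class PruneSketch
{
    // Returns the keys that may be pruned: tombstones (Ref == null) whose
    // version is older than the bucket version by more than ttlMillis.
    // Live entries are never returned, however old their version is.
    public static List<string> KeysToPrune(
        IReadOnlyDictionary<string, ValueHolder> content,
        long bucketVersion,
        long ttlMillis)
    {
        return content
            .Where(kv => kv.Value.Ref == null
                         && bucketVersion - kv.Value.Version > ttlMillis)
            .Select(kv => kv.Key)
            .ToList();
    }
}
```

With a bucket version of 100 and a TTL of 50, a tombstone at version 1 is returned, while a live entry at version 1 and a tombstone at version 90 are both kept.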

@Aaronontheweb self-assigned this Feb 11, 2018
@Aaronontheweb
Member

Looks like we have a failure in the multi-node DistributedPubSub.Mediator specs as a result of this change:

[Node1:first][FAIL] Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.DistributedPubSubMediatorSpecs
[Node1:first][FAIL-EXCEPTION] Type: Xunit.Sdk.EqualException
--> [Node1:first][FAIL-EXCEPTION] Message: Assert.Equal() Failure
Expected: 4
Actual:   5
--> [Node1:first][FAIL-EXCEPTION] StackTrace:    at Xunit.Assert.Equal[T](T expected, T actual, IEqualityComparer`1 comparer)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.<>c__DisplayClass24_0.<AwaitCount>b__0() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 286
   at Akka.TestKit.TestKitBase.AwaitAssert(Action assertion, Nullable`1 duration, Nullable`1 interval)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.AwaitCount(Int32 expected) in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 287
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.<DistributedPubSubMediator_must_remove_terminated_users>b__30_0() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 440
   at Akka.TestKit.TestKitBase.<>c__DisplayClass141_0.<Within>b__0()
   at Akka.TestKit.TestKitBase.Within[T](TimeSpan min, TimeSpan max, Func`1 function, String hint, Nullable`1 epsilonValue)
   at Akka.TestKit.TestKitBase.Within(TimeSpan min, TimeSpan max, Action action, String hint, Nullable`1 epsilonValue)
   at Akka.TestKit.TestKitBase.Within(TimeSpan max, Action action, Nullable`1 epsilonValue)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.DistributedPubSubMediator_must_remove_terminated_users() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 442
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.DistributedPubSubMediatorSpecs() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 299

I'll take a look at the source in a moment here and see what's up

@Aaronontheweb
Member

(Could be that the spec itself implements the same "misinterpretation" of the Scala code as the underlying source did)

@Aaronontheweb added this to the 1.3.5 milestone Feb 11, 2018
@Aaronontheweb
Member

This appears to be the offending line of code:

public void DistributedPubSubMediator_must_remove_terminated_users()
{
    Within(TimeSpan.FromSeconds(5), () =>
    {
        RunOn(() =>
        {
            ChatUser("u3").Tell(PoisonPill.Instance);
        }, _second);
        AwaitCount(4);
        EnterBarrier("after-5");
    });
}

The count should drop from 5 to 4 once u3 is stopped, but with this change that no longer happens.

@Aaronontheweb left a comment
Member

Couple of stylistic changes. Still taking a look at the code to see what can be causing the spec to fail...

@@ -220,10 +220,10 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
 {
     if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
     {
-        if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && valueHolder.Ref != null)
+        if (bucket.Content.TryGetValue(remove.Path, out var valueHolder) && !Equals(valueHolder.Ref, ActorRefs.Nobody))
Member

Use the IsNobody extension method here IMHO. It'll perform a null check and this reference comparison to ActorRefs.Nobody as well.
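
As a rough sketch of the kind of check described here (a null test plus a reference comparison against a shared sentinel), the following uses hypothetical stand-in types. IRef, NobodyRef, LiveRef and IsAbsent are names invented for this example and are not the Akka.NET API.

```csharp
using System;

// Hypothetical stand-ins; these are not Akka.NET types.
public interface IRef
{
    string Path { get; }
}

public sealed class NobodyRef : IRef
{
    // Single shared sentinel instance.
    public static readonly NobodyRef Instance = new NobodyRef();

    private NobodyRef() { }

    public string Path => "/nobody";
}

public sealed class LiveRef : IRef
{
    public LiveRef(string path) { Path = path; }

    public string Path { get; }
}

public static class RefChecks
{
    // True when the reference is missing or is the shared sentinel.
    // Being an extension method, it is safe to call on a null reference.
    public static bool IsAbsent(this IRef actorRef) =>
        actorRef == null || ReferenceEquals(actorRef, NobodyRef.Instance);
}
```

Putting both tests in one helper means call sites do not need to care whether "no actor" is represented as null or as the sentinel.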

@@ -371,7 +371,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)
 Receive<ClusterEvent.IMemberEvent>(_ => { /* ignore */ });
 Receive<Count>(_ =>
 {
-    var count = _registry.Sum(entry => entry.Value.Content.Count(kv => kv.Value.Ref != null));
+    var count = _registry.Sum(entry => entry.Value.Content.Count(kv => !Equals(kv.Value.Ref, ActorRefs.Nobody)));
Member

IsNobody again

@@ -329,7 +329,7 @@ public DistributedPubSubMediator(DistributedPubSubSettings settings)

 if (_registry.TryGetValue(_cluster.SelfAddress, out var bucket))
     if (bucket.Content.TryGetValue(key, out var holder) && terminated.ActorRef.Equals(holder.Ref))
-        PutToRegistry(key, null); // remove
+        PutToRegistry(key, ActorRefs.Nobody); // remove
Member

Like the change

@Aaronontheweb
Member

Looks like the issue occurs earlier in the spec, where node 2 fails because a normal Remove command to a set of subscribers is not propagated:

[Node2:second][FAIL-EXCEPTION] Type: Xunit.Sdk.EqualException
--> [Node2:second][FAIL-EXCEPTION] Message: Assert.Equal() Failure
Expected: 5
Actual:   6
--> [Node2:second][FAIL-EXCEPTION] StackTrace:    at Xunit.Assert.Equal[T](T expected, T actual, IEqualityComparer`1 comparer)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.<>c__DisplayClass24_0.<AwaitCount>b__0() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 286
   at Akka.TestKit.TestKitBase.AwaitAssert(Action assertion, Nullable`1 duration, Nullable`1 interval)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.AwaitCount(Int32 expected) in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 287
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.<DistributedPubSubMediator_must_keep_track_of_removed_users>b__29_0() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 426
   at Akka.TestKit.TestKitBase.<>c__DisplayClass141_0.<Within>b__0()
   at Akka.TestKit.TestKitBase.Within[T](TimeSpan min, TimeSpan max, Func`1 function, String hint, Nullable`1 epsilonValue)
   at Akka.TestKit.TestKitBase.Within(TimeSpan min, TimeSpan max, Action action, String hint, Nullable`1 epsilonValue)
   at Akka.TestKit.TestKitBase.Within(TimeSpan max, Action action, Nullable`1 epsilonValue)
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.DistributedPubSubMediator_must_keep_track_of_removed_users() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 428
   at Akka.Cluster.Tools.Tests.MultiNode.PublishSubscribe.DistributedPubSubMediatorSpec.DistributedPubSubMediatorSpecs() in D:\work\edbce350daf6a08d\src\contrib\cluster\Akka.Cluster.Tools.Tests.MultiNode\PublishSubscribe\DistributedPubSubMediatorSpec.cs:line 298

Offending code:

RunOn(() =>
{
    Mediator.Tell(new Remove("/user/u6"));
}, _first);
AwaitCount(5);

@Aaronontheweb
Member

On a whim, going to re-run this and see if the issue is a race condition or not. I don't think so though: it failed in both the .NET 4.6.1 and .NET Core 1.1 implementations.

@Aaronontheweb
Member

Node 3 failed in the same spot as node 2. So only node 1's count was accurate after the Remove event after ~10 seconds.

@Aaronontheweb
Member

@joshgarnett sure looks like the issue was the ActorRefs.Nobody comparison... Tests pass now.

@Aaronontheweb
Member

Looks like the ActorRefs.Nobody substitution didn't work because of the way the built-in DistributedPubSub serializer works:

https://github.com/akkadotnet/akka.net/blob/dev/src/contrib/cluster/Akka.Cluster.Tools/PublishSubscribe/Serialization/DistributedPubSubMessageSerializer.cs#L272

https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka/Actor/ActorRef.cs#L593

Haven't written a test for it, but I'm pretty sure that calling system.Provider.ResolveActorRef(path); on the serialized path of Nobody.Path won't resolve Nobody.Instance, which will cause the subsequent equality comparison to fail. Weird.
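
If that suspicion is right, the failure mode would look roughly like the sketch below. It models a reference that is serialized as its path and rebuilt by a resolver that always creates a fresh object. PathRef, Resolve and RoundTripSketch are hypothetical names made up for this example; the real serializer and actor ref provider are not modelled, and the behaviour of the resolver here is an assumption.

```csharp
using System;

// Hypothetical model only; not the Akka.NET serializer or ref provider.
public sealed class PathRef
{
    // Shared sentinel standing in for "no actor".
    public static readonly PathRef Nobody = new PathRef("/nobody");

    public PathRef(string path) { Path = path; }

    public string Path { get; }

    // Assumed resolver behaviour: always builds a new object from the path,
    // even when the path is the sentinel's path.
    public static PathRef Resolve(string path) => new PathRef(path);
}

public static class RoundTripSketch
{
    // Serialize to a path, then resolve it again on the "receiving" side.
    public static PathRef RoundTrip(PathRef original) =>
        PathRef.Resolve(original.Path);

    // Reference comparison: only true for the exact sentinel object.
    public static bool IsNobodyByReference(PathRef r) =>
        ReferenceEquals(r, PathRef.Nobody);

    // Path comparison: also recognises a sentinel rebuilt from its path.
    public static bool IsNobodyByPath(PathRef r) =>
        r == null || r.Path == PathRef.Nobody.Path;
}
```

In this model a sentinel that has been through RoundTrip no longer passes the reference comparison, which would leave a remote node counting a removed entry as still live, while a check based on the path (or on the type) still recognises it.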

@Aaronontheweb
Member

Going to merge this to get the bugfix in; don't think the juice is worth the squeeze in trying to make the serialization / deserialization of ActorRefs.Nobody consistent as part of this PR. Sounds like an Akka core issue. I'll file a separate issue for this.

@Aaronontheweb merged commit 7e2688f into akkadotnet:dev Feb 15, 2018
Aaronontheweb pushed a commit that referenced this pull request Feb 19, 2018
* Fixing premature pruning of Topics
* Switching to IsNobody() extension method
This was referenced Feb 21, 2018