From 8e4990748ae2068bd95d20f207202f153e076edf Mon Sep 17 00:00:00 2001 From: Chris Constantin Date: Fri, 15 Jan 2016 10:33:28 -0800 Subject: [PATCH 1/2] Revert "attemt to fix akka.persistence plugin race conditions" This reverts commit b8545a70092b6d7f2f552cd1dac07d50256a16f8. --- src/core/Akka.Persistence/Persistence.cs | 42 +++++++++++------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index d2b28b02335..8eb7d639fc9 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -13,8 +13,6 @@ using Akka.Configuration; using Akka.Dispatch; using Akka.Persistence.Journal; -using Akka.Util; -using Akka.Util.Internal.Collections; namespace Akka.Persistence { @@ -41,8 +39,8 @@ public class PersistenceExtension : IExtension private readonly Lazy _defaultJournalPluginId; private readonly Lazy _defaultSnapshotPluginId; - private readonly AtomicReference>> _journalPluginExtensionIds = new AtomicReference>>(ImmutableTreeMap>.Empty); - private readonly AtomicReference>> _snapshotPluginExtensionIds = new AtomicReference>>(ImmutableTreeMap>.Empty); + private readonly ConcurrentDictionary> _journalPluginExtensionIds = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _snapshotPluginExtensionIds = new ConcurrentDictionary>(); public PersistenceExtension(ExtendedActorSystem system) { @@ -82,14 +80,13 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId) { var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId; Lazy pluginContainer; - var extensionIdMap = _snapshotPluginExtensionIds.Value; - if (!extensionIdMap.TryGet(configPath, out pluginContainer)) + if (!_snapshotPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) { - pluginContainer = new Lazy(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication); - _snapshotPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer)); - return SnapshotStoreFor(snapshotPluginId); + var plugin = new Lazy(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication); + pluginContainer = _snapshotPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); } - else return pluginContainer.Value.Ref; + + return pluginContainer.Value.Ref; } /// @@ -100,18 +97,17 @@ public IActorRef JournalFor(string journalPluginId) { var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; Lazy pluginContainer; - var extensionIdMap = _journalPluginExtensionIds.Value; - if (!extensionIdMap.TryGet(configPath, out pluginContainer)) + if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) { - pluginContainer = new Lazy(() => CreatePlugin(configPath, type => + var plugin = new Lazy(() => CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type) ? Dispatchers.DefaultDispatcherId : DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication); - _journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer)); - return JournalFor(journalPluginId); + pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); } - else return pluginContainer.Value.Ref; + + return pluginContainer.Value.Ref; } /// @@ -125,17 +121,17 @@ public EventAdapters AdaptersFor(string journalPluginId) { var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; Lazy pluginContainer; - var extensionIdMap = _journalPluginExtensionIds.Value; - if (!extensionIdMap.TryGet(configPath, out pluginContainer)) + if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) { - pluginContainer = new Lazy(() => + var plugin = new Lazy(() => CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type) ? Dispatchers.DefaultDispatcherId : DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication); - _journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer)); - return AdaptersFor(journalPluginId); - }else return pluginContainer.Value.Adapters; + pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); + } + + return pluginContainer.Value.Adapters; } /// @@ -145,7 +141,7 @@ public EventAdapters AdaptersFor(string journalPluginId) /// internal EventAdapters AdaptersFor(IActorRef journalRef) { - return _journalPluginExtensionIds.Value.AllValuesMinToMax + return _journalPluginExtensionIds.Values .Select(ext => Equals(ext.Value.Ref, journalRef) ? ext.Value.Adapters : null) .FirstOrDefault(r => r != null) ?? IdentityEventAdapters.Instance; From b114e6194bef5f588fb87543989210f646b4156b Mon Sep 17 00:00:00 2001 From: Chris Constantin Date: Fri, 15 Jan 2016 10:59:53 -0800 Subject: [PATCH 2/2] #1522 Ensure extensions and persistence plugins are only registered/created once --- src/core/Akka.Persistence/Persistence.cs | 49 +++++++++---------- .../Akka/Actor/Internal/ActorSystemImpl.cs | 6 +-- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/src/core/Akka.Persistence/Persistence.cs b/src/core/Akka.Persistence/Persistence.cs index 8eb7d639fc9..4e06350ac68 100644 --- a/src/core/Akka.Persistence/Persistence.cs +++ b/src/core/Akka.Persistence/Persistence.cs @@ -79,12 +79,11 @@ public string PersistenceId(IActorRef actor) public IActorRef SnapshotStoreFor(string snapshotPluginId) { var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId; - Lazy pluginContainer; - if (!_snapshotPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) - { - var plugin = new Lazy(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication); - pluginContainer = _snapshotPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); - } + + var pluginContainer = _snapshotPluginExtensionIds.GetOrAdd(configPath, + cp => + new Lazy(() => CreatePlugin(cp, _ => DefaultPluginDispatcherId), + LazyThreadSafetyMode.ExecutionAndPublication)); return pluginContainer.Value.Ref; } @@ -96,16 +95,15 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId) public IActorRef JournalFor(string journalPluginId) { var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; - Lazy pluginContainer; - if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) - { - var plugin = new Lazy(() => CreatePlugin(configPath, type => - typeof (AsyncWriteJournal).IsAssignableFrom(type) - ? Dispatchers.DefaultDispatcherId - : DefaultPluginDispatcherId), - LazyThreadSafetyMode.ExecutionAndPublication); - pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); - } + var pluginContainer = _journalPluginExtensionIds.GetOrAdd(configPath, + cp => + new Lazy( + () => + CreatePlugin(cp, + type => + typeof (AsyncWriteJournal).IsAssignableFrom(type) + ? Dispatchers.DefaultDispatcherId + : DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication)); return pluginContainer.Value.Ref; } @@ -120,16 +118,15 @@ public IActorRef JournalFor(string journalPluginId) public EventAdapters AdaptersFor(string journalPluginId) { var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId; - Lazy pluginContainer; - if (!_journalPluginExtensionIds.TryGetValue(configPath, out pluginContainer)) - { - var plugin = new Lazy(() => - CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type) - ? Dispatchers.DefaultDispatcherId - : DefaultPluginDispatcherId), - LazyThreadSafetyMode.ExecutionAndPublication); - pluginContainer = _journalPluginExtensionIds.AddOrUpdate(configPath, plugin, (key, old) => plugin); - } + var pluginContainer = _journalPluginExtensionIds.GetOrAdd(configPath, + cp => + new Lazy( + () => + CreatePlugin(cp, + type => + typeof (AsyncWriteJournal).IsAssignableFrom(type) + ? Dispatchers.DefaultDispatcherId + : DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication)); return pluginContainer.Value.Adapters; } diff --git a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs index ceeecaee7d4..201072ef132 100644 --- a/src/core/Akka/Actor/Internal/ActorSystemImpl.cs +++ b/src/core/Akka/Actor/Internal/ActorSystemImpl.cs @@ -177,10 +177,8 @@ private void ConfigureExtensions(IEnumerable extensionIdProviders) public override object RegisterExtension(IExtensionId extension) { if(extension == null) return null; - if(!_extensions.ContainsKey(extension.ExtensionType)) - { - _extensions.TryAdd(extension.ExtensionType, new Lazy(() => extension.CreateExtension(this))); - } + + _extensions.GetOrAdd(extension.ExtensionType, t => new Lazy(() => extension.CreateExtension(this), LazyThreadSafetyMode.ExecutionAndPublication)); return extension.Get(this); }