Skip to content

Commit

Permalink
Merge pull request #1648 from alarsoft/fix-1522
Browse files Browse the repository at this point in the history
Fix #1522 Ensure extensions and persistence plugins are only registered/created once
  • Loading branch information
Aaronontheweb committed Jan 18, 2016
2 parents b026bb9 + b114e61 commit 2da2a88
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 43 deletions.
71 changes: 32 additions & 39 deletions src/core/Akka.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
using Akka.Configuration;
using Akka.Dispatch;
using Akka.Persistence.Journal;
using Akka.Util;
using Akka.Util.Internal.Collections;

namespace Akka.Persistence
{
Expand All @@ -41,8 +39,8 @@ public class PersistenceExtension : IExtension
private readonly Lazy<string> _defaultJournalPluginId;
private readonly Lazy<string> _defaultSnapshotPluginId;

private readonly AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>> _journalPluginExtensionIds = new AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>>(ImmutableTreeMap<string, Lazy<PluginHolder>>.Empty);
private readonly AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>> _snapshotPluginExtensionIds = new AtomicReference<IImmutableMap<string, Lazy<PluginHolder>>>(ImmutableTreeMap<string, Lazy<PluginHolder>>.Empty);
private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _journalPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();
private readonly ConcurrentDictionary<string, Lazy<PluginHolder>> _snapshotPluginExtensionIds = new ConcurrentDictionary<string, Lazy<PluginHolder>>();

public PersistenceExtension(ExtendedActorSystem system)
{
Expand Down Expand Up @@ -81,15 +79,13 @@ public string PersistenceId(IActorRef actor)
public IActorRef SnapshotStoreFor(string snapshotPluginId)
{
var configPath = string.IsNullOrEmpty(snapshotPluginId) ? _defaultSnapshotPluginId.Value : snapshotPluginId;
Lazy<PluginHolder> pluginContainer;
var extensionIdMap = _snapshotPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
pluginContainer = new Lazy<PluginHolder>(() => CreatePlugin(configPath, _ => DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication);
_snapshotPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return SnapshotStoreFor(snapshotPluginId);
}
else return pluginContainer.Value.Ref;

var pluginContainer = _snapshotPluginExtensionIds.GetOrAdd(configPath,
cp =>
new Lazy<PluginHolder>(() => CreatePlugin(cp, _ => DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication));

return pluginContainer.Value.Ref;
}

/// <summary>
Expand All @@ -99,19 +95,17 @@ public IActorRef SnapshotStoreFor(string snapshotPluginId)
public IActorRef JournalFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
var extensionIdMap = _journalPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
pluginContainer = new Lazy<PluginHolder>(() => CreatePlugin(configPath, type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
_journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return JournalFor(journalPluginId);
}
else return pluginContainer.Value.Ref;
var pluginContainer = _journalPluginExtensionIds.GetOrAdd(configPath,
cp =>
new Lazy<PluginHolder>(
() =>
CreatePlugin(cp,
type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication));

return pluginContainer.Value.Ref;
}

/// <summary>
Expand All @@ -124,18 +118,17 @@ public IActorRef JournalFor(string journalPluginId)
public EventAdapters AdaptersFor(string journalPluginId)
{
var configPath = string.IsNullOrEmpty(journalPluginId) ? _defaultJournalPluginId.Value : journalPluginId;
Lazy<PluginHolder> pluginContainer;
var extensionIdMap = _journalPluginExtensionIds.Value;
if (!extensionIdMap.TryGet(configPath, out pluginContainer))
{
pluginContainer = new Lazy<PluginHolder>(() =>
CreatePlugin(configPath, type => typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId),
LazyThreadSafetyMode.ExecutionAndPublication);
_journalPluginExtensionIds.CompareAndSet(extensionIdMap, extensionIdMap.AddOrUpdate(configPath, pluginContainer));
return AdaptersFor(journalPluginId);
}else return pluginContainer.Value.Adapters;
var pluginContainer = _journalPluginExtensionIds.GetOrAdd(configPath,
cp =>
new Lazy<PluginHolder>(
() =>
CreatePlugin(cp,
type =>
typeof (AsyncWriteJournal).IsAssignableFrom(type)
? Dispatchers.DefaultDispatcherId
: DefaultPluginDispatcherId), LazyThreadSafetyMode.ExecutionAndPublication));

return pluginContainer.Value.Adapters;
}

/// <summary>
Expand All @@ -145,7 +138,7 @@ public EventAdapters AdaptersFor(string journalPluginId)
/// <returns></returns>
internal EventAdapters AdaptersFor(IActorRef journalRef)
{
return _journalPluginExtensionIds.Value.AllValuesMinToMax
return _journalPluginExtensionIds.Values
.Select(ext => Equals(ext.Value.Ref, journalRef) ? ext.Value.Adapters : null)
.FirstOrDefault(r => r != null)
?? IdentityEventAdapters.Instance;
Expand Down
6 changes: 2 additions & 4 deletions src/core/Akka/Actor/Internal/ActorSystemImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,8 @@ private void ConfigureExtensions(IEnumerable<IExtensionId> extensionIdProviders)
public override object RegisterExtension(IExtensionId extension)
{
if(extension == null) return null;
if(!_extensions.ContainsKey(extension.ExtensionType))
{
_extensions.TryAdd(extension.ExtensionType, new Lazy<object>(() => extension.CreateExtension(this)));
}

_extensions.GetOrAdd(extension.ExtensionType, t => new Lazy<object>(() => extension.CreateExtension(this), LazyThreadSafetyMode.ExecutionAndPublication));

return extension.Get(this);
}
Expand Down

0 comments on commit 2da2a88

Please sign in to comment.