From 948f016c5cc3ca80eb43ba94c0a271b35bc24032 Mon Sep 17 00:00:00 2001 From: Bartosz Sypytkowski Date: Tue, 5 May 2015 23:27:55 +0300 Subject: [PATCH] created Akka.Persistence.Sql.Common project fixed AsyncWriteJournal and commented last parts of the SQL-based persistence APIs fixed minor issues in the solution --- build.fsx | 2 +- src/Akka.sln | 11 + .../Akka.Persistence.Sql.Common.csproj | 70 ++++ .../InternalExtensions.cs | 12 + .../Journal/JournalDbEngine.cs | 335 ++++++++++++++++++ .../Journal/QueryBuilder.cs | 31 ++ .../Journal/QueryMapper.cs | 17 +- .../Properties/AssemblyInfo.cs | 36 ++ .../Akka.Persistence.Sql.Common/Settings.cs | 77 ++++ .../Snapshot/DbSnapshotStore.cs | 168 +++++++++ .../Snapshot/QueryBuilder.cs | 77 ++++ .../Snapshot/QueryMapper.cs | 10 +- .../Akka.Persistence.SqlServer.Tests.csproj | 4 + .../Akka.Persistence.SqlServer.csproj | 6 +- .../Akka.Persistence.SqlServer/Extension.cs | 76 +--- .../InternalExtensions.cs | 5 - .../Journal/QueryBuilder.cs | 37 +- .../Journal/SqlServerJournal.cs | 248 +++++-------- .../Snapshot/QueryBuilder.cs | 81 +---- .../Snapshot/SqlServerSnapshotStore.cs | 143 +------- .../Akka.MultiNodeTests.csproj | 3 + .../Journal/AsyncWriteJournal.cs | 3 +- 22 files changed, 965 insertions(+), 487 deletions(-) create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/InternalExtensions.cs create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/JournalDbEngine.cs create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryBuilder.cs rename src/contrib/persistence/{Akka.Persistence.SqlServer => Akka.Persistence.Sql.Common}/Journal/QueryMapper.cs (69%) create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Properties/AssemblyInfo.cs create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Settings.cs create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/DbSnapshotStore.cs create mode 100644 src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryBuilder.cs rename src/contrib/persistence/{Akka.Persistence.SqlServer => Akka.Persistence.Sql.Common}/Snapshot/QueryMapper.cs (84%) diff --git a/build.fsx b/build.fsx index dd315258a94..0042f15dba8 100644 --- a/build.fsx +++ b/build.fsx @@ -58,7 +58,7 @@ let libDir = workingDir @@ @"lib\net45\" let nugetExe = FullName @"src\.nuget\NuGet.exe" let docDir = "bin" @@ "doc" - +open Fake.RestorePackageHelper Target "RestorePackages" (fun _ -> "./src/Akka.sln" |> RestoreMSSolutionPackages (fun p -> diff --git a/src/Akka.sln b/src/Akka.sln index 7b3be2db2f2..bc127249502 100644 --- a/src/Akka.sln +++ b/src/Akka.sln @@ -198,6 +198,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.TestKit.Xunit2", "cont EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.MultiNodeTests", "core\Akka.MultiNodeTests\Akka.MultiNodeTests.csproj", "{F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Akka.Persistence.Sql.Common", "contrib\persistence\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj", "{3B9E6211-9488-4DB5-B714-24248693B38F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug Mono|Any CPU = Debug Mono|Any CPU @@ -723,6 +725,14 @@ Global {F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release Mono|Any CPU.Build.0 = Release|Any CPU {F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.ActiveCfg = Release|Any CPU {F0781BEA-5BA0-4AF0-BB15-E3F209B681F5}.Release|Any CPU.Build.0 = Release|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Debug Mono|Any CPU.ActiveCfg = Debug|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Debug Mono|Any CPU.Build.0 = Debug|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Release Mono|Any CPU.ActiveCfg = Release|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Release Mono|Any CPU.Build.0 = Release|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3B9E6211-9488-4DB5-B714-24248693B38F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -811,5 +821,6 @@ Global {5A3C24D7-0D1C-4974-BBB4-22AC792666DE} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767} {7DBD5C17-5E9D-40C4-9201-D092751532A7} = {7625FD95-4B2C-4A5B-BDD5-94B1493FAC8E} {F0781BEA-5BA0-4AF0-BB15-E3F209B681F5} = {01167D3C-49C4-4CDE-9787-C176D139ACDD} + {3B9E6211-9488-4DB5-B714-24248693B38F} = {264C22A4-CAFC-41F6-B82C-4DDC5C196767} EndGlobalSection EndGlobal diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj new file mode 100644 index 00000000000..5b993c045f3 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Akka.Persistence.Sql.Common.csproj @@ -0,0 +1,70 @@ + + + + + Debug + AnyCPU + {3B9E6211-9488-4DB5-B714-24248693B38F} + Library + Properties + Akka.Persistence.Sql.Common + Akka.Persistence.Sql.Common + v4.5 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + + + + + + + + + + + + + + + + + + + + + {fca84dea-c118-424b-9eb8-34375dfef18a} + Akka.Persistence + + + {5deddf90-37f0-48d3-a0b0-a5cbd8a7e377} + Akka + + + + + \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/InternalExtensions.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/InternalExtensions.cs new file mode 100644 index 00000000000..b4608d841b3 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/InternalExtensions.cs @@ -0,0 +1,12 @@ +using System; + +namespace Akka.Persistence.Sql.Common +{ + internal static class InternalExtensions + { + public static string QualifiedTypeName(this Type type) + { + return type.FullName + ", " + type.Assembly.GetName().Name; + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/JournalDbEngine.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/JournalDbEngine.cs new file mode 100644 index 00000000000..bba3c9dc620 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/JournalDbEngine.cs @@ -0,0 +1,335 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Data.Common; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Akka.Actor; + +namespace Akka.Persistence.Sql.Common.Journal +{ + /// + /// Class used for storing intermediate result of the + /// in form which is ready to be stored directly in the SQL table. + /// + public class JournalEntry + { + public readonly string PersistenceId; + public readonly long SequenceNr; + public readonly bool IsDeleted; + public readonly string PayloadType; + public readonly byte[] Payload; + + public JournalEntry(string persistenceId, long sequenceNr, bool isDeleted, string payloadType, byte[] payload) + { + PersistenceId = persistenceId; + SequenceNr = sequenceNr; + IsDeleted = isDeleted; + PayloadType = payloadType; + Payload = payload; + } + } + + /// + /// Class used to abstract SQL persistence capabilities for concrete implementation of actor journal. + /// + public abstract class JournalDbEngine : IDisposable + { + /// + /// Settings applied to journal mapped from HOCON config file. + /// + public readonly JournalSettings Settings; + + /// + /// List of cancellation tokens for each of the currently pending database operations. + /// + protected readonly LinkedList PendingOperations; + + private readonly Akka.Serialization.Serialization _serialization; + private DbConnection _dbConnection; + + protected JournalDbEngine(JournalSettings settings, Akka.Serialization.Serialization serialization) + { + Settings = settings; + _serialization = serialization; + + QueryMapper = new DefaultJournalQueryMapper(serialization); + + PendingOperations = new LinkedList(); + } + + /// + /// Initializes a database connection. + /// + protected abstract DbConnection CreateDbConnection(); + + /// + /// Copies values from entities to database command. + /// + /// + /// + protected abstract void CopyParamsToCommand(DbCommand sqlCommand, JournalEntry entry); + + /// + /// Gets database connection. + /// + public IDbConnection DbConnection { get { return _dbConnection; } } + + /// + /// Used for generating SQL commands for journal-related database operations. + /// + public IJournalQueryBuilder QueryBuilder { get; protected set; } + + /// + /// Used for mapping results returned from database into objects. + /// + public IJournalQueryMapper QueryMapper { get; protected set; } + + /// + /// Initializes and opens a database connection. + /// + public void Open() + { + // close connection if it was open + Close(); + + _dbConnection = CreateDbConnection(); + _dbConnection.Open(); + } + + /// + /// Closes database connection if exists. + /// + public void Close() + { + if (_dbConnection != null) + { + StopPendingOperations(); + + _dbConnection.Dispose(); + _dbConnection = null; + } + } + + /// + /// Stops all currently executing database operations. + /// + protected void StopPendingOperations() + { + // stop all operations executed in the background + var node = PendingOperations.First; + while (node != null) + { + var curr = node; + node = node.Next; + + curr.Value.Cancel(); + PendingOperations.Remove(curr); + } + } + + void IDisposable.Dispose() + { + Close(); + } + + /// + /// Asynchronously replays all requested messages related to provided , + /// using provided sequence ranges (inclusive) with number of messages replayed + /// (counting from the beginning). Replay callback is invoked for each replayed message. + /// + /// Identifier of persistent messages stream to be replayed. + /// Lower inclusive sequence number bound. Unbound by default. + /// Upper inclusive sequence number bound. Unbound by default. + /// Maximum number of messages to be replayed. Unbound by default. + /// Action invoked for each replayed message. + public Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, IActorRef sender, Action replayCallback) + { + var sqlCommand = QueryBuilder.SelectMessages(persistenceId, fromSequenceNr, toSequenceNr, max); + CompleteCommand(sqlCommand); + + var tokenSource = GetCancellationTokenSource(); + + return sqlCommand + .ExecuteReaderAsync(tokenSource.Token) + .ContinueWith(task => + { + var reader = task.Result; + try + { + while (reader.Read()) + { + var persistent = QueryMapper.Map(reader, sender); + if (persistent != null) + { + replayCallback(persistent); + } + } + } + finally + { + PendingOperations.Remove(tokenSource); + reader.Close(); + } + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + } + + /// + /// Asynchronously reads a highest sequence number of the event stream related with provided . + /// + public Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) + { + var sqlCommand = QueryBuilder.SelectHighestSequenceNr(persistenceId); + CompleteCommand(sqlCommand); + + var tokenSource = GetCancellationTokenSource(); + + return sqlCommand + .ExecuteScalarAsync(tokenSource.Token) + .ContinueWith(task => + { + PendingOperations.Remove(tokenSource); + var result = task.Result; + return result is long ? Convert.ToInt64(task.Result) : 0L; + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + } + + /// + /// Synchronously writes all persistent inside SQL Server database. + /// + /// Specific table used for message persistence may be defined through configuration within + /// 'akka.persistence.journal.sql-server' scope with 'schema-name' and 'table-name' keys. + /// + public void WriteMessages(IEnumerable messages) + { + var persistentMessages = messages.ToArray(); + var sqlCommand = QueryBuilder.InsertBatchMessages(persistentMessages); + CompleteCommand(sqlCommand); + + var journalEntires = persistentMessages.Select(ToJournalEntry).ToList(); + + InsertInTransaction(sqlCommand, journalEntires); + } + + /// + /// Synchronously deletes all persisted messages identified by provided + /// up to provided message sequence number (inclusive). If flag is cleared, + /// messages will still reside inside database, but will be logically counted as deleted. + /// + public void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent) + { + var sqlCommand = QueryBuilder.DeleteBatchMessages(persistenceId, toSequenceNr, isPermanent); + CompleteCommand(sqlCommand); + + sqlCommand.ExecuteNonQuery(); + } + + /// + /// Asynchronously writes all persistent inside SQL Server database. + /// + /// Specific table used for message persistence may be defined through configuration within + /// 'akka.persistence.journal.sql-server' scope with 'schema-name' and 'table-name' keys. + /// + public async Task WriteMessagesAsync(IEnumerable messages) + { + var persistentMessages = messages.ToArray(); + var sqlCommand = QueryBuilder.InsertBatchMessages(persistentMessages); + CompleteCommand(sqlCommand); + + var journalEntires = persistentMessages.Select(ToJournalEntry).ToList(); + + await InsertInTransactionAsync(sqlCommand, journalEntires); + } + + /// + /// Asynchronously deletes all persisted messages identified by provided + /// up to provided message sequence number (inclusive). If flag is cleared, + /// messages will still reside inside database, but will be logically counted as deleted. + /// + public async Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, bool isPermanent) + { + var sqlCommand = QueryBuilder.DeleteBatchMessages(persistenceId, toSequenceNr, isPermanent); + CompleteCommand(sqlCommand); + + await sqlCommand.ExecuteNonQueryAsync(); + } + + private void CompleteCommand(DbCommand sqlCommand) + { + sqlCommand.Connection = _dbConnection; + sqlCommand.CommandTimeout = (int)Settings.ConnectionTimeout.TotalMilliseconds; + } + + private CancellationTokenSource GetCancellationTokenSource() + { + var source = new CancellationTokenSource(); + PendingOperations.AddLast(source); + return source; + } + + private JournalEntry ToJournalEntry(IPersistentRepresentation message) + { + var payloadType = message.Payload.GetType(); + var serializer = _serialization.FindSerializerForType(payloadType); + + return new JournalEntry(message.PersistenceId, message.SequenceNr, message.IsDeleted, + payloadType.QualifiedTypeName(), serializer.ToBinary(message.Payload)); + } + + private void InsertInTransaction(DbCommand sqlCommand, IEnumerable journalEntires) + { + using (var tx = _dbConnection.BeginTransaction()) + { + sqlCommand.Transaction = tx; + try + { + foreach (var entry in journalEntires) + { + CopyParamsToCommand(sqlCommand, entry); + + if (sqlCommand.ExecuteNonQuery() != 1) + { + //TODO: something went wrong, ExecuteNonQuery() should return 1 (number of rows added) + } + } + + tx.Commit(); + } + catch (Exception) + { + tx.Rollback(); + throw; + } + } + } + + private async Task InsertInTransactionAsync(DbCommand sqlCommand, IEnumerable journalEntires) + { + using (var tx = _dbConnection.BeginTransaction()) + { + sqlCommand.Transaction = tx; + try + { + foreach (var entry in journalEntires) + { + CopyParamsToCommand(sqlCommand, entry); + + var commandResult = await sqlCommand.ExecuteNonQueryAsync(); + if (commandResult != 1) + { + //TODO: something went wrong, ExecuteNonQuery() should return 1 (number of rows added) + } + } + + tx.Commit(); + } + catch (Exception) + { + tx.Rollback(); + throw; + } + } + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryBuilder.cs new file mode 100644 index 00000000000..9c7f23d6e5e --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryBuilder.cs @@ -0,0 +1,31 @@ +using System.Data.Common; + +namespace Akka.Persistence.Sql.Common.Journal +{ + /// + /// SQL query builder used for generating queries required to perform journal's tasks. + /// + public interface IJournalQueryBuilder + { + /// + /// Returns query which should return a frame of messages filtered accordingly to provided parameters. + /// + DbCommand SelectMessages(string persistenceId, long fromSequenceNr, long toSequenceNr, long max); + + /// + /// Returns query returning single number considered as the highest sequence number in current journal. + /// + DbCommand SelectHighestSequenceNr(string persistenceId); + + /// + /// Returns a non-query command used to insert collection of in journal table. + /// + DbCommand InsertBatchMessages(IPersistentRepresentation[] messages); + + /// + /// Depending on flag this method may return either UPDATE or DELETE statement + /// used to alter IsDeleted field or delete rows permanently. + /// + DbCommand DeleteBatchMessages(string persistenceId, long toSequenceNr, bool permanent); + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryMapper.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryMapper.cs similarity index 69% rename from src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryMapper.cs rename to src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryMapper.cs index fa36665fb38..991258603b4 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryMapper.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Journal/QueryMapper.cs @@ -1,7 +1,8 @@ using System; -using System.Data.SqlClient; +using System.Data.Common; +using Akka.Actor; -namespace Akka.Persistence.SqlServer.Journal +namespace Akka.Persistence.Sql.Common.Journal { /// /// Mapper used for generating persistent representations based on SQL query results. @@ -12,9 +13,13 @@ public interface IJournalQueryMapper /// Takes a current row from the SQL data reader and produces a persistent representation object in result. /// It's not supposed to move reader's cursor in any way. /// - IPersistentRepresentation Map(SqlDataReader reader); + IPersistentRepresentation Map(DbDataReader reader, IActorRef sender = null); } + /// + /// Default implementation of used for mapping data + /// returned from ADO.NET data readers back to messages. + /// internal class DefaultJournalQueryMapper : IJournalQueryMapper { private readonly Akka.Serialization.Serialization _serialization; @@ -24,17 +29,17 @@ public DefaultJournalQueryMapper(Akka.Serialization.Serialization serialization) _serialization = serialization; } - public IPersistentRepresentation Map(SqlDataReader reader) + public IPersistentRepresentation Map(DbDataReader reader, IActorRef sender = null) { var persistenceId = reader.GetString(0); var sequenceNr = reader.GetInt64(1); var isDeleted = reader.GetBoolean(2); var payload = GetPayload(reader); - return new Persistent(payload, sequenceNr, persistenceId, isDeleted); + return new Persistent(payload, sequenceNr, persistenceId, isDeleted, sender); } - private object GetPayload(SqlDataReader reader) + private object GetPayload(DbDataReader reader) { var payloadType = reader.GetString(3); var type = Type.GetType(payloadType, true); diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Properties/AssemblyInfo.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Properties/AssemblyInfo.cs new file mode 100644 index 00000000000..e4be5070d3a --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("Akka.Persistence.Sql.Common")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("Akka.Persistence.Sql.Common")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("e438d2c3-1075-4b01-bb84-e9efd3a36691")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Settings.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Settings.cs new file mode 100644 index 00000000000..6c4e20fec85 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Settings.cs @@ -0,0 +1,77 @@ +using System; +using Akka.Configuration; + +namespace Akka.Persistence.Sql.Common +{ + /// + /// Configuration settings representation targeting Sql Server journal actor. + /// + public class JournalSettings + { + /// + /// Connection string used to access a persistent SQL Server instance. + /// + public string ConnectionString { get; private set; } + + /// + /// Connection timeout for SQL Server related operations. + /// + public TimeSpan ConnectionTimeout { get; private set; } + + /// + /// Schema name, where table corresponding to event journal is placed. + /// + public string SchemaName { get; private set; } + + /// + /// Name of the table corresponding to event journal. + /// + public string TableName { get; private set; } + + public JournalSettings(Config config) + { + if (config == null) throw new ArgumentNullException("config", "SqlServer journal settings cannot be initialized, because required HOCON section couldn't been found"); + + ConnectionString = config.GetString("connection-string"); + ConnectionTimeout = config.GetTimeSpan("connection-timeout"); + SchemaName = config.GetString("schema-name"); + TableName = config.GetString("table-name"); + } + } + + /// + /// Configuration settings representation targeting Sql Server snapshot store actor. + /// + public class SnapshotStoreSettings + { + /// + /// Connection string used to access a persistent SQL Server instance. + /// + public string ConnectionString { get; private set; } + + /// + /// Connection timeout for SQL Server related operations. + /// + public TimeSpan ConnectionTimeout { get; private set; } + + /// + /// Schema name, where table corresponding to snapshot store is placed. + /// + public string SchemaName { get; private set; } + + /// + /// Name of the table corresponding to snapshot store. + /// + public string TableName { get; private set; } + + public SnapshotStoreSettings(Config config) + { + if (config == null) throw new ArgumentNullException("config", "SqlServer snapshot store settings cannot be initialized, because required HOCON section couldn't been found"); + + ConnectionString = config.GetString("connection-string"); + ConnectionTimeout = config.GetTimeSpan("connection-timeout"); + SchemaName = config.GetString("schema-name"); + TableName = config.GetString("table-name"); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/DbSnapshotStore.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/DbSnapshotStore.cs new file mode 100644 index 00000000000..f76389da503 --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/DbSnapshotStore.cs @@ -0,0 +1,168 @@ +using System.Collections.Generic; +using System.Data.Common; +using System.Threading; +using System.Threading.Tasks; +using Akka.Persistence.Snapshot; + +namespace Akka.Persistence.Sql.Common.Snapshot +{ + /// + /// Abstract snapshot store implementation, customized to work with SQL-based persistence providers. + /// + public abstract class DbSnapshotStore : SnapshotStore + { + /// + /// List of cancellation tokens for all pending asynchronous database operations. + /// + protected readonly LinkedList PendingOperations; + + private DbConnection _connection; + + protected DbSnapshotStore() + { + QueryMapper = new DefaultSnapshotQueryMapper(Context.System.Serialization); + PendingOperations = new LinkedList(); + } + + /// + /// Returns a new instance of database connection. + /// + protected abstract DbConnection CreateDbConnection(); + + /// + /// Gets settings for the current snapshot store. + /// + protected abstract SnapshotStoreSettings Settings { get; } + + /// + /// Gets current database connection. + /// + public DbConnection DbConnection { get { return _connection; } } + + /// + /// Query builder used to convert snapshot store related operations into corresponding SQL queries. + /// + public ISnapshotQueryBuilder QueryBuilder { get; set; } + + /// + /// Query mapper used to map SQL query results into snapshots. + /// + public ISnapshotQueryMapper QueryMapper { get; set; } + + protected override void PreStart() + { + base.PreStart(); + + _connection = CreateDbConnection(); + _connection.Open(); + } + + protected override void PostStop() + { + base.PostStop(); + + // stop all operations executed in the background + var node = PendingOperations.First; + while (node != null) + { + var curr = node; + node = node.Next; + + curr.Value.Cancel(); + PendingOperations.Remove(curr); + } + + _connection.Close(); + } + + /// + /// Asynchronously loads snapshot with the highest sequence number for a persistent actor/view matching specified criteria. + /// + protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) + { + var sqlCommand = QueryBuilder.SelectSnapshot(persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); + CompleteCommand(sqlCommand); + + var tokenSource = GetCancellationTokenSource(); + return sqlCommand + .ExecuteReaderAsync(tokenSource.Token) + .ContinueWith(task => + { + var reader = task.Result; + try + { + return reader.Read() ? QueryMapper.Map(reader) : null; + } + finally + { + PendingOperations.Remove(tokenSource); + reader.Close(); + } + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + } + + /// + /// Asynchronously stores a snapshot with metadata as record in SQL table. + /// + protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) + { + var entry = ToSnapshotEntry(metadata, snapshot); + var sqlCommand = QueryBuilder.InsertSnapshot(entry); + CompleteCommand(sqlCommand); + + var tokenSource = GetCancellationTokenSource(); + + return sqlCommand.ExecuteNonQueryAsync(tokenSource.Token) + .ContinueWith(task => + { + PendingOperations.Remove(tokenSource); + }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + } + + protected override void Saved(SnapshotMetadata metadata) { } + + protected override void Delete(SnapshotMetadata metadata) + { + var sqlCommand = QueryBuilder.DeleteOne(metadata.PersistenceId, metadata.SequenceNr, metadata.Timestamp); + CompleteCommand(sqlCommand); + + sqlCommand.ExecuteNonQuery(); + } + + protected override void Delete(string persistenceId, SnapshotSelectionCriteria criteria) + { + var sqlCommand = QueryBuilder.DeleteMany(persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); + CompleteCommand(sqlCommand); + + sqlCommand.ExecuteNonQuery(); + } + + private void CompleteCommand(DbCommand command) + { + command.Connection = _connection; + command.CommandTimeout = (int)Settings.ConnectionTimeout.TotalMilliseconds; + } + + private CancellationTokenSource GetCancellationTokenSource() + { + var source = new CancellationTokenSource(); + PendingOperations.AddLast(source); + return source; + } + + private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) + { + var snapshotType = snapshot.GetType(); + var serializer = Context.System.Serialization.FindSerializerForType(snapshotType); + + var binary = serializer.ToBinary(snapshot); + + return new SnapshotEntry( + persistenceId: metadata.PersistenceId, + sequenceNr: metadata.SequenceNr, + timestamp: metadata.Timestamp, + snapshotType: snapshotType.QualifiedTypeName(), + snapshot: binary); + } + } +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryBuilder.cs new file mode 100644 index 00000000000..9c2fc5d20ee --- /dev/null +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryBuilder.cs @@ -0,0 +1,77 @@ +using System; +using System.Data.Common; + +namespace Akka.Persistence.Sql.Common.Snapshot +{ + /// + /// Flattened and serialized snapshot object used as intermediate representation + /// before saving snapshot with metadata inside SQL Server database. + /// + public class SnapshotEntry + { + /// + /// Persistence identifier of persistent actor, current snapshot relates to. + /// + public readonly string PersistenceId; + + /// + /// Sequence number used to identify snapshot in it's persistent actor scope. + /// + public readonly long SequenceNr; + + /// + /// Timestamp used to specify date, when the snapshot has been made. + /// + public readonly DateTime Timestamp; + + /// + /// Stringified fully qualified CLR type name of the serialized object. + /// + public readonly string SnapshotType; + + /// + /// Serialized object data. + /// + public readonly byte[] Snapshot; + + public SnapshotEntry(string persistenceId, long sequenceNr, DateTime timestamp, string snapshotType, byte[] snapshot) + { + PersistenceId = persistenceId; + SequenceNr = sequenceNr; + Timestamp = timestamp; + SnapshotType = snapshotType; + Snapshot = snapshot; + } + } + + /// + /// Query builder used for prepare SQL commands used for snapshot store persistence operations. + /// + public interface ISnapshotQueryBuilder + { + /// + /// Deletes a single snapshot identified by it's persistent actor's , + /// and . + /// + DbCommand DeleteOne(string persistenceId, long sequenceNr, DateTime timestamp); + + /// + /// Deletes all snapshot matching persistent actor's as well as + /// upper (inclusive) bounds of the both and . + /// + DbCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime maxTimestamp); + + /// + /// Inserts a single snapshot represented by provided instance. + /// + DbCommand InsertSnapshot(SnapshotEntry entry); + + /// + /// Selects a single snapshot identified by persistent actor's , + /// matching upper (inclusive) bounds of both and . + /// In case, when more than one snapshot matches specified criteria, one with the highest sequence number will be selected. + /// + DbCommand SelectSnapshot(string persistenceId, long maxSequenceNr, DateTime maxTimestamp); + } + +} \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryMapper.cs b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryMapper.cs similarity index 84% rename from src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryMapper.cs rename to src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryMapper.cs index d2547c3208b..1bce34e01fc 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryMapper.cs +++ b/src/contrib/persistence/Akka.Persistence.Sql.Common/Snapshot/QueryMapper.cs @@ -1,7 +1,7 @@ using System; -using System.Data.SqlClient; +using System.Data.Common; -namespace Akka.Persistence.SqlServer.Snapshot +namespace Akka.Persistence.Sql.Common.Snapshot { /// /// Mapper used to map results of snapshot SELECT queries into valid snapshot objects. @@ -11,7 +11,7 @@ public interface ISnapshotQueryMapper /// /// Map data found under current cursor pointed by SQL data reader into instance. /// - SelectedSnapshot Map(SqlDataReader reader); + SelectedSnapshot Map(DbDataReader reader); } internal class DefaultSnapshotQueryMapper : ISnapshotQueryMapper @@ -23,7 +23,7 @@ public DefaultSnapshotQueryMapper(Akka.Serialization.Serialization serialization _serialization = serialization; } - public SelectedSnapshot Map(SqlDataReader reader) + public SelectedSnapshot Map(DbDataReader reader) { var persistenceId = reader.GetString(0); var sequenceNr = reader.GetInt64(1); @@ -35,7 +35,7 @@ public SelectedSnapshot Map(SqlDataReader reader) return new SelectedSnapshot(metadata, snapshot); } - private object GetSnapshot(SqlDataReader reader) + private object GetSnapshot(DbDataReader reader) { var type = Type.GetType(reader.GetString(3), true); var serializer = _serialization.FindSerializerForType(type); diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer.Tests/Akka.Persistence.SqlServer.Tests.csproj b/src/contrib/persistence/Akka.Persistence.SqlServer.Tests/Akka.Persistence.SqlServer.Tests.csproj index 5c19ca2eaaa..215b87e0526 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer.Tests/Akka.Persistence.SqlServer.Tests.csproj +++ b/src/contrib/persistence/Akka.Persistence.SqlServer.Tests/Akka.Persistence.SqlServer.Tests.csproj @@ -85,6 +85,10 @@ {11f4d4b8-7e07-4457-abf2-609b3e7b2649} Akka.TestKit.Xunit + + {3b9e6211-9488-4db5-b714-24248693b38f} + Akka.Persistence.Sql.Common + {bac85686-afc4-413e-98dc-5ed8f468bc63} Akka.Persistence.SqlServer diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Akka.Persistence.SqlServer.csproj b/src/contrib/persistence/Akka.Persistence.SqlServer/Akka.Persistence.SqlServer.csproj index 4847156456b..7a53e5b3a79 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Akka.Persistence.SqlServer.csproj +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Akka.Persistence.SqlServer.csproj @@ -41,12 +41,10 @@ - - @@ -59,6 +57,10 @@ {5deddf90-37f0-48d3-a0b0-a5cbd8a7e377} Akka + + {3b9e6211-9488-4db5-b714-24248693b38f} + Akka.Persistence.Sql.Common + diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Extension.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/Extension.cs index df961f40972..92333106158 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Extension.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Extension.cs @@ -1,93 +1,37 @@ using System; using Akka.Actor; using Akka.Configuration; +using Akka.Persistence.Sql.Common; namespace Akka.Persistence.SqlServer { - /// - /// Configuration settings representation targeting Sql Server journal actor. - /// - public class JournalSettings + + public class SqlServerJournalSettings : JournalSettings { public const string ConfigPath = "akka.persistence.journal.sql-server"; - /// - /// Connection string used to access a persistent SQL Server instance. - /// - public string ConnectionString { get; private set; } - - /// - /// Connection timeout for SQL Server related operations. - /// - public TimeSpan ConnectionTimeout { get; private set; } - - /// - /// Schema name, where table corresponding to event journal is placed. - /// - public string SchemaName { get; private set; } - - /// - /// Name of the table corresponding to event journal. - /// - public string TableName { get; private set; } - /// /// Flag determining in in case of event journal table missing, it should be automatically initialized. /// public bool AutoInitialize { get; private set; } - public JournalSettings(Config config) + public SqlServerJournalSettings(Config config) : base(config) { - if (config == null) throw new ArgumentNullException("config", "SqlServer journal settings cannot be initialized, because required HOCON section couldn't been found"); - - ConnectionString = config.GetString("connection-string"); - ConnectionTimeout = config.GetTimeSpan("connection-timeout"); - SchemaName = config.GetString("schema-name"); - TableName = config.GetString("table-name"); AutoInitialize = config.GetBoolean("auto-initialize"); } } - /// - /// Configuration settings representation targeting Sql Server snapshot store actor. - /// - public class SnapshotStoreSettings + public class SqlServerSnapshotSettings : SnapshotStoreSettings { public const string ConfigPath = "akka.persistence.snapshot-store.sql-server"; - /// - /// Connection string used to access a persistent SQL Server instance. - /// - public string ConnectionString { get; private set; } - - /// - /// Connection timeout for SQL Server related operations. - /// - public TimeSpan ConnectionTimeout { get; private set; } - - /// - /// Schema name, where table corresponding to snapshot store is placed. - /// - public string SchemaName { get; private set; } - - /// - /// Name of the table corresponding to snapshot store. - /// - public string TableName { get; private set; } - /// /// Flag determining in in case of snapshot store table missing, it should be automatically initialized. /// public bool AutoInitialize { get; private set; } - public SnapshotStoreSettings(Config config) + public SqlServerSnapshotSettings(Config config) : base(config) { - if (config == null) throw new ArgumentNullException("config", "SqlServer snapshot store settings cannot be initialized, because required HOCON section couldn't been found"); - - ConnectionString = config.GetString("connection-string"); - ConnectionTimeout = config.GetTimeSpan("connection-timeout"); - SchemaName = config.GetString("schema-name"); - TableName = config.GetString("table-name"); AutoInitialize = config.GetBoolean("auto-initialize"); } } @@ -100,19 +44,19 @@ public class SqlServerPersistenceExtension : IExtension /// /// Journal-related settings loaded from HOCON configuration. /// - public readonly JournalSettings JournalSettings; + public readonly SqlServerJournalSettings JournalSettings; /// /// Snapshot store related settings loaded from HOCON configuration. /// - public readonly SnapshotStoreSettings SnapshotStoreSettings; + public readonly SqlServerSnapshotSettings SnapshotStoreSettings; public SqlServerPersistenceExtension(ExtendedActorSystem system) { system.Settings.InjectTopLevelFallback(SqlServerPersistence.DefaultConfiguration()); - JournalSettings = new JournalSettings(system.Settings.Config.GetConfig(JournalSettings.ConfigPath)); - SnapshotStoreSettings = new SnapshotStoreSettings(system.Settings.Config.GetConfig(SnapshotStoreSettings.ConfigPath)); + JournalSettings = new SqlServerJournalSettings(system.Settings.Config.GetConfig(SqlServerJournalSettings.ConfigPath)); + SnapshotStoreSettings = new SqlServerSnapshotSettings(system.Settings.Config.GetConfig(SqlServerSnapshotSettings.ConfigPath)); if (JournalSettings.AutoInitialize) { diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/InternalExtensions.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/InternalExtensions.cs index 1fc0293dbc1..23bdb4977c5 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/InternalExtensions.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/InternalExtensions.cs @@ -5,11 +5,6 @@ namespace Akka.Persistence.SqlServer { internal static class InternalExtensions { - public static string QualifiedTypeName(this Type type) - { - return type.FullName + ", " + type.Assembly.GetName().Name; - } - public static string QuoteSchemaAndTable(this string sqlQuery, string schemaName, string tableName) { var cb = new SqlCommandBuilder(); diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryBuilder.cs index 9a4559e23f8..4b1197d8354 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryBuilder.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/QueryBuilder.cs @@ -1,36 +1,11 @@ using System.Data; +using System.Data.Common; using System.Data.SqlClient; using System.Text; +using Akka.Persistence.Sql.Common.Journal; namespace Akka.Persistence.SqlServer.Journal { - /// - /// SQL query builder used for generating queries required to perform journal's tasks. - /// - public interface IJournalQueryBuilder - { - /// - /// Returns query which should return a frame of messages filtered accordingly to provided parameters. - /// - SqlCommand SelectMessages(string persistenceId, long fromSequenceNr, long toSequenceNr, long max); - - /// - /// Returns query returning single number considered as the highest sequence number in current journal. - /// - SqlCommand SelectHighestSequenceNr(string persistenceId); - - /// - /// Returns a non-query command used to insert collection of in journal table. - /// - SqlCommand InsertBatchMessages(IPersistentRepresentation[] messages); - - /// - /// Depending on flag this method may return either UPDATE or DELETE statement - /// used to alter IsDeleted field or delete rows permanently. - /// - SqlCommand DeleteBatchMessages(string persistenceId, long toSequenceNr, bool permanent); - } - internal class DefaultJournalQueryBuilder : IJournalQueryBuilder { private readonly string _schemaName; @@ -49,7 +24,7 @@ public DefaultJournalQueryBuilder(string tableName, string schemaName) _selectHighestSequenceNrSql = @"SELECT MAX(SequenceNr) FROM {0}.{1} WHERE CS_PID = CHECKSUM(@pid)".QuoteSchemaAndTable(_schemaName, _tableName); } - public SqlCommand SelectMessages(string persistenceId, long fromSequenceNr, long toSequenceNr, long max) + public DbCommand SelectMessages(string persistenceId, long fromSequenceNr, long toSequenceNr, long max) { var sql = BuildSelectMessagesSql(fromSequenceNr, toSequenceNr, max); var command = new SqlCommand(sql) @@ -60,7 +35,7 @@ public SqlCommand SelectMessages(string persistenceId, long fromSequenceNr, long return command; } - public SqlCommand SelectHighestSequenceNr(string persistenceId) + public DbCommand SelectHighestSequenceNr(string persistenceId) { var command = new SqlCommand(_selectHighestSequenceNrSql) { @@ -70,7 +45,7 @@ public SqlCommand SelectHighestSequenceNr(string persistenceId) return command; } - public SqlCommand InsertBatchMessages(IPersistentRepresentation[] messages) + public DbCommand InsertBatchMessages(IPersistentRepresentation[] messages) { var command = new SqlCommand(_insertMessagesSql); command.Parameters.Add("@PersistenceId", SqlDbType.NVarChar); @@ -82,7 +57,7 @@ public SqlCommand InsertBatchMessages(IPersistentRepresentation[] messages) return command; } - public SqlCommand DeleteBatchMessages(string persistenceId, long toSequenceNr, bool permanent) + public DbCommand DeleteBatchMessages(string persistenceId, long toSequenceNr, bool permanent) { var sql = BuildDeleteSql(toSequenceNr, permanent); var command = new SqlCommand(sql) diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs index ade46f0f734..e5bcd5e7045 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Journal/SqlServerJournal.cs @@ -1,232 +1,154 @@ using System; using System.Collections.Generic; +using System.Data.Common; using System.Data.SqlClient; -using System.Linq; -using System.Threading; using System.Threading.Tasks; using Akka.Persistence.Journal; +using Akka.Persistence.Sql.Common; +using Akka.Persistence.Sql.Common.Journal; namespace Akka.Persistence.SqlServer.Journal { /// - /// Persistent journal actor using SQL Server as persistence layer. It processes write requests - /// one by one in synchronous manner, while reading results asynchronously. + /// Specialization of the which uses SQL Server as it's sql backend database. /// - public class SqlServerJournal : SyncWriteJournal + public class SqlJournalEngine : JournalDbEngine { - #region journal internal types definitions - - internal class JournalEntry + public SqlJournalEngine(JournalSettings journalSettings, Akka.Serialization.Serialization serialization) + : base(journalSettings, serialization) { - public readonly string PersistenceId; - public readonly long SequenceNr; - public readonly bool IsDeleted; - public readonly string PayloadType; - public readonly byte[] Payload; + QueryBuilder = new DefaultJournalQueryBuilder(journalSettings.TableName, journalSettings.SchemaName); + } - public JournalEntry(string persistenceId, long sequenceNr, bool isDeleted, string payloadType, byte[] payload) - { - PersistenceId = persistenceId; - SequenceNr = sequenceNr; - IsDeleted = isDeleted; - PayloadType = payloadType; - Payload = payload; - } + protected override DbConnection CreateDbConnection() + { + return new SqlConnection(Settings.ConnectionString); } - #endregion + protected override void CopyParamsToCommand(DbCommand sqlCommand, JournalEntry entry) + { + sqlCommand.Parameters["@PersistenceId"].Value = entry.PersistenceId; + sqlCommand.Parameters["@SequenceNr"].Value = entry.SequenceNr; + sqlCommand.Parameters["@IsDeleted"].Value = entry.IsDeleted; + sqlCommand.Parameters["@PayloadType"].Value = entry.PayloadType; + sqlCommand.Parameters["@Payload"].Value = entry.Payload; + } + } + /// + /// Persistent journal actor using SQL Server as persistence layer. It processes write requests + /// one by one in asynchronous manner, while reading results asynchronously. + /// + public class SqlServerJournal : AsyncWriteJournal + { private readonly SqlServerPersistenceExtension _extension; - private SqlConnection _connection; - - protected readonly LinkedList PendingOperations; - - /// - /// Used for generating SQL commands for journal-related database operations. - /// - public IJournalQueryBuilder QueryBuilder { get; protected set; } - /// - /// Used for mapping results returned from database into objects. - /// - public IJournalQueryMapper QueryMapper { get; protected set; } + private JournalDbEngine _engine; public SqlServerJournal() { _extension = SqlServerPersistence.Instance.Apply(Context.System); + } - var settings = _extension.JournalSettings; - QueryBuilder = new DefaultJournalQueryBuilder(settings.TableName, settings.SchemaName); - QueryMapper = new DefaultJournalQueryMapper(Context.System.Serialization); - PendingOperations = new LinkedList(); + /// + /// Gets an engine instance responsible for handling all database-related journal requests. + /// + protected virtual JournalDbEngine Engine + { + get + { + return _engine ?? (_engine = new SqlJournalEngine(_extension.JournalSettings, Context.System.Serialization)); + } } protected override void PreStart() { base.PreStart(); - - _connection = new SqlConnection(_extension.JournalSettings.ConnectionString); - _connection.Open(); + Engine.Open(); } protected override void PostStop() { base.PostStop(); - - // stop all operations executed in the background - var node = PendingOperations.First; - while (node != null) - { - var curr = node; - node = node.Next; - - curr.Value.Cancel(); - PendingOperations.Remove(curr); - } - - _connection.Close(); + Engine.Close(); } - /// - /// Asynchronously replays all requested messages related to provided , - /// using provided sequence ranges (inclusive) with number of messages replayed - /// (counting from the beginning). Replay callback is invoked for each replayed message. - /// - /// Identifier of persistent messages stream to be replayed. - /// Lower inclusive sequence number bound. Unbound by default. - /// Upper inclusive sequence number bound. Unbound by default. - /// Maximum number of messages to be replayed. Unbound by default. - /// Action invoked for each replayed message. public override Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action replayCallback) { - var sqlCommand = QueryBuilder.SelectMessages(persistenceId, fromSequenceNr, toSequenceNr, max); - CompleteCommand(sqlCommand); - - var tokenSource = GetCancellationTokenSource(); - - return sqlCommand - .ExecuteReaderAsync(tokenSource.Token) - .ContinueWith(task => - { - var reader = task.Result; - try - { - while (reader.Read()) - { - var persistent = QueryMapper.Map(reader); - if (persistent != null) - replayCallback(persistent); - } - } - finally - { - PendingOperations.Remove(tokenSource); - reader.Close(); - } - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + return Engine.ReplayMessagesAsync(persistenceId, fromSequenceNr, toSequenceNr, max, Context.Sender, replayCallback); } - /// - /// Asynchronously reads a highest sequence number of the event stream related with provided . - /// public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - var sqlCommand = QueryBuilder.SelectHighestSequenceNr(persistenceId); - CompleteCommand(sqlCommand); - - var tokenSource = GetCancellationTokenSource(); + return Engine.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr); + } - return sqlCommand - .ExecuteScalarAsync(tokenSource.Token) - .ContinueWith(task => - { - PendingOperations.Remove(tokenSource); - var result = task.Result; - return result is long ? Convert.ToInt64(task.Result) : 0L; - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); + protected override Task WriteMessagesAsync(IEnumerable messages) + { + return Engine.WriteMessagesAsync(messages); } - /// - /// Synchronously writes all persistent inside SQL Server database. - /// - /// Specific table used for message persistence may be defined through configuration within - /// 'akka.persistence.journal.sql-server' scope with 'schema-name' and 'table-name' keys. - /// - public override void WriteMessages(IEnumerable messages) + protected override Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, bool isPermanent) { - var persistentMessages = messages.ToArray(); - var sqlCommand = QueryBuilder.InsertBatchMessages(persistentMessages); - CompleteCommand(sqlCommand); + return Engine.DeleteMessagesToAsync(persistenceId, toSequenceNr, isPermanent); + } + } - var journalEntires = persistentMessages.Select(ToJournalEntry).ToList(); + /// + /// Persistent journal actor using SQL Server as persistence layer. It processes write requests + /// one by one in synchronous manner, while reading results asynchronously. Use for tests only. + /// + public class SyncSqlServerJournal : SyncWriteJournal + { + private readonly SqlServerPersistenceExtension _extension; + private JournalDbEngine _engine; - InsertInTransaction(sqlCommand, journalEntires); + public SyncSqlServerJournal() + { + _extension = SqlServerPersistence.Instance.Apply(Context.System); } /// - /// Synchronously deletes all persisted messages identified by provided - /// up to provided message sequence number (inclusive). If flag is cleared, - /// messages will still reside inside database, but will be logically counted as deleted. + /// Gets an engine instance responsible for handling all database-related journal requests. /// - public override void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent) + protected virtual JournalDbEngine Engine { - var sqlCommand = QueryBuilder.DeleteBatchMessages(persistenceId, toSequenceNr, isPermanent); - CompleteCommand(sqlCommand); + get + { + return _engine ?? (_engine = new SqlJournalEngine(_extension.JournalSettings, Context.System.Serialization)); + } + } - sqlCommand.ExecuteNonQuery(); + protected override void PreStart() + { + base.PreStart(); + Engine.Open(); } - private void CompleteCommand(SqlCommand sqlCommand) + protected override void PostStop() { - sqlCommand.Connection = _connection; - sqlCommand.CommandTimeout = (int)_extension.JournalSettings.ConnectionTimeout.TotalMilliseconds; + base.PostStop(); + Engine.Close(); } - private CancellationTokenSource GetCancellationTokenSource() + public override Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action replayCallback) { - var source = new CancellationTokenSource(); - PendingOperations.AddLast(source); - return source; + return Engine.ReplayMessagesAsync(persistenceId, fromSequenceNr, toSequenceNr, max, Context.Sender, replayCallback); } - private static JournalEntry ToJournalEntry(IPersistentRepresentation message) + public override Task ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr) { - var payloadType = message.Payload.GetType(); - var serializer = Context.System.Serialization.FindSerializerForType(payloadType); + return Engine.ReadHighestSequenceNrAsync(persistenceId, fromSequenceNr); + } - return new JournalEntry(message.PersistenceId, message.SequenceNr, message.IsDeleted, - payloadType.QualifiedTypeName(), serializer.ToBinary(message.Payload)); + public override void WriteMessages(IEnumerable messages) + { + Engine.WriteMessages(messages); } - private void InsertInTransaction(SqlCommand sqlCommand, IEnumerable journalEntires) + public override void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent) { - using (var tx = _connection.BeginTransaction()) - { - sqlCommand.Transaction = tx; - try - { - foreach (var entry in journalEntires) - { - sqlCommand.Parameters["@PersistenceId"].Value = entry.PersistenceId; - sqlCommand.Parameters["@SequenceNr"].Value = entry.SequenceNr; - sqlCommand.Parameters["@IsDeleted"].Value = entry.IsDeleted; - sqlCommand.Parameters["@PayloadType"].Value = entry.PayloadType; - sqlCommand.Parameters["@Payload"].Value = entry.Payload; - - if (sqlCommand.ExecuteNonQuery() != 1) - { - //TODO: something went wrong, ExecuteNonQuery() should return 1 (number of rows added) - } - } - - tx.Commit(); - } - catch (Exception) - { - tx.Rollback(); - throw; - } - } + Engine.DeleteMessagesTo(persistenceId, toSequenceNr, isPermanent); } } } \ No newline at end of file diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryBuilder.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryBuilder.cs index bc03dc85bf7..6376f3a2403 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryBuilder.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/QueryBuilder.cs @@ -1,81 +1,12 @@ using System; using System.Data; +using System.Data.Common; using System.Data.SqlClient; using System.Text; +using Akka.Persistence.Sql.Common.Snapshot; namespace Akka.Persistence.SqlServer.Snapshot { - /// - /// Flattened and serialized snapshot object used as intermediate representation - /// before saving snapshot with metadata inside SQL Server database. - /// - public class SnapshotEntry - { - /// - /// Persistence identifier of persistent actor, current snapshot relates to. - /// - public readonly string PersistenceId; - - /// - /// Sequence number used to identify snapshot in it's persistent actor scope. - /// - public readonly long SequenceNr; - - /// - /// Timestamp used to specify date, when the snapshot has been made. - /// - public readonly DateTime Timestamp; - - /// - /// Stringified fully qualified CLR type name of the serialized object. - /// - public readonly string SnapshotType; - - /// - /// Serialized object data. - /// - public readonly byte[] Snapshot; - - public SnapshotEntry(string persistenceId, long sequenceNr, DateTime timestamp, string snapshotType, byte[] snapshot) - { - PersistenceId = persistenceId; - SequenceNr = sequenceNr; - Timestamp = timestamp; - SnapshotType = snapshotType; - Snapshot = snapshot; - } - } - - /// - /// Query builder used for prepare SQL commands used for snapshot store persistence operations. - /// - public interface ISnapshotQueryBuilder - { - /// - /// Deletes a single snapshot identified by it's persistent actor's , - /// and . - /// - SqlCommand DeleteOne(string persistenceId, long sequenceNr, DateTime timestamp); - - /// - /// Deletes all snapshot matching persistent actor's as well as - /// upper (inclusive) bounds of the both and . - /// - SqlCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime maxTimestamp); - - /// - /// Inserts a single snapshot represented by provided instance. - /// - SqlCommand InsertSnapshot(SnapshotEntry entry); - - /// - /// Selects a single snapshot identified by persistent actor's , - /// matching upper (inclusive) bounds of both and . - /// In case, when more than one snapshot matches specified criteria, one with the highest sequence number will be selected. - /// - SqlCommand SelectSnapshot(string persistenceId, long maxSequenceNr, DateTime maxTimestamp); - } - internal class DefaultSnapshotQueryBuilder : ISnapshotQueryBuilder { private readonly string _deleteSql; @@ -89,7 +20,7 @@ public DefaultSnapshotQueryBuilder(string schemaName, string tableName) _selectSql = @"SELECT PersistenceId, SequenceNr, Timestamp, SnapshotType, Snapshot FROM {0}.{1} WHERE CS_PID = CHECKSUM(@PersistenceId)".QuoteSchemaAndTable(schemaName, tableName); } - public SqlCommand DeleteOne(string persistenceId, long sequenceNr, DateTime timestamp) + public DbCommand DeleteOne(string persistenceId, long sequenceNr, DateTime timestamp) { var sqlCommand = new SqlCommand(); sqlCommand.Parameters.Add(new SqlParameter("@PersistenceId", SqlDbType.NVarChar, persistenceId.Length) { Value = persistenceId }); @@ -112,7 +43,7 @@ public SqlCommand DeleteOne(string persistenceId, long sequenceNr, DateTime time return sqlCommand; } - public SqlCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) + public DbCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) { var sqlCommand = new SqlCommand(); sqlCommand.Parameters.Add(new SqlParameter("@PersistenceId", SqlDbType.NVarChar, persistenceId.Length) { Value = persistenceId }); @@ -135,7 +66,7 @@ public SqlCommand DeleteMany(string persistenceId, long maxSequenceNr, DateTime return sqlCommand; } - public SqlCommand InsertSnapshot(SnapshotEntry entry) + public DbCommand InsertSnapshot(SnapshotEntry entry) { var sqlCommand = new SqlCommand(_insertSql) { @@ -152,7 +83,7 @@ public SqlCommand InsertSnapshot(SnapshotEntry entry) return sqlCommand; } - public SqlCommand SelectSnapshot(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) + public DbCommand SelectSnapshot(string persistenceId, long maxSequenceNr, DateTime maxTimestamp) { var sqlCommand = new SqlCommand(); sqlCommand.Parameters.Add(new SqlParameter("@PersistenceId", SqlDbType.NVarChar, persistenceId.Length) { Value = persistenceId }); diff --git a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/SqlServerSnapshotStore.cs b/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/SqlServerSnapshotStore.cs index d69f9819e05..21061de6caf 100644 --- a/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/SqlServerSnapshotStore.cs +++ b/src/contrib/persistence/Akka.Persistence.SqlServer/Snapshot/SqlServerSnapshotStore.cs @@ -1,149 +1,28 @@ -using System.Collections.Generic; +using System.Data.Common; using System.Data.SqlClient; -using System.Threading; -using System.Threading.Tasks; -using Akka.Persistence.Snapshot; +using Akka.Persistence.Sql.Common; +using Akka.Persistence.Sql.Common.Snapshot; namespace Akka.Persistence.SqlServer.Snapshot { /// /// Actor used for storing incoming snapshots into persistent snapshot store backed by SQL Server database. /// - public class SqlServerSnapshotStore : SnapshotStore + public class SqlServerSnapshotStore : DbSnapshotStore { - private readonly SqlServerPersistenceExtension _extension; - private SqlConnection _connection; + private readonly SqlServerSnapshotSettings _settings; - protected readonly LinkedList PendingOperations; - - public SqlServerSnapshotStore() - { - _extension = SqlServerPersistence.Instance.Apply(Context.System); - - var settings = _extension.SnapshotStoreSettings; - QueryBuilder = new DefaultSnapshotQueryBuilder(settings.SchemaName, settings.TableName); - QueryMapper = new DefaultSnapshotQueryMapper(Context.System.Serialization); - PendingOperations = new LinkedList(); - } - - /// - /// Query builder used to convert snapshot store related operations into corresponding SQL queries. - /// - public ISnapshotQueryBuilder QueryBuilder { get; set; } - - /// - /// Query mapper used to map SQL query results into snapshots. - /// - public ISnapshotQueryMapper QueryMapper { get; set; } - - protected override void PreStart() - { - base.PreStart(); - - _connection = new SqlConnection(_extension.SnapshotStoreSettings.ConnectionString); - _connection.Open(); - } - - protected override void PostStop() + public SqlServerSnapshotStore() : base() { - base.PostStop(); - - // stop all operations executed in the background - var node = PendingOperations.First; - while (node != null) - { - var curr = node; - node = node.Next; - - curr.Value.Cancel(); - PendingOperations.Remove(curr); - } - - _connection.Close(); + _settings = SqlServerPersistence.Instance.Apply(Context.System).SnapshotStoreSettings; + QueryBuilder = new DefaultSnapshotQueryBuilder(_settings.SchemaName, _settings.TableName); } - protected override Task LoadAsync(string persistenceId, SnapshotSelectionCriteria criteria) - { - var sqlCommand = QueryBuilder.SelectSnapshot(persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); - CompleteCommand(sqlCommand); - - var tokenSource = GetCancellationTokenSource(); - return sqlCommand - .ExecuteReaderAsync(tokenSource.Token) - .ContinueWith(task => - { - var reader = task.Result; - try - { - return reader.Read() ? QueryMapper.Map(reader) : null; - } - finally - { - PendingOperations.Remove(tokenSource); - reader.Close(); - } - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); - } + protected override SnapshotStoreSettings Settings { get { return _settings; } } - protected override Task SaveAsync(SnapshotMetadata metadata, object snapshot) + protected override DbConnection CreateDbConnection() { - var entry = ToSnapshotEntry(metadata, snapshot); - var sqlCommand = QueryBuilder.InsertSnapshot(entry); - CompleteCommand(sqlCommand); - - var tokenSource = GetCancellationTokenSource(); - - return sqlCommand.ExecuteNonQueryAsync(tokenSource.Token) - .ContinueWith(task => - { - PendingOperations.Remove(tokenSource); - }, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.AttachedToParent); - } - - protected override void Saved(SnapshotMetadata metadata) { } - - protected override void Delete(SnapshotMetadata metadata) - { - var sqlCommand = QueryBuilder.DeleteOne(metadata.PersistenceId, metadata.SequenceNr, metadata.Timestamp); - CompleteCommand(sqlCommand); - - sqlCommand.ExecuteNonQuery(); - } - - protected override void Delete(string persistenceId, SnapshotSelectionCriteria criteria) - { - var sqlCommand = QueryBuilder.DeleteMany(persistenceId, criteria.MaxSequenceNr, criteria.MaxTimeStamp); - CompleteCommand(sqlCommand); - - sqlCommand.ExecuteNonQuery(); - } - - private void CompleteCommand(SqlCommand command) - { - command.Connection = _connection; - command.CommandTimeout = (int)_extension.SnapshotStoreSettings.ConnectionTimeout.TotalMilliseconds; - } - - private CancellationTokenSource GetCancellationTokenSource() - { - var source = new CancellationTokenSource(); - PendingOperations.AddLast(source); - return source; - } - - private SnapshotEntry ToSnapshotEntry(SnapshotMetadata metadata, object snapshot) - { - var snapshotType = snapshot.GetType(); - var serializer = Context.System.Serialization.FindSerializerForType(snapshotType); - - var binary = serializer.ToBinary(snapshot); - - return new SnapshotEntry( - persistenceId: metadata.PersistenceId, - sequenceNr: metadata.SequenceNr, - timestamp: metadata.Timestamp, - snapshotType: snapshotType.QualifiedTypeName(), - snapshot: binary); + return new SqlConnection(Settings.ConnectionString); } } } \ No newline at end of file diff --git a/src/core/Akka.MultiNodeTests/Akka.MultiNodeTests.csproj b/src/core/Akka.MultiNodeTests/Akka.MultiNodeTests.csproj index 8121c482765..855a38b74c6 100644 --- a/src/core/Akka.MultiNodeTests/Akka.MultiNodeTests.csproj +++ b/src/core/Akka.MultiNodeTests/Akka.MultiNodeTests.csproj @@ -102,6 +102,9 @@ + + + diff --git a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs index c51f2292438..589ca90d1b6 100644 --- a/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs +++ b/src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs @@ -63,10 +63,11 @@ protected override bool Receive(object message) private void HandleDeleteMessagesTo(DeleteMessagesTo message) { + var eventStream = Context.System.EventStream; DeleteMessagesToAsync(message.PersistenceId, message.ToSequenceNr, message.IsPermanent) .ContinueWith(t => { - if (!t.IsFaulted && CanPublish) Context.System.EventStream.Publish(message); + if (!t.IsFaulted && CanPublish) eventStream.Publish(message); }, _continuationOptions); }