Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FSharp API: fixed problem with JObject deserialization #1203

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/core/Akka.FSharp.Tests/Akka.FSharp.Tests.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@
<Name>Akka.FSharp</Name>
<Project>{81574240-BC31-4BE4-B447-ADF0D32F4246}</Project>
</ProjectReference>
<ProjectReference Include="..\Akka.Remote\Akka.Remote.csproj">
<Name>Akka.Remote</Name>
<Project>{ea4ff8fd-7c53-49c8-b9aa-02e458b3e6a7}</Project>
<Private>True</Private>
</ProjectReference>
<ProjectReference Include="..\Akka.TestKit\Akka.TestKit.csproj">
<Name>Akka.TestKit</Name>
<Project>{0D3CBAD0-BBDB-43E5-AFC4-ED1D3ECDC224}</Project>
Expand Down
61 changes: 29 additions & 32 deletions src/core/Akka.FSharp.Tests/ApiTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -39,39 +39,36 @@ type TestUnion2 =
| C of string * TestUnion
| D of int


[<Fact>]
let ``can serialize discriminated unions`` () =
let x = B (23,"hello")
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<TestUnion>) :?> TestUnion
des
|> equals x
let ``can serialize and deserialize discriminated unions over remote nodes`` () =
let remoteConfig port =
sprintf """
akka {
actor {
ask-timeout = 5s
provider = "Akka.Remote.RemoteActorRefProvider, Akka.Remote"
}
remote {
helios.tcp {
port = %i
hostname = localhost
}
}
}
""" port
|> Configuration.parse

[<Fact>]
let ``can serialize nested discriminated unions`` () =
let x = C("bar",B (23,"hello"))
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<TestUnion2>) :?> TestUnion2
des
|> equals x
use server = System.create "server-system" (remoteConfig 9911)
use client = System.create "client-system" (remoteConfig 0)

type testType1 =
string * int
let aref =
spawne client "a-1" <@ actorOf2 (fun mailbox msg ->
match msg with
| C("a-11", B(11, "a-12")) -> mailbox.Sender() <! msg
| _ -> mailbox.Unhandled msg) @>
[SpawnOption.Deploy (Deploy(RemoteScope (Address.Parse "akka.tcp://server-system@localhost:9911")))]
let msg = C("a-11", B(11, "a-12"))
let response = aref <? msg |> Async.RunSynchronously
response
|> equals msg

type testType2 =
| V2 of testType1

[<Fact>]
let MyTest () =
let x = V2("hello!",123)
use sys = System.create "system" (Configuration.defaultConfig())
let serializer = sys.Serialization.FindSerializerFor x
let bytes = serializer.ToBinary x
let des = serializer.FromBinary (bytes, typeof<testType2>) :?> testType2
des
|> equals x
99 changes: 67 additions & 32 deletions src/core/Akka.FSharp/FsApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,66 @@ open System
open Microsoft.FSharp.Quotations
open Microsoft.FSharp.Linq.QuotationEvaluation

module Serialization =
open Nessos.FsPickler
open Akka.Serialization

let internal serializeToBinary (fsp:BinarySerializer) o =
use stream = new System.IO.MemoryStream()
fsp.Serialize(stream, o)
stream.ToArray()

let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) =
use stream = new System.IO.MemoryStream(bytes)
fsp.Deserialize<'t> stream

let private jobjectType = Type.GetType("Newtonsoft.Json.Linq.JObject, Newtonsoft.Json")
let private jsonSerlizerType = Type.GetType("Newtonsoft.Json.JsonSerializer, Newtonsoft.Json")
let private toObjectMethod = jobjectType.GetMethod("ToObject", [|typeof<System.Type>; jsonSerlizerType|])

let tryDeserializeJObject jsonSerializer o : 'Message option =
let t = typeof<'Message>
if o <> null && o.GetType().Equals jobjectType
then
try
let res = toObjectMethod.Invoke(o, [|t; jsonSerializer|])
Some (res :?> 'Message)
with
| _ -> None // type conversion failed (passed JSON is not of expected type)
else None


// used for top level serialization
type ExprSerializer(system) =
inherit Serializer(system)
let fsp = FsPickler.CreateBinary()
override __.Identifier = 9
override __.IncludeManifest = true
override __.ToBinary(o) = serializeToBinary fsp o
override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes


let internal exprSerializationSupport (system: ActorSystem) =
let serializer = ExprSerializer(system :?> ExtendedActorSystem)
system.Serialization.AddSerializer(serializer)
system.Serialization.AddSerializationMap(typeof<Expr>, serializer)

[<AutoOpen>]
module Actors =
open System.Threading.Tasks

let private tryCast (t:Task<obj>) : 'Message =
match t.Result with
| :? 'Message as m -> m
| o ->
let context = Akka.Actor.Internal.InternalCurrentActorCellKeeper.Current
if context = null
then failwith "Cannot cast JObject outside the actor system context "
else
let serializer = context.System.Serialization.FindSerializerForType typeof<'Message> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer o with
| Some m -> m
| None -> raise (InvalidCastException("Tried to cast JObject to " + typeof<'Message>.ToString()))

/// <summary>
/// Unidirectional send operator.
Expand All @@ -26,7 +83,9 @@ module Actors =
/// Bidirectional send operator. Sends a message object directly to actor
/// tracked by actorRef and awaits for response send back from corresponding actor.
/// </summary>
let inline (<?) (tell : #ICanTell) (msg : obj) : Async<'Message> = tell.Ask<'Message> msg |> Async.AwaitTask
let (<?) (tell : #ICanTell) (msg : obj) : Async<'Message> =
tell.Ask(msg).ContinueWith(Func<_,'Message>(tryCast), TaskContinuationOptions.AttachedToParent|||TaskContinuationOptions.ExecuteSynchronously)
|> Async.AwaitTask

/// Pipes an output of asynchronous expression directly to the recipients mailbox.
let pipeTo (computation : Async<'T>) (recipient : ICanTell) (sender : IActorRef) : unit =
Expand Down Expand Up @@ -239,16 +298,20 @@ module Actors =
member __.Stash() = (this :> IWithUnboundedStash).Stash.Stash()
member __.Unstash() = (this :> IWithUnboundedStash).Stash.Unstash()
member __.UnstashAll() = (this :> IWithUnboundedStash).Stash.UnstashAll() }

new(actor : Expr<Actor<'Message> -> Cont<'Message, 'Returned>>) = FunActor(actor.Compile () ())
member __.Sender() : IActorRef = base.Sender
member __.Unhandled msg = base.Unhandled msg
override x.OnReceive msg =
match state with
| Func f ->
match msg with
| :? 'Message as matched -> state <- f matched
| _ -> x.Unhandled msg
| :? 'Message as m -> state <- f m
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(m) -> state <- f m
| None -> x.Unhandled msg
| Return _ -> x.PostStop()
override x.PostStop() =
base.PostStop ()
Expand Down Expand Up @@ -337,35 +400,7 @@ module Linq =
type Expression =
static member ToExpression(f : System.Linq.Expressions.Expression<System.Func<FunActor<'Message, 'v>>>) = toExpression<FunActor<'Message, 'v>> f
static member ToExpression<'Actor>(f : Quotations.Expr<(unit -> 'Actor)>) = toExpression<'Actor> (QuotationEvaluator.ToLinqExpression f)

module Serialization =
open Nessos.FsPickler
open Akka.Serialization

let internal serializeToBinary (fsp:BinarySerializer) o =
use stream = new System.IO.MemoryStream()
fsp.Serialize(stream, o)
stream.ToArray()

let internal deserializeFromBinary<'t> (fsp:BinarySerializer) (bytes: byte array) =
use stream = new System.IO.MemoryStream(bytes)
fsp.Deserialize<'t> stream

// used for top level serialization
type ExprSerializer(system) =
inherit Serializer(system)
let fsp = FsPickler.CreateBinary()
override __.Identifier = 9
override __.IncludeManifest = true
override __.ToBinary(o) = serializeToBinary fsp o
override __.FromBinary(bytes, _) = deserializeFromBinary fsp bytes


let internal exprSerializationSupport (system: ActorSystem) =
let serializer = ExprSerializer(system :?> ExtendedActorSystem)
system.Serialization.AddSerializer(serializer)
system.Serialization.AddSerializationMap(typeof<Expr>, serializer)

[<RequireQualifiedAccess>]
module Configuration =

Expand Down
36 changes: 29 additions & 7 deletions src/core/Akka.Persistence.FSharp/FsApi.fs
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,25 @@ type FunPersistentActor<'Command, 'Event, 'State>(aggregate: Aggregate<'Command,
member __.SaveSnapshot state = this.SaveSnapshot(state)
member __.DeleteSnapshot seqNr timestamp = this.DeleteSnapshot(seqNr, timestamp)
member __.DeleteSnapshots criteria = this.DeleteSnapshots(criteria) }

member __.Sender() : IActorRef = base.Sender
member __.Unhandled msg = base.Unhandled msg
override x.OnCommand (msg: obj) =
match msg with
| :? 'Command as cmd -> aggregate.exec mailbox state cmd
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(cmd) -> aggregate.exec mailbox state cmd
| None -> x.Unhandled msg
override x.OnRecover (msg: obj) =
match msg with
| :? 'Event as e -> state <- aggregate.apply mailbox state e
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) -> state <- aggregate.apply mailbox state e
| None -> x.Unhandled msg
override x.PostStop () =
base.PostStop ()
List.iter (fun fn -> fn()) deferables
Expand Down Expand Up @@ -265,8 +273,14 @@ type FunPersistentView<'Event, 'State>(perspective: Perspective<'Event, 'State>,
match msg with
| :? 'Event as e ->
state <- perspective.apply mailbox state e
true
| _ -> false // ignore?
true
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) ->
state <- perspective.apply mailbox state e
true
| None -> false
override x.PostStop () =
base.PostStop ()
List.iter (fun fn -> fn()) deferables
Expand Down Expand Up @@ -335,12 +349,20 @@ type Deliverer<'Command, 'Event, 'State>(aggregate: DeliveryAggregate<'Command,
override x.ReceiveCommand (msg: obj) =
match msg with
| :? 'Command as cmd -> aggregate.exec mailbox state cmd
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(cmd) -> aggregate.exec mailbox state cmd
| None -> x.Unhandled msg
true
override x.ReceiveRecover (msg: obj) =
match msg with
| :? 'Event as e -> state <- aggregate.apply mailbox state e
| _ -> () // ignore?
| _ ->
let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
match Serialization.tryDeserializeJObject serializer.Serializer msg with
| Some(e) -> state <- aggregate.apply mailbox state e
| None -> x.Unhandled msg
true
override x.PostStop () =
base.PostStop ()
Expand Down
1 change: 1 addition & 0 deletions src/core/Akka/Serialization/NewtonSoftJsonSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class NewtonSoftJsonSerializer : Serializer
private readonly JsonSerializer _serializer;

public JsonSerializerSettings Settings { get { return _settings; } }
public object Serializer { get { return _serializer; } }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this field as internal here? Any reason to make it public?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To implement this workaround, field must be public (or reacher via reflection) and it must return object and not direct serializer (to maintain Akka.FSharp free from direct dependency on JSON.NET).


/// <summary>
/// Initializes a new instance of the <see cref="NewtonSoftJsonSerializer" /> class.
Expand Down