From 742dfa0fd02431d380e310fd8115162120e02db1 Mon Sep 17 00:00:00 2001 From: ffreyer Date: Sun, 4 Jun 2023 00:17:32 +0200 Subject: [PATCH 1/5] add delayed update functionality --- src/Observables.jl | 152 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 138 insertions(+), 14 deletions(-) diff --git a/src/Observables.jl b/src/Observables.jl index a61e891..d0516ff 100644 --- a/src/Observables.jl +++ b/src/Observables.jl @@ -2,6 +2,7 @@ module Observables export Observable, on, off, onany, connect!, obsid, async_latest, throttle export Consume, ObserverFunction, AbstractObservable +export prepare_update!, execute_update!, @combine_updates import Base.Iterators.filter @@ -22,6 +23,42 @@ abstract type AbstractObservable{T} end const addhandler_callbacks = [] const removehandler_callbacks = [] +@enum CallbackState::Int8 begin + UPTODATE + CONSUMED + OUTOFDATE +end + +struct Consume + x::Bool +end +Consume() = Consume(true) + +mutable struct Callback + const f::Any + state::CallbackState +end +Callback(f::Any) = Callback(f, UPTODATE) +(cb::Callback)(val) = Base.invokelatest(cb.f, val) + +function update(cb::Callback, val) + if cb.state == UPTODATE # no need to update again + return false + elseif cb.state == CONSUMED # last update was blocking + return true + else + output = Base.invokelatest(cb.f, val) + if output isa Consume && output.x + cb.state = CONSUMED + return true + else + cb.state = UPTODATE + return false + end + end +end + + """ obs = Observable(val; ignore_equal_values=false) obs = Observable{T}(val; ignore_equal_values=false) @@ -30,17 +67,16 @@ Like a `Ref`, but updates can be watched by adding a handler using [`on`](@ref) Set `ignore_equal_values=true` to not trigger an event for `observable[] = new_value` if `isequal(observable[], new_value)`. """ mutable struct Observable{T} <: AbstractObservable{T} - - listeners::Vector{Pair{Int, Any}} + listeners::Vector{Pair{Int, Callback}} inputs::Vector{Any} # for map!ed Observables ignore_equal_values::Bool val::T function Observable{T}(; ignore_equal_values::Bool=false) where {T} - return new{T}(Pair{Int, Any}[], [], ignore_equal_values) + return new{T}(Pair{Int, Callback}[], [], ignore_equal_values) end function Observable{T}(@nospecialize(val); ignore_equal_values::Bool=false) where {T} - return new{T}(Pair{Int, Any}[], [], ignore_equal_values, val) + return new{T}(Pair{Int, Callback}[], [], ignore_equal_values, val) end end @@ -112,7 +148,7 @@ end # Optimized version of Base.searchsortedlast (optimized for our use case of pairs) -function pair_searchsortedlast(values::Vector{Pair{Int, Any}}, prio::Int)::Int +function pair_searchsortedlast(values::Vector{Pair{Int, Callback}}, prio::Int)::Int u = 1 lo = 0 hi = length(values) + u @@ -128,9 +164,10 @@ function pair_searchsortedlast(values::Vector{Pair{Int, Any}}, prio::Int)::Int end function register_callback(@nospecialize(observable), priority::Int, @nospecialize(f)) - ls = listeners(observable)::Vector{Pair{Int, Any}} + ls = listeners(observable)::Vector{Pair{Int, Callback}} idx = pair_searchsortedlast(ls, priority) - p = Pair{Int, Any}(priority, f) # faster than priority => f because of convert + # faster than priority => f because of convert + p = Pair{Int, Callback}(priority, Callback(f)) insert!(ls, idx + 1, p) return end @@ -152,11 +189,6 @@ Base.convert(::Type{T}, x) where {T<:Observable} = T(x) Base.convert(::Type{Observable{Any}}, x::AbstractObservable{Any}) = x Base.convert(::Type{Observables.Observable{Any}}, x::Observables.Observable{Any}) = x -struct Consume - x::Bool -end -Consume() = Consume(true) - """ notify(observable::AbstractObservable) @@ -165,8 +197,8 @@ Returns true if an event got consumed before notifying every listener. """ function Base.notify(@nospecialize(observable::AbstractObservable)) val = observable[] - for (_, f) in listeners(observable)::Vector{Pair{Int, Any}} - result = Base.invokelatest(f, val) + for (_, f) in listeners(observable)::Vector{Pair{Int, Callback}} + result = f(val) if result isa Consume && result.x # stop calling callbacks if event got consumed return true @@ -175,6 +207,98 @@ function Base.notify(@nospecialize(observable::AbstractObservable)) return false end +# Handling "synchronized" updates +""" + prepare_update!(observable, value) + +Sets `observable.val = value` and marks its listeners as `OUTOFDATE`. To run +the listeners, call `execute_update!(observable)` +""" +function prepare_update!(observable::Observable, val) + observable.val = val + for (_, cb) in listeners(observable)::Vector{Pair{Int, Callback}} + cb.state = OUTOFDATE + end + return +end + +""" + execute_update!(observable::observable) + +Iterates through each listener of the observable. If the listener is marked as +`OUTOFDATE` it executes and updates its state to `CONSUMED` or `UPTODATE` +depending on the return type of the listener. Listeners marked as `UPTODATE` +are skipped and those marked as `CONSUMED` result in termination of the +iteration. +""" +function execute_update!(observable::Observable) + val = observable[] + for (_, cb) in listeners(observable)::Vector{Pair{Int, Callback}} + if update(cb, val) + # stop calling callbacks if event got consumed + return true + end + end + return false +end + +""" + @combine_updates begin + obs1[] = val1 + obs2[] = val2 + end + +This macro delays the execution of listeners until the end of the enclosing +block. It also avoids executing the same listener multiple times if multiple +enclosed observables trigger it. + +The code generated from the above example is + begin + prepare_update!(obs1, val1) + prepare_update!(obs2, val1) + execute_update!(obs1) + execute_update!(obs2) + end +""" +macro combine_updates(block::Expr) + observables = Symbol[] + if block.head != :block + error("Expression should be a begin ... end block.") + end + + println("---| Initial") + dump(block) + _replace_observable_update!.(block.args, (observables,)) + + println("\n---| Replaced") + println("---| ", observables) + dump(block) + for name in unique(observables) + push!(block.args, :(Observables.execute_update!($name))) + end + + println("---| Finalized") + dump(block) + return esc(block) +end + +_replace_observable_update!(::Any, ::Vector{Symbol}) = nothing +function _replace_observable_update!(expr::Expr, observables::Vector{Symbol}) + if expr.head == Symbol("=") && expr.args[1] isa Expr && expr.args[1].head == :ref + # keep track of observable + name = expr.args[1].args[1] + push!(observables, name) + + # switch to prepare_update! call + expr.head = :call + expr.args[1] = :(Observables.prepare_update!) + insert!(expr.args, 2, name) + else + _replace_observable_update!.(expr.args, (observables,)) + end + return +end + function print_value(io::IO, x::Observable{T}; print_listeners=false) where T print(io, "Observable") real_eltype = T From 1903d62cf7078b5d644142ee015672c286c24fa6 Mon Sep 17 00:00:00 2001 From: ffreyer Date: Sun, 4 Jun 2023 10:55:37 +0200 Subject: [PATCH 2/5] drop const pre 1.8 --- src/Observables.jl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/Observables.jl b/src/Observables.jl index d0516ff..109630a 100644 --- a/src/Observables.jl +++ b/src/Observables.jl @@ -34,10 +34,17 @@ struct Consume end Consume() = Consume(true) +if VERSION >= v"1.8.0" mutable struct Callback const f::Any state::CallbackState end +else + mutable struct Callback + f::Any + state::CallbackState + end +end Callback(f::Any) = Callback(f, UPTODATE) (cb::Callback)(val) = Base.invokelatest(cb.f, val) From 6f71762fa3c4d436b810ffc17039adc6c8b866bb Mon Sep 17 00:00:00 2001 From: ffreyer Date: Sun, 4 Jun 2023 10:55:54 +0200 Subject: [PATCH 3/5] cleanup debug code --- src/Observables.jl | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/src/Observables.jl b/src/Observables.jl index 109630a..10719d7 100644 --- a/src/Observables.jl +++ b/src/Observables.jl @@ -35,10 +35,10 @@ end Consume() = Consume(true) if VERSION >= v"1.8.0" -mutable struct Callback - const f::Any - state::CallbackState -end + mutable struct Callback + const f::Any + state::CallbackState + end else mutable struct Callback f::Any @@ -273,19 +273,12 @@ macro combine_updates(block::Expr) error("Expression should be a begin ... end block.") end - println("---| Initial") - dump(block) _replace_observable_update!.(block.args, (observables,)) - println("\n---| Replaced") - println("---| ", observables) - dump(block) for name in unique(observables) push!(block.args, :(Observables.execute_update!($name))) end - println("---| Finalized") - dump(block) return esc(block) end From 346a0f169ad5d4752047a51e804972e0f15adbe8 Mon Sep 17 00:00:00 2001 From: ffreyer Date: Sun, 4 Jun 2023 10:58:09 +0200 Subject: [PATCH 4/5] fix docstring formatting --- src/Observables.jl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Observables.jl b/src/Observables.jl index 10719d7..54dcf05 100644 --- a/src/Observables.jl +++ b/src/Observables.jl @@ -260,6 +260,7 @@ block. It also avoids executing the same listener multiple times if multiple enclosed observables trigger it. The code generated from the above example is + begin prepare_update!(obs1, val1) prepare_update!(obs2, val1) From 56be7a839951085022bb7974e04b3d07bb905015 Mon Sep 17 00:00:00 2001 From: ffreyer Date: Sun, 4 Jun 2023 11:21:03 +0200 Subject: [PATCH 5/5] add documentation --- docs/src/index.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/docs/src/index.md b/docs/src/index.md index 3e047dc..2e34bc2 100644 --- a/docs/src/index.md +++ b/docs/src/index.md @@ -90,6 +90,30 @@ on(obs) do x end ``` +### Synchronous Updates + +If you have a function which combines the contents of multiple Observables you may want to update all of their contents before calling the attached function. A common example for this is broadcasting over array Observables. + +```julia +input1 = Observable([1, 2]) +input2 = Observable([1, 2]) +output = map((a, b) -> tuple.(a, b), input1, input2) +input1[] = [1, 2, 3] +ERROR: DimensionMismatch: arrays could not be broadcast to a common size; got a dimension with lengths 3 and 2 +input2[] = [1, 2, 3] +``` + +After the second update `output` contains `[(1, 1), (2, 2), (3, 3)]` as it should, but it would be nice to avoid the intermediate update which errors. To do that the `@combine_updates` macro can be used. + +```julia +@combine_updates begin + input1[] = [1, 2, 3, 4] + input2[] = [1, 2, 3, 4] +end +``` + +This will update the observables in two steps. First, it will update content of each observable and mark every listener as out of date. Then it will go through all listeners and call the ones marked out of date, respecting priority and `Consume`. You can also do this manually with `prepare_update!(observable, value)` and `execute_update!(observable)`. + ### How is it different from Reactive.jl? The main difference is `Signal`s are manipulated mostly by converting one signal to another. For example, with signals, you can construct a changing UI by creating a `Signal` of UI objects and rendering them as the signal changes. On the other hand, you can use an Observable both as an input and an output. You can arbitrarily attach outputs to inputs allowing structuring code in a [signals-and-slots](http://doc.qt.io/qt-4.8/signalsandslots.html) kind of pattern.