Archived. Please see more.async.dataflow
Arcangelo Corelli was an Italian violinist and composer of the Baroque era. His music was key in the development of the modern genres of sonata and concerto, in establishing the preeminence of the violin, and as the first coalescing of modern tonality and functional harmony.
This library is designed to compose simple core.async processes. It’s probably suitable for micro-services.
The configuration and data model are inspired by Onyx.
When using core.async, there is usually little need to go through the logistics and ceremony of managing channels, puts and takes, and, in most instances, lifecycle, since these emerge naturally from the topology.
Processes’ topologies are usually an emergent phenomenon and are not explicitly stated. As a result, topology, business logic, and low-level async APIs end up mixed together.
The idea is to separate the topology of the process from its logic as much as possible by providing a data language that describes the data flow. Functions and other vars are resolved when “compiling” the model.
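As a rough illustration of resolving vars at "compile" time, the sketch below keeps a function reference in the data as a symbol and swaps it for the var it names. The `:step/*` keys and `compile-step` are hypothetical names invented for this example, not part of the library; the only real API used is `requiring-resolve` (Clojure 1.10+).

```clojure
;; Hypothetical model fragment: the function is named by a qualified symbol,
;; so the description stays plain data.
(def step {:step/name :increment
           :step/fn   'clojure.core/inc})

(defn compile-step
  "Replace the symbol under :step/fn with the var it names.
  Throws if the symbol does not resolve to a var."
  [step]
  (update step :step/fn
          (fn [sym]
            (or (requiring-resolve sym)
                (throw (ex-info "Unresolvable symbol" {:symbol sym}))))))

;; Vars are invocable, so the compiled step can be called directly.
((:step/fn (compile-step step)) 41) ;; => 42
```

Keeping symbols in the data means the model can be validated, stored, or diffed before any code is loaded.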
- Edges: core.async channels
- Vertices: processing units, connected by channels. Can be pipes, drivers, sinks.
The graph is described in terms of two collections:
- Edges: data describing only the channels, including buffer types, buffer functions, size and transducers.
- Nodes: data describing a pipeline between two channels, mult, producer or consumer.
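For comparison, here is what a two-edge, one-node graph looks like when wired by hand in plain core.async. This is only a sketch of the kind of code the data description is meant to replace, not the library's output; the channel names are arbitrary.

```clojure
(require '[clojure.core.async :as a])

;; Edges: two channels, each with a fixed buffer of 8.
(def in  (a/chan 8))
(def out (a/chan 8))

;; Node: a pipeline moving values from `in` to `out` through a transducer,
;; with parallelism 2. Output order follows input order.
(a/pipeline 2 out (map inc) in)

;; Feed the graph, then close the input. The pipeline closes `out` in turn.
(doseq [x [1 2 3]] (a/>!! in x))
(a/close! in)

;; Collect everything that arrives on `out` until it closes.
(def result (a/<!! (a/into [] out))) ;; => [2 3 4]
```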
{:buffer/type :buffer.type/blocking
 :buffer/size 8}

{:buffer/type :buffer.type/sliding
 :buffer/size 8}

{:buffer/type :buffer.type/dropping
 :buffer/size 8}
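The three buffer types presumably correspond to core.async's `buffer`, `sliding-buffer`, and `dropping-buffer`. The sketch below makes that assumption explicit with a stand-in multimethod, `make-buffer`, which is a hypothetical name and not the library's own `compile-buffer`.

```clojure
(require '[clojure.core.async :as a])

;; Stand-in dispatcher (hypothetical): one method per :buffer/type.
(defmulti make-buffer :buffer/type)

(defmethod make-buffer :buffer.type/blocking
  [{:buffer/keys [size]}]
  (a/buffer size))            ; puts block once the buffer is full

(defmethod make-buffer :buffer.type/sliding
  [{:buffer/keys [size]}]
  (a/sliding-buffer size))    ; when full, the oldest value is dropped

(defmethod make-buffer :buffer.type/dropping
  [{:buffer/keys [size]}]
  (a/dropping-buffer size))   ; when full, the newest value is dropped

(defn drain-after-puts
  "Put xs on a channel backed by the described buffer, close it,
  and return whatever the buffer retained."
  [buffer-desc xs]
  (let [c (a/chan (make-buffer buffer-desc))]
    (doseq [x xs] (a/>!! c x))
    (a/close! c)
    (a/<!! (a/into [] c))))

(drain-after-puts {:buffer/type :buffer.type/sliding  :buffer/size 2} [1 2 3]) ;; => [2 3]
(drain-after-puts {:buffer/type :buffer.type/dropping :buffer/size 2} [1 2 3]) ;; => [1 2]
```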
{:chan/name :a
 :chan/type :chan.type/simple}

{:chan/name :b
 :chan/type :chan.type/sized
 :chan/size 8}

{:chan/name :c
 :chan/type :chan.type/buffered
 :chan/buffer {:buffer/type :buffer.type/blocking
               :buffer/size 8}}
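Read against core.async, the three channel descriptions plausibly map to `(a/chan)`, `(a/chan n)`, and `(a/chan buf)`. The following is a minimal sketch under that assumption; `make-chan` is a hypothetical stand-in, not the library's `compile-chan`.

```clojure
(require '[clojure.core.async :as a])

;; Stand-in dispatcher (hypothetical): one method per :chan/type.
(defmulti make-chan :chan/type)

(defmethod make-chan :chan.type/simple
  [_]
  (a/chan))                   ; unbuffered: a put needs a waiting taker

(defmethod make-chan :chan.type/sized
  [{:chan/keys [size]}]
  (a/chan size))              ; fixed buffer of the given size

(defmethod make-chan :chan.type/buffered
  [{{:buffer/keys [type size]} :chan/buffer}]
  (a/chan (case type          ; explicit buffer object
            :buffer.type/blocking (a/buffer size)
            :buffer.type/sliding  (a/sliding-buffer size)
            :buffer.type/dropping (a/dropping-buffer size))))

;; Index the channels by name, as a compiled model might.
(def chans
  (into {}
        (map (juxt :chan/name make-chan))
        [{:chan/name :a :chan/type :chan.type/simple}
         {:chan/name :b :chan/type :chan.type/sized :chan/size 8}
         {:chan/name :c :chan/type :chan.type/buffered
          :chan/buffer {:buffer/type :buffer.type/blocking :buffer/size 8}}]))
```

`a/offer!` shows the difference without blocking: it succeeds immediately on `:b` and `:c`, and returns nil on `:a` while no taker is waiting.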
(defmethod buffer-type :buffer.type/your-new-type [_] ::spec-for-your-type)
(defmethod compile-buffer :buffer.type/your-new-type
  [{:keys [:buffer/arg1 :buffer/arg2]}]
  (your-buffer-fn arg1 arg2))

(defmethod chan-type :chan.type/your-new-type [_] ::spec-for-your-type)
(defmethod compile-chan :chan.type/your-new-type
  [{:keys [:chan/arg1 :chan/arg2]}]
  (your-chan-fn arg1 arg2))

(defmethod worker-type :worker.type/your-new-type [_] ::spec-for-your-type)
(defmethod compile-worker :worker.type/your-new-type
  [{:keys [:worker/arg1 :worker/arg2]}]
  (your-worker-fn arg1 arg2))
- Define correct sequences of edges and nodes (can be verified using spec).
- Define the required vars.
- Try compiling the model using compile-model.
(def model
  {:channels [{:chan/name :in
               :chan/type :chan.type/sized
               :chan/size 1}
              {:chan/name :out
               :chan/type :chan.type/sized
               :chan/size 1}]
   :workers [{:worker/name :producer
              :worker/type :worker.type/produce
              :worker/produce
              {:produce/chan :in
               :produce/async? true
               :produce/fn (let [a (atom 0)]
                             (fn drive []
                               (Thread/sleep 1000)
                               (swap! a inc)))}}
             {:worker/name :pipeline
              :worker/type :worker.type/pipeline-blocking
              :worker/pipeline
              {:pipeline/from :in
               :pipeline/to :out
               :pipeline/size 4
               :pipeline/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}}
             {:worker/name :consumer
              :worker/type :worker.type/consume
              :worker/consume
              {:consume/chan :out
               :consume/fn (fn [x] (println :OUT x))
               :consume/async? true}}]})

(s/valid? ::model model)
(s/valid? ::connected model)
(def system (compile-model model))
(a/close! (:in (:chans system)))
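Closing `:in` is enough to wind down the downstream part of such a graph if the pipeline worker is backed by core.async's `pipeline-blocking`, which closes its output channel once its input closes. That backing is an assumption about the implementation; the sketch below only demonstrates the core.async behavior itself.

```clojure
(require '[clojure.core.async :as a])

(def in  (a/chan 1))
(def out (a/chan 1))

;; Same shape as the :pipeline worker above: parallelism 4, from `in` to `out`.
(a/pipeline-blocking 4 out (map identity) in)

;; Closing the input propagates: the pipeline closes `out` when `in` is drained.
(a/close! in)

;; A take from a closed, empty channel returns nil.
(def taken (a/<!! out)) ;; => nil
```

How the producer reacts to a closed `:in` depends on the library; in plain core.async a put on a closed channel simply returns false.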
Highly experimental. Not even tested yet. Don’t use it.
- [ ] Tests
- [ ] Analyze the topology to find any dangling channels or disconnected pipes before instancing the pipes.
- [ ] Implement select based on alt! and/or alts!.
- [ ] Find an idiomatic way to connect a web handler as driver.
- [ ] Refine specs; there is currently no way to differentiate transducers from regular functions.
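The topology analysis item could start from something like the sketch below, which compares the channels a model declares with the channels its workers reference. It assumes the model shape used in the example above (`:channels`, `:workers`, and the `:produce/chan`, `:pipeline/from`, `:pipeline/to`, `:consume/chan` keys); `referenced-channels` and `topology-problems` are hypothetical names and the library does not provide them.

```clojure
(require '[clojure.set :as set])

(defn referenced-channels
  "Set of channel names mentioned by any worker in the model."
  [{:keys [workers]}]
  (set (mapcat (fn [w]
                 (keep identity
                       [(get-in w [:worker/produce :produce/chan])
                        (get-in w [:worker/pipeline :pipeline/from])
                        (get-in w [:worker/pipeline :pipeline/to])
                        (get-in w [:worker/consume :consume/chan])]))
               workers)))

(defn topology-problems
  "Return declared-but-unused channels and referenced-but-undeclared ones."
  [{:keys [channels] :as model}]
  (let [declared   (set (map :chan/name channels))
        referenced (referenced-channels model)]
    {:unused     (set/difference declared referenced)
     :undeclared (set/difference referenced declared)}))

(topology-problems
 {:channels [{:chan/name :in} {:chan/name :out} {:chan/name :spare}]
  :workers  [{:worker/produce  {:produce/chan :in}}
             {:worker/pipeline {:pipeline/from :in :pipeline/to :out}}
             {:worker/consume  {:consume/chan :missing}}]})
;; => {:unused #{:spare}, :undeclared #{:missing}}
```

This only checks names. Detecting a channel that is written but never read, or the reverse, would additionally need to track the direction of each reference.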