Skip to content
This repository has been archived by the owner on Dec 23, 2021. It is now read-only.

bsless/corelli

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

38 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Archived. Please see more.async.dataflow

Corelli

Arcangelo Corelli:

Arcangelo Corelli was an Italian violinist and composer of the Baroque era. His music was key in the development of the modern genres of sonata and concerto, in establishing the preeminence of the violin, and as the first coalescing of modern tonality and functional harmony.

This library is designed to compose simple core.async processes. It’s probably suitable for micro-services.

The configuration and data model are inspired by Onyx.

Motivation

There is usually little need to go through the logistics and ceremony, when using core.async, to manage the channels, puts and takes, and in most instances, lifecycle, as it naturally emerges from the topology.

Processes’ topologies are usually an emergent phenomenon and not explicitly stated. There is a mix between topology, business logic, and low level async apis.

The idea is to separate the topology of the process from logic as much as possible by providing a data language to describe the data flow, and functions and other vars are to be resolved when “compiling” the model.

Data Model

  • Edges: core.async channels
  • Vertices: processing units, connected by channels. Can be pipes, drivers, sinks.

The graph is describe in terms of two collections:

  • Edges: data describing only the channels, including buffer types, buffer functions, size and transducers.
  • Nodes: data describing a pipeline between two channels, mult, producer or consumer.

Buffers

{:buffer/type :buffer.type/blocking
 :buffer/size 8}

{:buffer/type :buffer.type/sliding
 :buffer/size 8}

{:buffer/type :buffer.type/dropping
 :buffer/size 8}

Channels

{:chan/name :a
 :chan/type :chan.type/simple}

{:chan/name :b
 :chan/type :chan.type/sized
 :chan/size 8}

{:chan/name :c
 :chan/type :chan.type/buffered
 :chan/buffer {:buffer/type :buffer.type/blocking
               :buffer/size 8}}

Extension

Buffers

(defmethod buffer-type :buffer.type/your-new-type [_] ::spec-for-your-type)

(defmethod compile-buffer :buffer.type/your-new-type
  [{:keys [:buffer/arg1 :buffer/arg2]}]
  (your-buffer-fn arg1 arg2))

Channels

(defmethod chan-type :chan.type/your-new-type [_] ::spec-for-your-type)

(defmethod compile-chan :chan.type/your-new-type
  [{:keys [:chan/arg1 :chan/arg2]}]
  (your-chan-fn arg1 arg2))

Workers

(defmethod worker-type :worker.type/your-new-type [_] ::spec-for-your-type)

(defmethod compile-worker :worker.type/your-new-type
  [{:keys [:worker/arg1 :worker/arg2]}]
  (your-worker-fn arg1 arg2))

Usage

  • Define correct sequences of edges and nodes (can be verified using spec).
  • Define the required vars.
  • Try compiling the model using compile-model.

Example

(def model
    {:channels [{:chan/name :in
                 :chan/type :chan.type/sized
                 :chan/size 1}
                {:chan/name :out
                 :chan/type :chan.type/sized
                 :chan/size 1}]
     :workers [{:worker/name :producer
                :worker/type :worker.type/produce
                :worker/produce
                {:produce/chan :in
                 :produce/async? true
                 :produce/fn (let [a (atom 0)]
                               (fn drive []
                                 (Thread/sleep 1000)
                                 (swap! a inc)))}}
               {:worker/name :pipeline
                :worker/type :worker.type/pipeline-blocking
                :worker/pipeline
                {:pipeline/from :in
                 :pipeline/to :out
                 :pipeline/size 4
                 :pipeline/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}}
               {:worker/name :consumer
                :worker/type :worker.type/consume
                :worker/consume
                {:consume/chan :out
                 :consume/fn (fn [x] (println :OUT x))
                 :consume/async? true}}]})

  (s/valid? ::model model)
  (s/valid? ::connected model)
  (def system (compile-model model))

  (a/close! (:in (:chans system)))

Status

Highly experimental. Not even tested yet. Don’t use it.

Roadmap

  • [ ] Tests
  • [ ] Analyze the topology to find any dangling channels or disconnected pipes before instancing the pipes.
  • [ ] Implement select based on alt! and/or alts!.
  • [ ] Find an idiomatic way to connect a web handler as driver.
  • [ ] Refine specs, currently have no way to differentiate transducers from regular functions.

About

Compose clojure core.async processes

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published