ismasan/plumber.js

Plumber JS

Pipes everywhere. Work in progress.

Motivation

MVC-style JS frameworks (Backbone.js and others) encourage a proliferation of public methods and custom events, which quickly makes apps difficult to understand and debug. It also means that apps end up as imperative code all the way through; that is, implementation details all the way through.

This toolkit attempts to focus JS app design around the idea of composable pipes with a constrained API. If MVC encourages few nouns and many verbs, Plumber.js encourages many nouns and very few verbs. It also encourages composition and encapsulation, traceability and a declarative style that focuses on describing the relationship between your app's components rather than their implementation.

Inspiration for this library can be found in ZeroMQ, Node's streams interface and other JS libs such as Angular.js.

Structs

Structs hold data and emit change events when the data change. Nothing more. They are the base value objects that you pass around. You can define methods on them but you probably shouldn't. They are not models.

var mario = new Plumber.Struct({name: 'Mario'})

mario.on('change:name', function () {
  console.log('Name changed!')
})

mario.set('name', 'Luigi') // triggers `change:name` event

mario.get('name') // "Luigi"
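To make the "data plus change events, nothing more" idea concrete, here is a minimal standalone sketch. `MiniStruct` is a hypothetical stand-in, not Plumber's source; the callback arguments and the choice to skip the event when the value is unchanged are assumptions made for illustration.

```javascript
// Hypothetical stand-in for illustration only; not Plumber.Struct's source.
function MiniStruct(attributes) {
  this.attributes = attributes || {}
  this.listeners = {} // event name -> array of callbacks
}

// Register a callback for an event such as 'change:name'
MiniStruct.prototype.on = function (eventName, callback) {
  if (!this.listeners[eventName]) this.listeners[eventName] = []
  this.listeners[eventName].push(callback)
  return this
}

MiniStruct.prototype.get = function (key) {
  return this.attributes[key]
}

// Store the value and notify listeners only when it actually changed
// (skipping unchanged values is an assumption of this sketch)
MiniStruct.prototype.set = function (key, value) {
  var previous = this.attributes[key]
  if (previous === value) return this
  this.attributes[key] = value
  var callbacks = this.listeners['change:' + key] || []
  callbacks.forEach(function (callback) { callback(value, previous) })
  return this
}

var changes = []
var mario = new MiniStruct({name: 'Mario'})
mario.on('change:name', function (value, previous) {
  changes.push(previous + ' -> ' + value)
})
mario.set('name', 'Luigi')
mario.set('name', 'Luigi') // same value, so no second event
```

The object has no behaviour beyond storing attributes and notifying listeners, which is what keeps structs safe to pass between pipes.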

Pipes

This is the core type in Plumber.js. You add and remove structs from pipes.

A pipe implements add, remove and pipe. Pipes can be chained.

var p1 = new Plumber.Pipe()
var p2 = new Plumber.Pipe()
var p3 = new Plumber.Pipe()

p1.pipe(p2).pipe(p3)

p3.on('add', function (struct) {
  console.log('p3 received data!', struct)
})

p1.add(mario) // forwards mario to other pipes downstream, including p2 and then p3
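The forwarding behaviour can be sketched without the library. `MiniPipe` below is a hypothetical, synchronous stand-in. It assumes `pipe` returns the downstream pipe, as Node's streams do; the library itself may behave differently, so treat the chaining semantics here as an assumption.

```javascript
// Hypothetical, synchronous stand-in; the real Plumber.Pipe is promise-based.
function MiniPipe(name) {
  this.name = name
  this.downstream = [] // pipes that receive whatever is added here
  this.received = []
}

// Connect `other` downstream and return it so calls can be chained
// (returning `other` is an assumption borrowed from Node's streams)
MiniPipe.prototype.pipe = function (other) {
  this.downstream.push(other)
  return other
}

// Record the item, then forward it to every downstream pipe
MiniPipe.prototype.add = function (item) {
  this.received.push(item)
  this.downstream.forEach(function (pipe) { pipe.add(item) })
  return this
}

var a = new MiniPipe('a')
var b = new MiniPipe('b')
var c = new MiniPipe('c')
a.pipe(b).pipe(c)

a.add({name: 'Mario'}) // reaches a, b and c
b.add({name: 'Luigi'}) // reaches b and c only
```

Data only flows downstream: adding to `b` never reaches `a`.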

You can implement your own pipes with data filters.

var MarioFilter = Plumber.Pipe.extend({
  filter: function (struct, filterPromise) {
    if(struct.get('name') == 'Mario') filterPromise.resolve(struct)
    else filterPromise.reject(struct)
  }
})

var filter = new MarioFilter()

filter.pipe(p2).pipe(p3)

filter.add(new Plumber.Struct({name: 'Mario'})) // p2 and p3 get struct added
filter.add(new Plumber.Struct({name: 'Luigi'})) // the filter rejects the struct. p2 and p3 DO NOT get it added

Filters expect a struct and a promise (jQuery.Deferred), so you can implement asynchronous filters (for example, Ajax lookups).

var MarioFilter = Plumber.Pipe.extend({
  filter: function (struct, filterPromise) {
    $.get('http://some.api.com/valid_mario', {name: struct.get('name')}).then(
      function () { filterPromise.resolve(struct) }, // success
      function () { filterPromise.reject(struct) } // error
    )
  }
})

A pipe's public API of add, remove and pipe is composed of methods that you can customize to get specialized behaviour. The add lifecycle breaks down as follows:

add(struct)
  filter(struct, filterPromise)
    _add(struct, addPromise)
      _forwardAdd(struct)
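The nesting of those hooks can be traced with a small standalone sketch. `LifecyclePipe` is hypothetical, and plain objects with synchronous `resolve` and `reject` callbacks stand in for the jQuery.Deferred objects the library passes to each hook; only the call order is meant to match.

```javascript
// Hypothetical sketch of the add lifecycle; synchronous callbacks stand in
// for the jQuery.Deferred objects the library passes to each hook.
function LifecyclePipe() {
  this.trace = []
  this.downstream = []
}

LifecyclePipe.prototype.pipe = function (other) {
  this.downstream.push(other)
  return other
}

// Default filter lets everything through; override it to reject items
LifecyclePipe.prototype.filter = function (item, filterPromise) {
  filterPromise.resolve(item)
}

LifecyclePipe.prototype._add = function (item, addPromise) {
  this.trace.push('_add')
  addPromise.resolve(item)
}

LifecyclePipe.prototype._forwardAdd = function (item) {
  this.trace.push('_forwardAdd')
  this.downstream.forEach(function (pipe) { pipe.add(item) })
}

// add -> filter -> _add -> _forwardAdd; a rejection stops the chain
LifecyclePipe.prototype.add = function (item) {
  var self = this
  var rejected = function () { self.trace.push('rejected') }
  this.trace.push('filter')
  this.filter(item, {
    resolve: function (accepted) {
      self._add(accepted, {
        resolve: function (added) { self._forwardAdd(added) },
        reject: rejected
      })
    },
    reject: rejected
  })
  return this
}

var openPipe = new LifecyclePipe()
openPipe.add({name: 'Mario'})

var closedPipe = new LifecyclePipe()
closedPipe.filter = function (item, filterPromise) { filterPromise.reject(item) }
closedPipe.add({name: 'Luigi'})
```

In this sketch `openPipe.trace` records all three stages, while `closedPipe.trace` stops at the rejected filter, so `_add` and `_forwardAdd` never run.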

add returns a promise, so multiple add() operations can be combined:

$.when(pipe1.add(struct), pipe2.add(struct), pipe3.add(struct)).then(...)

Indexes

An index is a pipe that keeps track of structs added to it. From the outside, an index is just a pipe with add, remove and pipe methods, but it also:

  • Forwards existing data onto newly piped pipes.
  • Updates a struct's attributes and triggers its change events when that struct is already in the index, instead of forwarding it to other pipes.
  • Wraps raw data in Structs if it is not already a struct.

Forward existing data

var index = new Plumber.Index()
index.add(mario)
index.add(luigi)

index.pipe(another_pipe) // `another_pipe` will get mario and luigi added to it now

index.add(princess) // 'another_pipe' gets princess added to it.

Do not forward existing data

index.add(mario) // forwards to other pipes

mario.set('age', 30)

index.add(mario) // does not forward.

Wrap raw data

index.add({name: 'Mario'}) // wraps data into Plumber.Struct before forwarding.

You can initialize an index with a custom Struct subclass to use as the data wrapper.

var index = new Plumber.Index(Person)
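The replay and update behaviours can be sketched in isolation. `MiniIndex` is a hypothetical stand-in that works on plain objects, and identifying items by an `id` attribute is an assumption of this sketch: the text above does not say how the library decides that a struct is already in the index. Wrapping raw data in structs is left out.

```javascript
// Hypothetical stand-in; identity by an `id` attribute is an assumption.
function MiniIndex() {
  this.byId = {}   // id -> stored item
  this.order = []  // ids in insertion order
  this.downstream = []
}

// Replay everything already stored, then keep forwarding new items
MiniIndex.prototype.pipe = function (other) {
  var self = this
  this.order.forEach(function (id) { other.add(self.byId[id]) })
  this.downstream.push(other)
  return other
}

MiniIndex.prototype.add = function (item) {
  var existing = this.byId[item.id]
  if (existing) {
    // Known item: merge the new attributes, do not forward again
    Object.keys(item).forEach(function (key) { existing[key] = item[key] })
    return this
  }
  this.byId[item.id] = item
  this.order.push(item.id)
  this.downstream.forEach(function (pipe) { pipe.add(item) })
  return this
}

// A plain collector standing in for a downstream pipe
var seen = []
var collector = { add: function (item) { seen.push(item.name) } }

var index = new MiniIndex()
index.add({id: 1, name: 'Mario'})
index.add({id: 2, name: 'Luigi'})
index.pipe(collector)                       // replays Mario and Luigi
index.add({id: 3, name: 'Princess'})        // new, so it is forwarded
index.add({id: 1, name: 'Mario', age: 30})  // known, so it is only updated
```

The collector sees each item once, in insertion order, while the stored copy of Mario picks up the new `age` attribute.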

Custom indexes

Plumber.Index can be subclassed as well. The following example implements a "capped index" that removes the oldest struct once a limit of 10 is reached:

var CappedIndex = Plumber.Index.extend({
  limit: 10,
  
  _add: function (item, promise) {
    // remove first if limit reached
    if(this._list.length > this.limit - 1) this.remove(this._list[0])
    // add next
    return Plumber.Index.prototype._add.call(this, item, promise)
  }
  
})

In fact the last example is part of the built-in "devices" described below.

Devices

Devices are pipes that wrap other pipes and possibly implement routing strategies. You can build your own devices by reimplementing the internals of add, remove or pipe.

Plumber.js ships with the following basic devices:

Capped Index

A "capped index" can hold a defined number of structs at a time. Older structs are removed as new ones are added. This is useful for building rolling views, infinite scrolling, real-time charts and other cases where you only want to keep a limited number of data points in memory or in view.

var index = new Plumber.Devices.CappedIndex(10) // holds up to 10 structs.

index.pipe(someViewPipe)

for(var i = 0; i < 100; i++) {
  index.add(new Plumber.Struct({name: i}))
}

The previous example will start removing from the beginning of the list after struct number 10. Calls to remove will be forwarded to any piped pipes.
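The eviction rule on its own looks roughly like this. `CappedList` is a hypothetical stand-in that keeps plain values in an array and records what it evicts; it does not forward removals to other pipes as the real device does.

```javascript
// Hypothetical stand-in showing only the eviction rule, not the real device.
function CappedList(limit) {
  this.limit = limit
  this.items = []
  this.removed = []
}

CappedList.prototype.add = function (item) {
  // Evict the oldest entry first when the list is already full
  if (this.items.length > this.limit - 1) this.removed.push(this.items.shift())
  this.items.push(item)
  return this
}

var capped = new CappedList(10)
for (var i = 0; i < 100; i++) {
  capped.add(i)
}
// capped.items now holds 90..99 and capped.removed holds 0..89
```

Evicting before adding means the list never grows past the limit, even momentarily.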

Choke Point

A choke point adds to a set of pipes and waits for all of them to complete before forwarding to other pipes.

var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var choke = new Plumber.Devices.ChokePoint(p1, p2)

choke.pipe(results)

choke.add(struct) // won't pipe to `results` until both p1 and p2 have finished adding, which may be asynchronous.

Leaky Pipeline

Wraps multiple pipes and chains them using their pipe method.

This pipeline is "leaky" because data added to pipes in the chain will be forwarded on to the other pipes in the chain.

var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var pipeline = new Plumber.Devices.LeakyPipeline(p1, p2)

pipeline.pipe(results)

pipeline.add(struct) // will forward struct to p1, p2 and finally pipe on to results pipe

p2.add(struct) // will also forward down to results pipe

Tight Pipeline

Wraps multiple pipes and chains their #add and #remove calls sequentially while allowing for async execution.

This pipeline is "tight" because it can only be initiated by calling add on the pipeline itself.

var p1 = new Plumber.Pipe()
var p2 = new SomeAjaxPipe()
var results = new Plumber.Pipe()
var pipeline = new Plumber.Devices.TightPipeline(p1, p2)

pipeline.pipe(results)

pipeline.add(struct) // will forward struct to p1, p2 and finally pipe on to results pipe

Ventilator

Two-way ventilator that can be used for fan-in and fan-out behaviour.

  • It will pipe any messages added to pipes in the IN array to all pipes in the OUT array.
  • Messages added to pipes in the IN array will also be piped normally through Ventilator#pipe()

Fan-in example

var in1 = new Plumber.Pipe()  
var in2 = new Plumber.Pipe()
var results = new Plumber.Pipe()
var ventilator = new Plumber.Devices.Ventilator([in1, in2])
ventilator.pipe(results)

in1.add(struct) // forwards struct to results

Fan-out example

var ventilator = new Plumber.Devices.Ventilator(null, [out1, out2])
ventilator.add(struct) // forwards struct to out1 and out2

Many-to-many

var ventilator = new Plumber.Devices.Ventilator([in1, in2], [out1, out2])

in1.add(struct) // forwards struct to out1 and out2
in2.add(struct) // forwards struct to out1 and out2
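The many-to-many wiring amounts to connecting every IN pipe to every OUT pipe. The sketch below shows that with hypothetical, synchronous stand-ins (`SimplePipe` and `ventilate` are names made up for this example); it leaves out the ventilator's own `add` and `pipe` methods.

```javascript
// Hypothetical stand-ins; the real Ventilator is built on promise-based pipes.
function SimplePipe() {
  this.downstream = []
  this.received = []
}

SimplePipe.prototype.pipe = function (other) {
  this.downstream.push(other)
  return other
}

SimplePipe.prototype.add = function (item) {
  this.received.push(item)
  this.downstream.forEach(function (pipe) { pipe.add(item) })
  return this
}

// Connect every IN pipe to every OUT pipe (many-to-many)
function ventilate(ins, outs) {
  ins.forEach(function (input) {
    outs.forEach(function (output) { input.pipe(output) })
  })
}

var in1 = new SimplePipe()
var in2 = new SimplePipe()
var out1 = new SimplePipe()
var out2 = new SimplePipe()
ventilate([in1, in2], [out1, out2])

in1.add('a') // reaches out1 and out2
in2.add('b') // reaches out1 and out2
```

Fan-in and fan-out are the special cases where one of the two arrays has a single element.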

Router

Declare routing functions and pipe instances to route to.

var p1 = new Plumber.Pipe()
var p2 = new Plumber.Pipe()
var defaultPipe = new Plumber.Pipe() // `default` is a reserved word, so it cannot be a variable name
var router = new Plumber.Devices.Router()

router
  .route(p1, function (struct, promise) {
    if(struct.get('name') == 'Joe') promise.resolve()
    else promise.reject()
  })
  .route(p2, function (struct, promise) {
    if(struct.get('name') == 'Jane') promise.resolve()
    else promise.reject()
  })
  .default(defaultPipe)

router.add(new Plumber.Struct({name: 'Joe'})) // forwards struct to p1
router.add(new Plumber.Struct({name: 'Jane'})) // forwards struct to p2
router.add(new Plumber.Struct({name: 'Paul'})) // forwards struct to defaultPipe

router.pipe(other) // pipes all data to `other`.
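The routing decision can be sketched with plain boolean predicates. `MiniRouter` and its `otherwise` method are hypothetical names, arrays stand in for destination pipes, and the sketch assumes an item goes to every route whose predicate passes and to the fallback only when none do; the library's promise-based routes may resolve matches differently.

```javascript
// Hypothetical stand-in using plain boolean predicates instead of promises.
function MiniRouter() {
  this.routes = []
  this.fallback = null
}

MiniRouter.prototype.route = function (target, predicate) {
  this.routes.push({target: target, predicate: predicate})
  return this
}

// `otherwise` is a made-up name for the fallback setter
MiniRouter.prototype.otherwise = function (target) {
  this.fallback = target
  return this
}

// Send the item to every matching route, or to the fallback if none match
MiniRouter.prototype.add = function (item) {
  var matched = this.routes.filter(function (route) {
    return route.predicate(item)
  })
  matched.forEach(function (route) { route.target.push(item.name) })
  if (matched.length === 0 && this.fallback) this.fallback.push(item.name)
  return this
}

// Arrays stand in for destination pipes
var joes = []
var janes = []
var others = []

var miniRouter = new MiniRouter()
miniRouter
  .route(joes, function (item) { return item.name === 'Joe' })
  .route(janes, function (item) { return item.name === 'Jane' })
  .otherwise(others)

miniRouter.add({name: 'Joe'})
miniRouter.add({name: 'Jane'})
miniRouter.add({name: 'Paul'})
```

Each route is just a destination paired with a test, which keeps the routing table declarative.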

TimedAggregator

Timed aggregator (or "reducer") that buffers structs for a user-provided delay, in milliseconds, and runs an aggregate function on each one. Once the delay has passed it forwards the aggregated struct to pipes downstream. Useful for building time-interval charts and widgets, aggregated logs, etc.

Examples:

// Define your custom aggregator
var Counter = Plumber.Devices.TimedAggregator.extend({
  startValues: {
    count: 0
  },
 
  aggregate: function (mem, struct) {
    // add each incoming struct's count to the running total
    mem.set('count', mem.get('count') + struct.get('count'))
  }
})

// Use it.
var aggregator = new Counter(200)  
var results = new Plumber.Pipe()
aggregator.pipe(results)

aggregator.add(new Plumber.Struct({count: 2})) // aggregates into "mem" struct.
aggregator.add(new Plumber.Struct({count: 5}))

// ... After 200 milliseconds, a "mem" struct is piped to results with a `count` value of 7
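The buffering logic can be separated from the timer. In this hypothetical sketch (`MiniAggregator` is not the library's class) `flush()` is called by hand so the behaviour is easy to follow; a timed version would call it from a timer after the configured delay. Plain objects stand in for structs.

```javascript
// Hypothetical stand-in: flush() is called by hand here; a timed version
// would call it from a timer after the configured delay.
function MiniAggregator(startValues, aggregate) {
  this.startValues = startValues
  this.aggregate = aggregate
  this.flushed = []
  this.reset()
}

// Start a fresh accumulator from a copy of the start values
MiniAggregator.prototype.reset = function () {
  var mem = {}
  var start = this.startValues
  Object.keys(start).forEach(function (key) { mem[key] = start[key] })
  this.mem = mem
}

// Fold the incoming item into the current accumulator
MiniAggregator.prototype.add = function (item) {
  this.aggregate(this.mem, item)
  return this
}

// Emit the accumulated value and begin a new window
MiniAggregator.prototype.flush = function () {
  this.flushed.push(this.mem)
  this.reset()
}

var summer = new MiniAggregator({count: 0}, function (mem, item) {
  mem.count += item.count
})
summer.add({count: 2}).add({count: 5})
summer.flush()
summer.add({count: 1})
summer.flush()
```

Copying the start values on every reset keeps each time window independent of the previous one.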

Custom device

Implement your own device or piping strategy by overriding any of the following hooks:

var CustomDevice = Plumber.Pipe.extend({
  
  filter: function (struct, filterPromise) {...},
  
  _add: function (struct, addPromise) {...},
  
  _remove: function (struct, removePromise) {...},
  
  _forwardAdd: function (struct) {...},
  
  _forwardRemove: function (struct) {...}
})

Repositories

A "repository" is used to initiate a data stream or data request. A repository is a pipe so it can be piped to other pipes, wrapped, decorated, etc.

An example Ajax repository:

// users.pipe(results).get('http://some.api.com/users')
var Users = Plumber.Pipe.extend({
  
  // Get from remote API
  get: function (url) {
    var self = this
    
    $.getJSON(url).then(function (data) {
      data.forEach(function (user) {
        self.add(user)    
      })
    })
    return this
  }
})

A more advanced repository that keeps its own internal index:

var Users = Plumber.Pipe.extend({
  
  // A repository can have its own index
  initialize: function () {
    this.__index = new Plumber.Index(UserStruct)  
    // pipe index to itself so repository can pipe to other pipes transparently
    this.__index.pipe(this)
  },
  
  // Get from remote API
  get: function (url) {
    var self = this
    
    $.getJSON(url).then(function (data) {
      data.forEach(function (user) {
        self.__index.add(user)    
      })
    })
    return this
  }
})

Like any other pipe, repositories can be composed into devices or piped to and from other pipes.

The following example implements saving data to a server by customising the _add hook.

var Users = Plumber.Pipe.extend({
  
  _add: function (struct, addPromise) {
    // send data to server
    $.ajax(this.url, {
      type: 'post',
      dataType: 'json',
      data: struct.attributes
    }).then(function (data) {
      struct.set(data) // update struct with data coming from server
      addPromise.resolve(struct)
    }, function () {
      addPromise.reject(struct) // request failed, so the add is rejected
    })
  }
})

Developing

This library uses my jbundle command-line tool to bundle and minify source files.

Tests are in the test directory.

Run the test server to serve bundled source files:

$ jbundle s

Run Jasmine test suite by opening test/index.html in your browser.

Source files are in the src directory.

Bundle manifest is in the JFile file.

About

[WiP] Plumbing and pipelines for JavaScript apps
