Skip to content
This repository has been archived by the owner on Feb 26, 2021. It is now read-only.

[WIP] Move to pull-streams #16

Merged
merged 5 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ Loading this module through a script tag will make the `Lip2pSpdy` obj available
**As a listener**

```JavaScript
const listener = spdy(socket, true)
const listener = spdy(conn, true)
```

**As a dialer**

```JavaScript
const dialer = spdy(socket, false)
const dialer = spdy(conn, false)
```

#### Opening a multiplex duplex stream
Expand Down Expand Up @@ -97,3 +97,29 @@ dialer.on('error', () => {})
```

note: Works the same on the listener side

### This module uses `pull-streams`

We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).

You can learn more about pull-streams at:

- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
- [pull-streams documentation](https://pull-stream.github.io/)

#### Converting `pull-streams` to Node.js Streams

If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/dominictarr/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:

```js
const pullToStream = require('pull-stream-to-stream')

const nodeStreamInstance = pullToStream(pullStreamInstance)
// nodeStreamInstance is an instance of a Node.js Stream
```

To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.


24 changes: 11 additions & 13 deletions examples/dialer.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,24 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const libp2pSPDY = require('../src')

const socket = tcp.connect(9999)
const muxer = libp2pSPDY(socket, false)
const muxer = libp2pSPDY.dialer(toPull(socket))

muxer.on('stream', (stream) => {
console.log('-> got new muxed stream')
stream.on('data', (data) => {
console.log('do I ever get data?', data)
})
stream.pipe(stream)
pull(stream, pull.log, stream)
})

console.log('-> opening a stream from my side')
muxer.newStream((err, stream) => {
if (err) {
throw err
}

console.log('-> opened the stream')
stream.write('hey, how is it going. I am dialer')
stream.end()
const stream = muxer.newStream((err) => {
if (err) throw err
})

pull(
pull.values(['hey, how is it going. I am dialer']),
stream
)
28 changes: 17 additions & 11 deletions examples/listener.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,36 @@
'use strict'

const tcp = require('net')
const pull = require('pull-stream')
const toPull = require('stream-to-pull-stream')
const libp2pSPDY = require('../src')

const listener = tcp.createServer((socket) => {
console.log('-> got connection')

const muxer = libp2pSPDY(socket, true)
const muxer = libp2pSPDY.listener(toPull(socket))

muxer.on('stream', (stream) => {
console.log('-> got new muxed stream')
stream.on('data', (data) => {
console.log('DO I GET DATA?', data)
})
stream.pipe(stream)
pull(
stream,
pull.through((data) => {
console.log('DO I GET DATA?', data)
}),
stream
)
})

console.log('-> opening a stream from my side')
muxer.newStream((err, stream) => {
if (err) {
throw err
}
const stream = muxer.newStream((err) => {
if (err) throw err
console.log('-> opened the stream')
stream.write('hey, how is it going')
stream.end()
})

pull(
pull.values(['hey, how is it going']),
stream
)
})

listener.listen(9999, () => {
Expand Down
8 changes: 4 additions & 4 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const gulp = require('gulp')
const WSlibp2p = require('libp2p-websockets')
const multiaddr = require('multiaddr')
const pull = require('pull-stream')

const spdy = require('./src')

Expand All @@ -12,14 +13,13 @@ gulp.task('test:browser:before', (done) => {
const ws = new WSlibp2p()
const mh = multiaddr('/ip4/127.0.0.1/tcp/9095/ws')
listener = ws.createListener((transportSocket) => {
const muxedConn = spdy(transportSocket, true)

const muxedConn = spdy.listener(transportSocket)
muxedConn.on('stream', (connRx) => {
const connTx = muxedConn.newStream()
connRx.pipe(connTx)
connTx.pipe(connRx)
pull(connRx, connTx, connRx)
})
})

listener.listen(mh, done)
})

Expand Down
25 changes: 15 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"main": "lib/index.js",
"jsnext:main": "src/index.js",
"scripts": {
"compliance": "node test/compliance.js | tap-spec",
"lint": "gulp lint",
"build": "gulp build",
"test": "gulp test",
Expand Down Expand Up @@ -35,22 +34,28 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-spdy",
"devDependencies": {
"aegir": "^6.0.0",
"bl": "^1.1.2",
"aegir": "^6.0.1",
"chai": "^3.5.0",
"interface-stream-muxer": "^0.3.1",
"libp2p-tcp": "^0.7.4",
"libp2p-websockets": "^0.7.1",
"interface-stream-muxer": "^0.4.0",
"libp2p-tcp": "^0.8.0",
"libp2p-websockets": "^0.8.0",
"multiaddr": "^2.0.0",
"pre-commit": "^1.1.2",
"pull-file": "^0.5.0",
"pull-pair": "^1.1.0",
"pull-stream": "^3.4.3",
"run-parallel": "^1.1.6",
"stream-pair": "^1.0.3",
"stream-to-pull-stream": "^1.7.0",
"tap-spec": "^4.1.1",
"tape": "^4.2.0"
},
"dependencies": {
"browserify-zlib": "github:ipfs/browserify-zlib",
"interface-connection": "^0.1.8",
"spdy-transport": "^2.0.11"
"interface-connection": "^0.2.1",
"lodash.noop": "^3.0.1",
"pull-stream-to-stream": "^1.3.1",
"spdy-transport": "^2.0.14",
"stream-to-pull-stream": "^1.7.0"
},
"contributors": [
"David Dias <daviddias.p@gmail.com>",
Expand All @@ -59,4 +64,4 @@
"dignifiedquire <dignifiedquire@gmail.com>",
"nginnever <ginneversource@gmail.com>"
]
}
}
78 changes: 20 additions & 58 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,70 +1,32 @@
'use strict'

const spdy = require('spdy-transport')
const Connection = require('interface-connection').Connection
const EE = require('events').EventEmitter
const toStream = require('pull-stream-to-stream')

exports = module.exports = function (conn, isListener) {
const muxer = spdy.connection.create(conn, {
protocol: 'spdy',
isServer: isListener
})

const proxyMuxer = new EE()

muxer.start(3.1)

// method added to enable pure stream muxer feeling
proxyMuxer.newStream = (callback) => {
if (!callback) {
callback = noop
}

const muxedConn = new Connection(muxer.request({
method: 'POST',
path: '/',
headers: {}
}, callback))
const Muxer = require('./muxer')
const SPDY_CODEC = require('./spdy-codec')

if (conn.getObservedAddrs) {
muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn)
muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn)
muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn)
}
function create (rawConn, isListener) {
const conn = toStream(rawConn)
// Let it flow, let it flooow
conn.resume()

return muxedConn
}

// The rest of the API comes by default with SPDY
muxer.on('close', () => {
proxyMuxer.emit('close')
})

muxer.on('error', (err) => {
proxyMuxer.emit('error', err)
conn.on('end', () => {
// Cleanup and destroy the connection when it ends
// as the converted stream doesn't emit 'close'
// but .destroy will trigger a 'close' event.
conn.destroy()
})

proxyMuxer.end = (cb) => {
muxer.end(cb)
}

// needed by other spdy impl that need the response headers
// in order to confirm the stream can be open
muxer.on('stream', (stream) => {
stream.respond(200, {})
const muxedConn = new Connection(stream)
if (conn.getObservedAddrs) {
muxedConn.getObservedAddrs = conn.getObservedAddrs.bind(conn)
muxedConn.getPeerInfo = conn.getPeerInfo.bind(conn)
muxedConn.setPeerInfo = conn.setPeerInfo.bind(conn)
}
proxyMuxer.emit('stream', muxedConn)
const spdyMuxer = spdy.connection.create(conn, {
protocol: 'spdy',
isServer: isListener
})

proxyMuxer.multicodec = exports.multicodec
return proxyMuxer
return new Muxer(rawConn, spdyMuxer)
}

exports.multicodec = '/spdy/3.1.0'

function noop () {}
exports = module.exports = create
exports.multicodec = SPDY_CODEC
exports.dialer = (conn) => create(conn, false)
exports.listener = (conn) => create(conn, true)
61 changes: 61 additions & 0 deletions src/muxer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
'use strict'

const EventEmitter = require('events').EventEmitter
const noop = require('lodash.noop')
const Connection = require('interface-connection').Connection
const toPull = require('stream-to-pull-stream')

const SPDY_CODEC = require('./spdy-codec')

module.exports = class Muxer extends EventEmitter {
constructor (conn, spdy) {
super()

this.spdy = spdy
this.conn = conn
this.multicodec = SPDY_CODEC

spdy.start(3.1)

// The rest of the API comes by default with SPDY
spdy.on('close', () => {
this.emit('close')
})

spdy.on('error', (err) => {
this.emit('error', err)
})

// needed by other spdy impl that need the response headers
// in order to confirm the stream can be open
spdy.on('stream', (stream) => {
stream.respond(200, {})
const muxedConn = new Connection(toPull.duplex(stream), this.conn)
this.emit('stream', muxedConn)
})
}

// method added to enable pure stream muxer feeling
newStream (callback) {
if (!callback) {
callback = noop
}
const conn = new Connection(null, this.conn)

this.spdy.request({
method: 'POST',
path: '/',
headers: {}
}, (err, stream) => {
conn.setInnerConn(toPull.duplex(stream), this.conn)

callback(err, conn)
})

return conn
}

end (cb) {
this.spdy.end(cb)
}
}
3 changes: 3 additions & 0 deletions src/spdy-codec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
'use strict'

module.exports = '/spdy/3.1.0'
Loading