Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: pull-stream-to-stream replaced with duplex stream (#809)
Browse files Browse the repository at this point in the history
* fix: pull-stream-to-stream replaced with duplex stream because of end event
  • Loading branch information
pgte authored and daviddias committed Mar 24, 2017
1 parent 835e993 commit 4b064a1
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ const node = new IPFS({
start: true,
// start: false,
EXPERIMENTAL: { // enable experimental features
pubsub: true
pubsub: true,
sharding: true // enable dir sharding
},
config: { // overload the default config
Addresses: {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
"ipfs-multipart": "~0.1.0",
"ipfs-repo": "~0.13.0",
"ipfs-unixfs": "~0.1.11",
"ipfs-unixfs-engine": "~0.18.0",
"ipfs-unixfs-engine": "~0.19.0",
"ipld-resolver": "~0.11.0",
"isstream": "^0.1.2",
"joi": "^10.3.0",
Expand Down
41 changes: 40 additions & 1 deletion src/core/components/files.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ const promisify = require('promisify-es6')
const multihashes = require('multihashes')
const pull = require('pull-stream')
const sort = require('pull-sort')
const pushable = require('pull-pushable')
const toStream = require('pull-stream-to-stream')
const toPull = require('stream-to-pull-stream')
const CID = require('cids')
const waterfall = require('async/waterfall')
const isStream = require('isstream')
const Duplex = require('stream').Duplex

module.exports = function files (self) {
const createAddPullStream = (options) => {
Expand All @@ -30,7 +32,19 @@ module.exports = function files (self) {
callback = options
options = undefined
}
callback(null, toStream(createAddPullStream(options)))

const addPullStream = createAddPullStream(options)
const p = pushable()
const s = pull(
p,
addPullStream
)

const retStream = new AddStreamDuplex(s, p)

retStream.once('finish', () => p.end())

callback(null, retStream)
},

createAddPullStream: createAddPullStream,
Expand Down Expand Up @@ -164,3 +178,28 @@ function normalizeContent (content) {
}

function noop () {}

class AddStreamDuplex extends Duplex {
constructor (pullStream, push, options) {
super(Object.assign({ objectMode: true }, options))
this._pullStream = pullStream
this._pushable = push
}

_read () {
this._pullStream(null, (end, data) => {
if (end) {
if (end instanceof Error) {
this.emit('error', end)
}
} else {
this.push(data)
}
})
}

_write (chunk, encoding, callback) {
this._pushable.push(chunk)
callback()
}
}

0 comments on commit 4b064a1

Please sign in to comment.