diff --git a/README.md b/README.md index 229da0378b..121164bec9 100644 --- a/README.md +++ b/README.md @@ -232,7 +232,8 @@ const node = new IPFS({ start: true, // start: false, EXPERIMENTAL: { // enable experimental features - pubsub: true + pubsub: true, + sharding: true // enable dir sharding }, config: { // overload the default config Addresses: { diff --git a/package.json b/package.json index 59e29db7cc..ca27039d3d 100644 --- a/package.json +++ b/package.json @@ -108,7 +108,7 @@ "ipfs-multipart": "~0.1.0", "ipfs-repo": "~0.13.0", "ipfs-unixfs": "~0.1.11", - "ipfs-unixfs-engine": "~0.18.0", + "ipfs-unixfs-engine": "~0.19.0", "ipld-resolver": "~0.11.0", "isstream": "^0.1.2", "joi": "^10.3.0", diff --git a/src/core/components/files.js b/src/core/components/files.js index 7f142edf18..3c371da981 100644 --- a/src/core/components/files.js +++ b/src/core/components/files.js @@ -8,11 +8,13 @@ const promisify = require('promisify-es6') const multihashes = require('multihashes') const pull = require('pull-stream') const sort = require('pull-sort') +const pushable = require('pull-pushable') const toStream = require('pull-stream-to-stream') const toPull = require('stream-to-pull-stream') const CID = require('cids') const waterfall = require('async/waterfall') const isStream = require('isstream') +const Duplex = require('stream').Duplex module.exports = function files (self) { const createAddPullStream = (options) => { @@ -30,7 +32,19 @@ module.exports = function files (self) { callback = options options = undefined } - callback(null, toStream(createAddPullStream(options))) + + const addPullStream = createAddPullStream(options) + const p = pushable() + const s = pull( + p, + addPullStream + ) + + const retStream = new AddStreamDuplex(s, p) + + retStream.once('finish', () => p.end()) + + callback(null, retStream) }, createAddPullStream: createAddPullStream, @@ -164,3 +178,28 @@ function normalizeContent (content) { } function noop () {} + +class AddStreamDuplex extends Duplex { + constructor (pullStream, push, options) { + super(Object.assign({ objectMode: true }, options)) + this._pullStream = pullStream + this._pushable = push + } + + _read () { + this._pullStream(null, (end, data) => { + if (end) { + if (end instanceof Error) { + this.emit('error', end) + } + } else { + this.push(data) + } + }) + } + + _write (chunk, encoding, callback) { + this._pushable.push(chunk) + callback() + } +}