From 0e601a75267e8fe5402849f8a4c04ed7b64fd034 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Fri, 20 Apr 2018 17:04:50 +0100 Subject: [PATCH] fix: files.add with pull streams --- package.json | 1 + src/core/components/files.js | 81 +++++++++++++++++++++++------------- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/package.json b/package.json index fb7720e60f..c8208502e0 100644 --- a/package.json +++ b/package.json @@ -116,6 +116,7 @@ "ipfs-unixfs-engine": "~0.29.0", "ipld": "^0.17.0", "is-ipfs": "^0.3.2", + "is-pull-stream": "0.0.0", "is-stream": "^1.1.0", "joi": "^13.2.0", "joi-browser": "^13.0.1", diff --git a/src/core/components/files.js b/src/core/components/files.js index c1a8f4cb7b..1df23848a4 100644 --- a/src/core/components/files.js +++ b/src/core/components/files.js @@ -12,6 +12,7 @@ const toPull = require('stream-to-pull-stream') const deferred = require('pull-defer') const waterfall = require('async/waterfall') const isStream = require('is-stream') +const isSource = require('is-pull-stream').isSource const Duplex = require('readable-stream').Duplex const OtherBuffer = require('buffer').Buffer const CID = require('cids') @@ -60,6 +61,10 @@ function normalizeContent (opts, content) { data = { path: '', content: toPull.source(data) } } + if (isSource(data)) { + data = { path: '', content: data } + } + if (data && data.content && typeof data.content !== 'function') { if (Buffer.isBuffer(data.content)) { data.content = pull.values([data.content]) @@ -196,40 +201,56 @@ module.exports = function files (self) { } return { - add: promisify((data, options = {}, callback) => { - if (typeof options === 'function') { - callback = options - options = {} - } else if (!callback || typeof callback !== 'function') { - callback = noop - } + add: (() => { + const add = promisify((data, options = {}, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } else if (!callback || typeof callback !== 'function') { + callback = noop + } - const ok = Buffer.isBuffer(data) || - isStream.readable(data) || - Array.isArray(data) || - OtherBuffer.isBuffer(data) || - typeof data === 'object' + const ok = Buffer.isBuffer(data) || + isStream.readable(data) || + Array.isArray(data) || + OtherBuffer.isBuffer(data) || + typeof data === 'object' || + isSource(data) - if (!ok) { - return callback(new Error('first arg must be a buffer, readable stream, an object or array of objects')) - } + if (!ok) { + return callback(new Error('first arg must be a buffer, readable stream, pull stream, an object or array of objects')) + } - // CID v0 is for multihashes encoded with sha2-256 - if (options.hashAlg && options.cidVersion !== 1) { - options.cidVersion = 1 - } + // CID v0 is for multihashes encoded with sha2-256 + if (options.hashAlg && options.cidVersion !== 1) { + options.cidVersion = 1 + } - pull( - pull.values([data]), - _addPullStream(options), - sort((a, b) => { - if (a.path < b.path) return 1 - if (a.path > b.path) return -1 - return 0 - }), - pull.collect(callback) - ) - }), + pull( + pull.values([data]), + _addPullStream(options), + sort((a, b) => { + if (a.path < b.path) return 1 + if (a.path > b.path) return -1 + return 0 + }), + pull.collect(callback) + ) + }) + + return function () { + const args = Array.from(arguments) + + // If we files.add(), then promisify thinks the pull stream + // is a callback! Add an empty options object in this case so that a + // promise is returned. + if (args.length === 1 && isSource(args[0])) { + args.push({}) + } + + return add.apply(null, args) + } + })(), addReadableStream: (options) => { options = options || {}