Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
fix: streaming cat over http api (#1760)
Browse files Browse the repository at this point in the history
`/api/v0/cat` calls `ipfs.cat`, but `ipfs.cat` returns a buffer of file output. This changes `/api/v0/cat` to actually stream output by calling `ipfs.catPullStream` instead.

It uses `pull-pushable` in order to catch an initial error in the stream before it starts flowing and instead return a plain JSON response with an appropriate HTTP status.

This isn't ideal as it means there's no backpressure - if the consumer doesn't consume fast enough then data will start to get buffered into memory. However this is significantly better than buffering _everything_ into memory before replying.

License: MIT
Signed-off-by: Alan Shaw <alan.shaw@protocol.ai>
  • Loading branch information
Alan Shaw authored Dec 11, 2018
1 parent 6bfa4dd commit 3ded576
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 27 deletions.
21 changes: 14 additions & 7 deletions src/core/components/files-regular.js
Original file line number Diff line number Diff line change
Expand Up @@ -232,21 +232,28 @@ module.exports = function (self) {

pull(
exporter(ipfsPath, self._ipld, options),
pull.filter(filterFile),
pull.take(1),
pull.collect((err, files) => {
if (err) { return d.abort(err) }
if (files && files.length > 1) {
files = files.filter(filterFile)
if (err) {
return d.abort(err)
}
if (!files || !files.length) {

if (!files.length) {
return d.abort(new Error('No such file'))
}

const file = files[0]
const content = file.content
if (!content && file.type === 'dir') {

if (!file.content && file.type === 'dir') {
return d.abort(new Error('this dag node is a directory'))
}
d.resolve(content)

if (!file.content) {
return d.abort(new Error('this dag node has no content'))
}

d.resolve(file.content)
})
)

Expand Down
55 changes: 35 additions & 20 deletions src/http/api/resources/files-regular.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const toStream = require('pull-stream-to-stream')
const abortable = require('pull-abortable')
const Joi = require('joi')
const ndjson = require('pull-ndjson')
const { PassThrough } = require('readable-stream')

exports = module.exports

Expand Down Expand Up @@ -79,27 +80,41 @@ exports.cat = {
const options = request.pre.args.options
const ipfs = request.server.app.ipfs

ipfs.cat(key, options, (err, stream) => {
if (err) {
log.error(err)
if (err.message === 'No such file') {
reply({ Message: 'No such file', Code: 0, Type: 'error' }).code(500)
} else {
reply({ Message: 'Failed to cat file: ' + err, Code: 0, Type: 'error' }).code(500)
}
return
}
let pusher
let started = false

// hapi is not very clever and throws if no
// - _read method
// - _readableState object
// are there :(
if (!stream._read) {
stream._read = () => {}
stream._readableState = {}
}
return reply(stream).header('X-Stream-Output', '1')
})
pull(
ipfs.catPullStream(key, options),
pull.drain(
chunk => {
if (!started) {
started = true
pusher = pushable()
reply(toStream.source(pusher).pipe(new PassThrough()))
.header('X-Stream-Output', '1')
}
pusher.push(chunk)
},
err => {
if (err) {
log.error(err)

// We already started flowing, abort the stream
if (started) {
return pusher.end(err)
}

const msg = err.message === 'No such file'
? err.message
: 'Failed to cat file: ' + err

return reply({ Message: msg, Code: 0, Type: 'error' }).code(500)
}

pusher.end()
}
)
)
}
}

Expand Down

0 comments on commit 3ded576

Please sign in to comment.