From 373f69e0a5ac15fa73cf9fbf45988f219bd7099e Mon Sep 17 00:00:00 2001 From: Marcin Rataj Date: Wed, 3 Apr 2019 14:17:54 +0200 Subject: [PATCH] fix(gateway): streaming compressed payload This change simplifies code responsible for streaming response and makes the streaming actually work by telling the payload compression stream to flush its content on every read(). (previous version was buffering entire thing in Hapi's compressor memory) We also do content-type detection based on the beginning of the stream by peeking at first `fileType.minimumBytes` bytes. License: MIT Signed-off-by: Marcin Rataj --- package.json | 1 + src/http/gateway/resources/gateway.js | 99 ++++++++++++++------------- test/gateway/index.js | 6 ++ 3 files changed, 57 insertions(+), 49 deletions(-) diff --git a/package.json b/package.json index 3436d3359e..ae5d1eb0c9 100644 --- a/package.json +++ b/package.json @@ -88,6 +88,7 @@ "bl": "^3.0.0", "boom": "^7.2.0", "bs58": "^4.0.1", + "buffer-peek-stream": "^1.0.1", "byteman": "^1.3.5", "cid-tool": "~0.2.0", "cids": "~0.5.8", diff --git a/src/http/gateway/resources/gateway.js b/src/http/gateway/resources/gateway.js index cd2ce1c433..6f1d324675 100644 --- a/src/http/gateway/resources/gateway.js +++ b/src/http/gateway/resources/gateway.js @@ -3,13 +3,12 @@ const debug = require('debug') const log = debug('ipfs:http-gateway') log.error = debug('ipfs:http-gateway:error') -const pull = require('pull-stream') -const pushable = require('pull-pushable') -const toStream = require('pull-stream-to-stream') + const fileType = require('file-type') const mime = require('mime-types') const { PassThrough } = require('readable-stream') const Boom = require('boom') +const peek = require('buffer-peek-stream') const { resolver } = require('ipfs-http-response') const PathUtils = require('../utils/path') @@ -30,6 +29,20 @@ function detectContentType (ref, chunk) { return mime.contentType(mimeType) } +// Enable streaming of compressed payload +// https://github.com/hapijs/hapi/issues/3599 +class ResponseStream extends PassThrough { + _read (size) { + super._read(size) + if (this._compressor) { + this._compressor.flush() + } + } + setCompressor (compressor) { + this._compressor = compressor + } +} + module.exports = { checkCID (request, h) { if (!request.params.cid) { @@ -85,58 +98,46 @@ module.exports = { return h.redirect(PathUtils.removeTrailingSlash(ref)).permanent(true) } - return new Promise((resolve, reject) => { - let pusher - let started = false - - pull( - ipfs.catPullStream(data.cid), - pull.drain( - chunk => { - if (!started) { - started = true - pusher = pushable() - const res = h.response(toStream.source(pusher).pipe(new PassThrough())) - - // Etag maps directly to an identifier for a specific version of a resource - res.header('Etag', `"${data.cid}"`) + const rawStream = ipfs.catReadableStream(data.cid) + const responseStream = new ResponseStream() + + // Pass-through Content-Type sniffing over initial bytes + const contentType = await new Promise((resolve, reject) => { + try { + const peekBytes = fileType.minimumBytes + peek(rawStream, peekBytes, (err, streamHead, outputStream) => { + if (err) { + log.error(err) + return reject(err) + } + outputStream.pipe(responseStream) + resolve(detectContentType(ref, streamHead)) + }) + } catch (err) { + log.error(err) + reject(err) + } + }) - // Set headers specific to the immutable namespace - if (ref.startsWith('/ipfs/')) { - res.header('Cache-Control', 'public, max-age=29030400, immutable') - } + const res = h.response(responseStream) - const contentType = detectContentType(ref, chunk) + // Etag maps directly to an identifier for a specific version of a resource + res.header('Etag', `"${data.cid}"`) - log('ref ', ref) - log('mime-type ', contentType) + // Set headers specific to the immutable namespace + if (ref.startsWith('/ipfs/')) { + res.header('Cache-Control', 'public, max-age=29030400, immutable') + } - if (contentType) { - log('writing content-type header') - res.header('Content-Type', contentType) - } + log('ref ', ref) + log('content-type ', contentType) - resolve(res) - } - pusher.push(chunk) - }, - err => { - if (err) { - log.error(err) - - // We already started flowing, abort the stream - if (started) { - return pusher.end(err) - } - - return reject(err) - } + if (contentType) { + log('writing content-type header') + res.header('Content-Type', contentType) + } - pusher.end() - } - ) - ) - }) + return res }, afterHandler (request, h) { diff --git a/test/gateway/index.js b/test/gateway/index.js index f8836700fb..51c59ebc70 100644 --- a/test/gateway/index.js +++ b/test/gateway/index.js @@ -156,10 +156,15 @@ describe('HTTP Gateway', function () { expect(res.statusCode).to.equal(200) expect(res.rawPayload).to.eql(bigFile) + expect(res.headers['x-ipfs-path']).to.equal(`/ipfs/${bigFileHash}`) + expect(res.headers['etag']).to.equal(`"${bigFileHash}"`) + expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable') + expect(res.headers['content-type']).to.equal('application/octet-stream') }) it('load a jpg file', async () => { const kitty = 'QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg' + const kittyDirectCid = 'Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u' const res = await gateway.inject({ method: 'GET', @@ -169,6 +174,7 @@ describe('HTTP Gateway', function () { expect(res.statusCode).to.equal(200) expect(res.headers['content-type']).to.equal('image/jpeg') expect(res.headers['x-ipfs-path']).to.equal('/ipfs/' + kitty) + expect(res.headers['etag']).to.equal(`"${kittyDirectCid}"`) expect(res.headers['cache-control']).to.equal('public, max-age=29030400, immutable') expect(res.headers.etag).to.equal('"Qmd286K6pohQcTKYqnS1YhWrCiS4gz7Xi34sdwMe9USZ7u"') expect(res.headers.suborigin).to.equal('ipfs000bafybeidsg6t7ici2osxjkukisd5inixiunqdpq2q5jy4a2ruzdf6ewsqk4')