From 61785e106765f4b44041de318f6e387d93759e60 Mon Sep 17 00:00:00 2001 From: nlf Date: Tue, 17 May 2022 12:58:10 -0700 Subject: [PATCH] feat: allow external integrity/size source (#110) --- README.md | 13 ++++++++++ lib/content/write.js | 29 ++++++++++++---------- test/content/write.js | 58 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 6dc11ba..cd39b37 100644 --- a/README.md +++ b/README.md @@ -413,6 +413,19 @@ with an `EINTEGRITY` error. `algorithms` has no effect if this option is present. +##### `opts.integrityEmitter` + +*Streaming only* If present, uses the provided event emitter as a source of +truth for both integrity and size. This allows use cases where integrity is +already being calculated outside of cacache to reuse that data instead of +calculating it a second time. + +The emitter must emit both the `'integrity'` and `'size'` events. + +NOTE: If this option is provided, you must verify that you receive the correct +integrity value yourself and emit an `'error'` event if there is a mismatch. +[ssri Integrity Streams](https://github.com/npm/ssri#integrity-stream) do this for you when given an expected integrity. + ##### `opts.algorithms` Default: ['sha512'] diff --git a/lib/content/write.js b/lib/content/write.js index a877710..62388dc 100644 --- a/lib/content/write.js +++ b/lib/content/write.js @@ -1,5 +1,6 @@ 'use strict' +const events = require('events') const util = require('util') const contentPath = require('./path') @@ -114,6 +115,20 @@ async function handleContent (inputStream, cache, opts) { } async function pipeToTmp (inputStream, cache, tmpTarget, opts) { + const outStream = new fsm.WriteStream(tmpTarget, { + flags: 'wx', + }) + + if (opts.integrityEmitter) { + // we need to create these all simultaneously since they can fire in any order + const [integrity, size] = await Promise.all([ + events.once(opts.integrityEmitter, 'integrity').then(res => res[0]), + events.once(opts.integrityEmitter, 'size').then(res => res[0]), + new Pipeline(inputStream, outStream).promise(), + ]) + return { integrity, size } + } + let integrity let size const hashStream = ssri.integrityStream({ @@ -128,19 +143,7 @@ async function pipeToTmp (inputStream, cache, tmpTarget, opts) { size = s }) - const outStream = new fsm.WriteStream(tmpTarget, { - flags: 'wx', - }) - - // NB: this can throw if the hashStream has a problem with - // it, and the data is fully written. but pipeToTmp is only - // called in promisory contexts where that is handled. - const pipeline = new Pipeline( - inputStream, - hashStream, - outStream - ) - + const pipeline = new Pipeline(inputStream, hashStream, outStream) await pipeline.promise() return { integrity, size } } diff --git a/test/content/write.js b/test/content/write.js index f565767..787fb65 100644 --- a/test/content/write.js +++ b/test/content/write.js @@ -1,6 +1,8 @@ 'use strict' +const events = require('events') const fs = require('@npmcli/fs') +const Minipass = require('minipass') const path = require('path') const rimraf = require('rimraf') const ssri = require('ssri') @@ -32,6 +34,62 @@ t.test('basic put', (t) => { }) }) +t.test('basic put, providing external integrity emitter', async (t) => { + const CACHE = t.testdir() + const CONTENT = 'foobarbaz' + const INTEGRITY = ssri.fromData(CONTENT) + + const write = t.mock('../../lib/content/write.js', { + ssri: { + ...ssri, + integrityStream: () => { + throw new Error('Should not be called') + }, + }, + }) + + const source = new Minipass().end(CONTENT) + + const tee = new Minipass() + + const integrityStream = ssri.integrityStream() + // since the integrityStream is not going anywhere, we need to manually resume it + // otherwise it'll get stuck in paused mode and will never process any data events + integrityStream.resume() + const integrityStreamP = Promise.all([ + events.once(integrityStream, 'integrity').then((res) => res[0]), + events.once(integrityStream, 'size').then((res) => res[0]), + ]) + + const contentStream = write.stream(CACHE, { integrityEmitter: integrityStream }) + const contentStreamP = Promise.all([ + events.once(contentStream, 'integrity').then((res) => res[0]), + events.once(contentStream, 'size').then((res) => res[0]), + contentStream.promise(), + ]) + + tee.pipe(integrityStream) + tee.pipe(contentStream) + source.pipe(tee) + + const [ + [ssriIntegrity, ssriSize], + [contentIntegrity, contentSize], + ] = await Promise.all([ + integrityStreamP, + contentStreamP, + ]) + + t.equal(ssriSize, CONTENT.length, 'ssri got the right size') + t.equal(contentSize, CONTENT.length, 'content got the right size') + t.same(ssriIntegrity, INTEGRITY, 'ssri got the right integrity') + t.same(contentIntegrity, INTEGRITY, 'content got the right integrity') + + const cpath = contentPath(CACHE, ssriIntegrity) + t.ok(fs.lstatSync(cpath).isFile(), 'content inserted as a single file') + t.equal(fs.readFileSync(cpath, 'utf8'), CONTENT, 'contents are identical to inserted content') +}) + t.test("checks input digest doesn't match data", (t) => { const CONTENT = 'foobarbaz' const integrity = ssri.fromData(CONTENT)