diff --git a/.aegir.js b/.aegir.js new file mode 100644 index 00000000..e99bbba0 --- /dev/null +++ b/.aegir.js @@ -0,0 +1,7 @@ +'use strict' + +module.exports = { + karma: { + browserNoActivityTimeout: 1000 * 1000, + } +} diff --git a/README.md b/README.md index b893980a..f8c491da 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,22 @@ ## Table of Contents -- [Install](#install) -- [Usage](#usage) - - [Example](#example) - - [API](#api) - - [exporter(cid, ipld)](#exportercid-ipld-options) -- [Contribute](#contribute) -- [License](#license) +- [ipfs-unixfs-exporter](#ipfs-unixfs-exporter) + - [Lead Maintainer](#lead-maintainer) + - [Table of Contents](#table-of-contents) + - [Install](#install) + - [Usage](#usage) + - [Example](#example) + - [API](#api) + - [`exporter(cid, ipld)`](#exportercid-ipld) + - [UnixFS V1 entries](#unixfs-v1-entries) + - [Raw entries](#raw-entries) + - [CBOR entries](#cbor-entries) + - [`entry.content({ offset, length })`](#entrycontent-offset-length) + - [`exporter.path(cid, ipld)`](#exporterpathcid-ipld) + - [`exporter.recursive(cid, ipld)`](#exporterrecursivecid-ipld) + - [Contribute](#contribute) + - [License](#license) ## Install @@ -38,29 +47,41 @@ ### Example ```js -// Create an export source pull-stream cid or ipfs path you want to export and a -// to fetch the file from +// import a file and export it again +const importer = require('ipfs-unixfs-importer') const exporter = require('ipfs-unixfs-exporter') -const pull = require('pull-stream/pull') -const { stdout } = require('pull-stdio') - -const options = {} - -pull( - exporter(cid, ipld, options), - collect((error, files) => { - if (error) { - // ...handle error - } - - // Set up a pull stream that sends the file content to process.stdout - pull( - // files[0].content is a pull-stream that contains the bytes of the file - files[0].content, - stdout() - ) - }) -) + +const files = [] + +for await (const file of importer([{ + path: '/foo/bar.txt', + content: Buffer.from(0, 1, 2, 3) +}], ipld)) { + files.push(file) +} + +console.info(files[0].cid) // Qmbaz + +const entry = await exporter(files[0].cid, ipld) + +console.info(entry.cid) // Qmqux +console.info(entry.path) // Qmbaz/foo/bar.txt +console.info(entry.name) // bar.txt +console.info(entry.unixfs.fileSize()) // 4 + +// stream content from unixfs node +const bytes = [] + +for await (const buf of entry.content({ + offset: 0, // optional offset + length: 4 // optional length +})) { + bytes.push(buf) +} + +const content = Buffer.concat(bytes) + +console.info(content) // 0, 1, 2, 3 ``` #### API @@ -69,124 +90,146 @@ pull( const exporter = require('ipfs-unixfs-exporter') ``` -### exporter(cid, ipld, options) +### `exporter(cid, ipld)` -Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their CID. +Uses the given [js-ipld instance][] to fetch an IPFS node by it's CID. -Creates a new pull stream that outputs objects of the form +Returns a Promise which resolves to an `entry`. 
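+
+A minimal sketch of looking up an entry, assuming `cid` is a CID you already have and `ipld` is an instantiated IPLD resolver (both come from your own setup):
+
+```javascript
+const entry = await exporter(cid, ipld)
+
+console.info(entry.cid.toBaseEncodedString()) // string form of the resolved CID
+console.info(entry.path) // the path that was used to reach the entry
+
+// if the CID or path cannot be resolved, the returned Promise rejects with
+// an error whose `code` property is 'ERR_NOT_FOUND'
+```
+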
-```js +#### UnixFS V1 entries + +Entries with a `dag-pb` codec `CID` return UnixFS V1 entries: + +```javascript { - path: 'a name', - content: + name: 'foo.txt', + path: 'Qmbar/foo.txt', + cid: CID, // see https://github.com/multiformats/js-cid + node: DAGNode, // see https://github.com/ipld/js-ipld-dag-pb + content: function, // returns an async iterator + unixfs: UnixFS // see https://github.com/ipfs/js-ipfs-unixfs } ``` -#### `offset` and `length` +If the entry is a file, `entry.content()` returns an async iterator that yields one or more buffers containing the file content: -`offset` and `length` arguments can optionally be passed to the exporter function. These will cause the returned stream to only emit bytes starting at `offset` and with length of `length`. +```javascript +if (entry.unixfs.type === 'file') { + for await (const chunk of entry.content()) { + // chunk is a Buffer + } +} +``` -See [the tests](test/exporter.js) for examples of using these arguments. +If the entry is a directory or hamt shard, `entry.content()` returns further `entry` objects: -```js -const exporter = require('ipfs-unixfs-exporter') -const pull = require('pull-stream') -const drain = require('pull-stream/sinks/drain') - -pull( - exporter(cid, ipld, { - offset: 0, - length: 10 - }) - drain((file) => { - // file.content is a pull stream containing only the first 10 bytes of the file - }) -) +```javascript +if (entry.unixfs.type.includes('directory')) { // can be 'directory' or 'hamt-sharded-directory' + for await (const entry of dir.content()) { + console.info(entry.name) + } +} ``` -### `fullPath` +#### Raw entries -If specified the exporter will emit an entry for every path component encountered. +Entries with a `raw` codec `CID` return raw entries: ```javascript -const exporter = require('ipfs-unixfs-exporter') -const pull = require('pull-stream') -const collect = require('pull-stream/sinks/collect') - -pull( - exporter('QmFoo.../bar/baz.txt', ipld, { - fullPath: true - }) - collect((err, files) => { - console.info(files) - - // [{ - // depth: 0, - // name: 'QmFoo...', - // path: 'QmFoo...', - // size: ... - // cid: CID - // content: undefined - // type: 'dir' - // }, { - // depth: 1, - // name: 'bar', - // path: 'QmFoo.../bar', - // size: ... - // cid: CID - // content: undefined - // type: 'dir' - // }, { - // depth: 2, - // name: 'baz.txt', - // path: 'QmFoo.../bar/baz.txt', - // size: ... - // cid: CID - // content: - // type: 'file' - // }] - // - }) -) +{ + name: 'foo.txt', + path: 'Qmbar/foo.txt', + cid: CID, // see https://github.com/multiformats/js-cid + node: Buffer, // see https://nodejs.org/api/buffer.html + content: function, // returns an async iterator +} ``` -### `maxDepth` +`entry.content()` returns an async iterator that yields a buffer containing the node content: -If specified the exporter will only emit entries up to the specified depth. +```javascript +for await (const chunk of entry.content()) { + // chunk is a Buffer +} +``` + +Unless you an options object containing `offset` and `length` keys as an argument to `entry.content()`, `chunk` will be equal to `entry.node`. 
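+
+As an example, a short sketch of reading a byte range from a raw entry (this assumes `entry` is a `raw` entry exported as above and that the block is at least three bytes long):
+
+```javascript
+const chunks = []
+
+// read two bytes, starting at the second byte of the block
+for await (const chunk of entry.content({ offset: 1, length: 2 })) {
+  chunks.push(chunk)
+}
+
+const slice = Buffer.concat(chunks) // equal to entry.node.slice(1, 3)
+```
+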
+ +#### CBOR entries + +Entries with a `dag-cbor` codec `CID` return JavaScript object entries: ```javascript -const exporter = require('ipfs-unixfs-exporter') -const pull = require('pull-stream') -const collect = require('pull-stream/sinks/collect') - -pull( - exporter('QmFoo.../bar/baz.txt', ipld, { - fullPath: true, - maxDepth: 1 - }) - collect((err, files) => { - console.info(files) - - // [{ - // depth: 0, - // name: 'QmFoo...', - // path: 'QmFoo...', - // size: ... - // cid: CID - // content: undefined - // type: 'dir' - // }, { - // depth: 1, - // name: 'bar', - // path: 'QmFoo.../bar', - // size: ... - // cid: CID - // content: undefined - // type: 'dir' - // }] - // - }) -) +{ + name: 'foo.txt', + path: 'Qmbar/foo.txt', + cid: CID, // see https://github.com/multiformats/js-cid + node: Object, // see https://github.com/ipld/js-ipld-dag-cbor +} +``` + +There is no `content` function for a `CBOR` node. + + +#### `entry.content({ offset, length })` + +When `entry` is a file or a `raw` node, `offset` and/or `length` arguments can be passed to `entry.content()` to return slices of data: + +```javascript +const bufs = [] + +for await (const chunk of entry.content({ + offset: 0, + length: 5 +})) { + bufs.push(chunk) +} + +// `data` contains the first 5 bytes of the file +const data = Buffer.concat(bufs) +``` + +If `entry` is a directory or hamt shard, passing `offset` and/or `length` to `entry.content()` will limit the number of files returned from the directory. + +```javascript +const entries = [] + +for await (const entry of dir.content({ + offset: 0, + length: 5 +})) { + entries.push(entry) +} + +// `entries` contains the first 5 files/directories in the directory +``` + +### `exporter.path(cid, ipld)` + +`exporter.path` will return an async iterator that yields entries for all segments in a path: + +```javascript +const entries = [] + +for await (const entry of exporter.path('Qmfoo/foo/bar/baz.txt', ipld)) { + entries.push(entry) +} + +// entries contains 4x `entry` objects +``` + +### `exporter.recursive(cid, ipld)` + +`exporter.recursive` will return an async iterator that yields all entries beneath a given CID or IPFS path, as well as the containing directory. 
+ +```javascript +const entries = [] + +for await (const child of exporter.recursive('Qmfoo/foo/bar', ipld)) { + entries.push(entry) +} + +// entries contains all children of the `Qmfoo/foo/bar` directory and it's children ``` [dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md diff --git a/package.json b/package.json index c4e18f86..676f11ee 100644 --- a/package.json +++ b/package.json @@ -17,7 +17,8 @@ "release": "aegir release", "release-minor": "aegir release --type minor", "release-major": "aegir release --type major", - "coverage": "aegir coverage" + "coverage": "nyc -s npm run test:node && nyc report --reporter=html", + "dep-check": "aegir dep-check" }, "repository": { "type": "git", @@ -37,29 +38,27 @@ "homepage": "https://github.com/ipfs/js-ipfs-unixfs-exporter#readme", "devDependencies": { "aegir": "^18.0.2", + "async-iterator-all": "^1.0.0", + "async-iterator-buffer-stream": "^1.0.0", + "async-iterator-first": "^1.0.0", "chai": "^4.2.0", "detect-node": "^2.0.4", "dirty-chai": "^2.0.1", - "ipld": "~0.21.1", - "ipld-dag-pb": "~0.15.2", + "ipld": "~0.24.0", + "ipld-dag-pb": "~0.17.0", "ipld-in-memory": "^2.0.0", - "pull-pushable": "^2.2.0", - "pull-stream-to-stream": "^1.3.4", - "pull-zip": "^2.0.1", - "sinon": "^7.1.0", - "stream-to-pull-stream": "^1.7.2" + "multicodec": "~0.5.1", + "multihashes": "~0.4.14", + "nyc": "^14.0.0", + "sinon": "^7.1.0" }, "dependencies": { - "async": "^2.6.1", - "cids": "~0.5.5", - "hamt-sharding": "0.0.2", + "async-iterator-last": "^1.0.0", + "cids": "~0.7.1", + "err-code": "^1.1.2", + "hamt-sharding": "~0.0.2", "ipfs-unixfs": "~0.1.16", - "ipfs-unixfs-importer": "~0.38.0", - "pull-cat": "^1.1.11", - "pull-defer": "~0.2.3", - "pull-paramap": "^1.2.2", - "pull-stream": "^3.6.9", - "pull-traverse": "^1.0.3" + "ipfs-unixfs-importer": "~0.38.5" }, "contributors": [ "Alan Shaw ", diff --git a/src/dir-flat.js b/src/dir-flat.js deleted file mode 100644 index 53161e4f..00000000 --- a/src/dir-flat.js +++ /dev/null @@ -1,53 +0,0 @@ -'use strict' - -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const filter = require('pull-stream/throughs/filter') -const map = require('pull-stream/throughs/map') -const cat = require('pull-cat') - -// Logic to export a unixfs directory. 
-module.exports = dirExporter - -function dirExporter (cid, node, name, path, pathRest, resolve, dag, parent, depth, options) { - const accepts = pathRest[0] - - const dir = { - name: name, - depth: depth, - path: path, - cid, - size: 0, - type: 'dir' - } - - // we are at the max depth so no need to descend into children - if (options.maxDepth && options.maxDepth <= depth) { - return values([dir]) - } - - const streams = [ - pull( - values(node.links), - filter((item) => accepts === undefined || item.name === accepts), - map((link) => ({ - depth: depth + 1, - size: 0, - name: link.name, - path: path + '/' + link.name, - cid: link.cid, - linkName: link.name, - pathRest: pathRest.slice(1), - type: 'dir' - })), - resolve - ) - ] - - // place dir before if not specifying subtree - if (!pathRest.length || options.fullPath) { - streams.unshift(values([dir])) - } - - return cat(streams) -} diff --git a/src/dir-hamt-sharded.js b/src/dir-hamt-sharded.js deleted file mode 100644 index b9ade11c..00000000 --- a/src/dir-hamt-sharded.js +++ /dev/null @@ -1,203 +0,0 @@ -'use strict' - -const defer = require('pull-defer') -const pull = require('pull-stream/pull') -const error = require('pull-stream/sources/error') -const values = require('pull-stream/sources/values') -const filter = require('pull-stream/throughs/filter') -const map = require('pull-stream/throughs/map') -const cat = require('pull-cat') -const Bucket = require('hamt-sharding/src/bucket') -const DirSharded = require('ipfs-unixfs-importer/src/importer/dir-sharded') -const waterfall = require('async/waterfall') - -// Logic to export a unixfs directory. -module.exports = shardedDirExporter - -function shardedDirExporter (cid, node, name, path, pathRest, resolve, dag, parent, depth, options) { - let dir - if (!parent || (parent.path !== path)) { - dir = { - name: name, - depth: depth, - path: path, - cid, - size: 0, - type: 'dir' - } - } - - // we are at the max depth so no need to descend into children - if (options.maxDepth && options.maxDepth <= depth) { - return values([dir]) - } - - if (!pathRest.length) { - // return all children - - const streams = [ - pull( - values(node.links), - map((link) => { - // remove the link prefix (2 chars for the bucket index) - const entryName = link.name.substring(2) - const entryPath = entryName ? path + '/' + entryName : path - - return { - depth: entryName ? depth + 1 : depth, - name: entryName, - path: entryPath, - cid: link.cid, - pathRest: entryName ? 
pathRest.slice(1) : pathRest, - parent: dir || parent - } - }), - resolve - ) - ] - - // place dir before if not specifying subtree - streams.unshift(values([dir])) - - return cat(streams) - } - - const deferred = defer.source() - const targetFile = pathRest[0] - - // recreate our level of the HAMT so we can load only the subshard in pathRest - waterfall([ - (cb) => { - if (!options.rootBucket) { - options.rootBucket = new Bucket({ - hashFn: DirSharded.hashFn - }) - options.hamtDepth = 1 - - return addLinksToHamtBucket(node.links, options.rootBucket, options.rootBucket, cb) - } - - return addLinksToHamtBucket(node.links, options.lastBucket, options.rootBucket, cb) - }, - (cb) => findPosition(targetFile, options.rootBucket, cb), - (position, cb) => { - let prefix = toPrefix(position.pos) - const bucketPath = toBucketPath(position) - - if (bucketPath.length > (options.hamtDepth)) { - options.lastBucket = bucketPath[options.hamtDepth] - - prefix = toPrefix(options.lastBucket._posAtParent) - } - - const streams = [ - pull( - values(node.links), - map((link) => { - const entryPrefix = link.name.substring(0, 2) - const entryName = link.name.substring(2) - const entryPath = entryName ? path + '/' + entryName : path - - if (entryPrefix !== prefix) { - // not the entry or subshard we're looking for - return false - } - - if (entryName && entryName !== targetFile) { - // not the entry we're looking for - return false - } - - if (!entryName) { - // we are doing to descend into a subshard - options.hamtDepth++ - } else { - // we've found the node we are looking for, remove the context - // so we don't affect further hamt traversals - delete options.rootBucket - delete options.lastBucket - delete options.hamtDepth - } - - return { - depth: entryName ? depth + 1 : depth, - name: entryName, - path: entryPath, - cid: link.cid, - pathRest: entryName ? 
pathRest.slice(1) : pathRest, - parent: dir || parent - } - }), - filter(Boolean), - resolve - ) - ] - - if (options.fullPath) { - streams.unshift(values([dir])) - } - - cb(null, streams) - } - ], (err, streams) => { - if (err) { - return deferred.resolve(error(err)) - } - - deferred.resolve(cat(streams)) - }) - - return deferred -} - -const addLinksToHamtBucket = (links, bucket, rootBucket, callback) => { - Promise.all( - links.map(link => { - if (link.name.length === 2) { - const pos = parseInt(link.name, 16) - - return bucket._putObjectAt(pos, new Bucket({ - hashFn: DirSharded.hashFn - }, bucket, pos)) - } - - return rootBucket.put(link.name.substring(2), true) - }) - ) - .then(() => callback(), callback) -} - -const toPrefix = (position) => { - return position - .toString('16') - .toUpperCase() - .padStart(2, '0') - .substring(0, 2) -} - -const findPosition = (file, bucket, cb) => { - bucket._findNewBucketAndPos(file) - .then(position => { - if (!cb) { - // would have errored in catch block above - return - } - - cb(null, position) - }, cb) -} - -const toBucketPath = (position) => { - let bucket = position.bucket - const path = [] - - while (bucket._parent) { - path.push(bucket) - - bucket = bucket._parent - } - - path.push(bucket) - - return path.reverse() -} diff --git a/src/file.js b/src/file.js deleted file mode 100644 index 0078a9b0..00000000 --- a/src/file.js +++ /dev/null @@ -1,216 +0,0 @@ -'use strict' - -const traverse = require('pull-traverse') -const UnixFS = require('ipfs-unixfs') -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const error = require('pull-stream/sources/error') -const once = require('pull-stream/sources/once') -const empty = require('pull-stream/sources/empty') -const filter = require('pull-stream/throughs/filter') -const flatten = require('pull-stream/throughs/flatten') -const map = require('pull-stream/throughs/map') -const paramap = require('pull-paramap') -const extractDataFromBlock = require('./extract-data-from-block') - -// Logic to export a single (possibly chunked) unixfs file. 
-module.exports = (cid, node, name, path, pathRest, resolve, dag, parent, depth, options) => { - const accepts = pathRest[0] - - if (accepts !== undefined && accepts !== path) { - return empty() - } - - let file - - try { - file = UnixFS.unmarshal(node.data) - } catch (err) { - return error(err) - } - - const fileSize = file.fileSize() - - let offset = options.offset - let length = options.length - - if (offset < 0) { - return error(new Error('Offset must be greater than or equal to 0')) - } - - if (offset > fileSize) { - return error(new Error('Offset must be less than the file size')) - } - - if (length < 0) { - return error(new Error('Length must be greater than or equal to 0')) - } - - if (length === 0) { - return once({ - depth: depth, - content: once(Buffer.alloc(0)), - name: name, - path: path, - cid, - size: fileSize, - type: 'file' - }) - } - - if (!offset) { - offset = 0 - } - - if (!length || (offset + length > fileSize)) { - length = fileSize - offset - } - - const content = streamBytes(dag, node, fileSize, offset, length) - - return values([{ - depth: depth, - content: content, - name: name, - path: path, - cid, - size: fileSize, - type: 'file' - }]) -} - -function streamBytes (dag, node, fileSize, offset, length) { - if (offset === fileSize || length === 0) { - return once(Buffer.alloc(0)) - } - - const end = offset + length - - return pull( - traverse.depthFirst({ - node, - start: 0, - end: fileSize - }, getChildren(dag, offset, end)), - map(extractData(offset, end)), - filter(Boolean) - ) -} - -function getChildren (dag, offset, end) { - // as we step through the children, keep track of where we are in the stream - // so we can filter out nodes we're not interested in - let streamPosition = 0 - - return function visitor ({ node }) { - if (Buffer.isBuffer(node)) { - // this is a leaf node, can't traverse any further - return empty() - } - - let file - - try { - file = UnixFS.unmarshal(node.data) - } catch (err) { - return error(err) - } - - const nodeHasData = Boolean(file.data && file.data.length) - - // handle case where data is present on leaf nodes and internal nodes - if (nodeHasData && node.links.length) { - streamPosition += file.data.length - } - - // work out which child nodes contain the requested data - const filteredLinks = node.links - .map((link, index) => { - const child = { - link: link, - start: streamPosition, - end: streamPosition + file.blockSizes[index], - size: file.blockSizes[index] - } - - streamPosition = child.end - - return child - }) - .filter((child) => { - return (offset >= child.start && offset < child.end) || // child has offset byte - (end > child.start && end <= child.end) || // child has end byte - (offset < child.start && end > child.end) // child is between offset and end bytes - }) - - if (filteredLinks.length) { - // move stream position to the first node we're going to return data from - streamPosition = filteredLinks[0].start - } - - return pull( - once(filteredLinks), - paramap((children, cb) => { - dag.getMany(children.map(child => child.link.cid), (err, results) => { - if (err) { - return cb(err) - } - - cb(null, results.map((result, index) => { - const child = children[index] - - return { - start: child.start, - end: child.end, - node: result, - size: child.size - } - })) - }) - }), - flatten() - ) - } -} - -function extractData (requestedStart, requestedEnd) { - let streamPosition = -1 - - return function getData ({ node, start, end }) { - let block - - if (Buffer.isBuffer(node)) { - block = node - } else { - try { - const file = 
UnixFS.unmarshal(node.data) - - if (!file.data) { - if (file.blockSizes.length) { - return - } - - return Buffer.alloc(0) - } - - block = file.data - } catch (err) { - throw new Error(`Failed to unmarshal node - ${err.message}`) - } - } - - if (block && block.length) { - if (streamPosition === -1) { - streamPosition = start - } - - const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd) - - streamPosition += block.length - - return output - } - - return Buffer.alloc(0) - } -} diff --git a/src/index.js b/src/index.js index 361b9894..e5f840fa 100644 --- a/src/index.js +++ b/src/index.js @@ -1,118 +1,109 @@ 'use strict' -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const error = require('pull-stream/sources/error') -const filter = require('pull-stream/throughs/filter') -const map = require('pull-stream/throughs/map') +const errCode = require('err-code') const CID = require('cids') +const resolve = require('./resolvers') +const last = require('async-iterator-last') -const createResolver = require('./resolve').createResolver - -function pathBaseAndRest (path) { - // Buffer -> raw multihash or CID in buffer - let pathBase = path - let pathRest = '/' +const toPathComponents = (path = '') => { + // split on / unless escaped with \ + return (path + .trim() + .match(/([^\\^/]|\\\/)+/g) || []) + .filter(Boolean) +} +const cidAndRest = (path) => { if (Buffer.isBuffer(path)) { - pathBase = (new CID(path)).toBaseEncodedString() + return { + cid: new CID(path), + toResolve: [] + } + } + + if (CID.isCID(path)) { + return { + cid: path, + toResolve: [] + } } if (typeof path === 'string') { if (path.indexOf('/ipfs/') === 0) { - path = pathBase = path.substring(6) - } - const subtreeStart = path.indexOf('/') - if (subtreeStart > 0) { - pathBase = path.substring(0, subtreeStart) - pathRest = path.substring(subtreeStart) + path = path.substring(6) } - } else if (CID.isCID(pathBase)) { - pathBase = pathBase.toBaseEncodedString() - } - pathBase = (new CID(pathBase)).toBaseEncodedString() + const output = toPathComponents(path) - return { - base: pathBase, - rest: toPathComponents(pathRest) + return { + cid: new CID(output[0]), + toResolve: output.slice(1) + } } -} -const defaultOptions = { - maxDepth: Infinity, - offset: undefined, - length: undefined, - fullPath: false + throw errCode(new Error(`Unknown path type ${path}`), 'ERR_BAD_PATH') } -module.exports = (path, dag, options) => { - options = Object.assign({}, defaultOptions, options) - - let dPath - try { - dPath = pathBaseAndRest(path) - } catch (err) { - return error(err) - } +const walkPath = async function * (path, ipld) { + let { + cid, + toResolve + } = cidAndRest(path) + let name = cid.toBaseEncodedString() + let entryPath = name + const startingDepth = toResolve.length - const pathLengthToCut = join( - [dPath.base].concat(dPath.rest.slice(0, dPath.rest.length - 1))).length - - const cid = new CID(dPath.base) - - return pull( - values([{ - cid, - name: dPath.base, - path: dPath.base, - pathRest: dPath.rest, - depth: 0 - }]), - createResolver(dag, options), - filter(Boolean), - map((node) => { - return { - depth: node.depth, - name: node.name, - path: options.fullPath ? 
node.path : finalPathFor(node), - size: node.size, - cid: node.cid, - content: node.content, - type: node.type - } - }) - ) + while (true) { + const result = await resolve(cid, name, entryPath, toResolve, startingDepth, ipld) - function finalPathFor (node) { - if (!dPath.rest.length) { - return node.path + if (!result.entry && !result.next) { + throw errCode(new Error(`Could not resolve ${path}`), 'ERR_NOT_FOUND') } - let retPath = node.path.substring(pathLengthToCut) - if (retPath.charAt(0) === '/') { - retPath = retPath.substring(1) + if (result.entry) { + yield result.entry } - if (!retPath) { - retPath = dPath.rest[dPath.rest.length - 1] || dPath.base + + if (!result.next) { + return } - return retPath + + // resolve further parts + toResolve = result.next.toResolve + cid = result.next.cid + name = result.next.name + entryPath = result.next.path } } -function join (paths) { - return paths.reduce((acc, path) => { - if (acc.length) { - acc += '/' - } - return acc + path - }, '') +const exporter = (path, ipld) => { + return last(walkPath(path, ipld)) } -const toPathComponents = (path = '') => { - // split on / unless escaped with \ - return (path - .trim() - .match(/([^\\^/]|\\\/)+/g) || []) - .filter(Boolean) +const recursive = async function * (path, ipld) { + const node = await exporter(path, ipld) + + yield node + + if (node.unixfs && node.unixfs.type.includes('dir')) { + for await (const child of recurse(node)) { + yield child + } + } + + async function * recurse (node) { + for await (const file of node.content()) { + yield file + + if (file.unixfs.type.includes('dir')) { + for await (const subFile of recurse(file)) { + yield subFile + } + } + } + } } + +module.exports = exporter +module.exports.path = walkPath +module.exports.recursive = recursive diff --git a/src/object.js b/src/object.js deleted file mode 100644 index eb11de10..00000000 --- a/src/object.js +++ /dev/null @@ -1,34 +0,0 @@ -'use strict' - -const CID = require('cids') -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const error = require('pull-stream/sources/error') - -module.exports = (cid, node, name, path, pathRest, resolve, dag, parent, depth) => { - let newNode - if (pathRest.length) { - const pathElem = pathRest[0] - newNode = node[pathElem] - const newName = path + '/' + pathElem - if (!newNode) { - return error(new Error(`not found`)) - } - - const isCID = CID.isCID(newNode) - - return pull( - values([{ - depth: depth, - name: pathElem, - path: newName, - pathRest: pathRest.slice(1), - cid: isCID && newNode, - object: !isCID && newNode, - parent: parent - }]), - resolve) - } else { - return error(new Error('invalid node type')) - } -} diff --git a/src/raw.js b/src/raw.js deleted file mode 100644 index 94b74624..00000000 --- a/src/raw.js +++ /dev/null @@ -1,62 +0,0 @@ -'use strict' - -const error = require('pull-stream/sources/error') -const once = require('pull-stream/sources/once') -const empty = require('pull-stream/sources/empty') -const extractDataFromBlock = require('./extract-data-from-block') - -// Logic to export a single raw block -module.exports = (cid, node, name, path, pathRest, resolve, dag, parent, depth, options) => { - const accepts = pathRest[0] - - if (accepts !== undefined && accepts !== path) { - return empty() - } - - const size = node.length - - let offset = options.offset - let length = options.length - - if (offset < 0) { - return error(new Error('Offset must be greater than or equal to 0')) - } - - if (offset > size) { - return error(new 
Error('Offset must be less than the file size')) - } - - if (length < 0) { - return error(new Error('Length must be greater than or equal to 0')) - } - - if (length === 0) { - return once({ - depth, - content: once(Buffer.alloc(0)), - cid, - name, - path, - size, - type: 'raw' - }) - } - - if (!offset) { - offset = 0 - } - - if (!length || (offset + length > size)) { - length = size - offset - } - - return once({ - depth, - content: once(extractDataFromBlock(node, 0, offset, offset + length)), - cid, - name, - path, - size, - type: 'raw' - }) -} diff --git a/src/resolve.js b/src/resolve.js deleted file mode 100644 index e0c10b55..00000000 --- a/src/resolve.js +++ /dev/null @@ -1,101 +0,0 @@ -'use strict' - -const UnixFS = require('ipfs-unixfs') -const pull = require('pull-stream/pull') -const error = require('pull-stream/sources/error') -const filter = require('pull-stream/throughs/filter') -const flatten = require('pull-stream/throughs/flatten') -const map = require('pull-stream/throughs/map') -const paramap = require('pull-paramap') -const waterfall = require('async/waterfall') - -const resolvers = { - directory: require('./dir-flat'), - 'hamt-sharded-directory': require('./dir-hamt-sharded'), - file: require('./file'), - object: require('./object'), - raw: require('./raw') -} - -module.exports = Object.assign({ - createResolver: createResolver, - typeOf: typeOf -}, resolvers) - -function createResolver (dag, options, depth, parent) { - if (!depth) { - depth = 0 - } - - if (depth > options.maxDepth) { - return map(identity) - } - - return pull( - paramap((item, cb) => { - if ((typeof item.depth) !== 'number') { - return error(new Error('no depth')) - } - - if (item.object) { - return cb(null, resolveItem(null, item.object, item, options)) - } - - waterfall([ - (done) => dag.get(item.cid, done), - (node, done) => done(null, resolveItem(item.cid, node.value, item, options)) - ], cb) - }), - flatten(), - filter(Boolean), - filter((node) => node.depth <= options.maxDepth) - ) - - function resolveItem (cid, node, item, options) { - return resolve({ - cid, - node, - name: item.name, - path: item.path, - pathRest: item.pathRest, - dag, - parentNode: item.parent || parent, - depth: item.depth, - options - }) - } - - function resolve ({ cid, node, name, path, pathRest, dag, parentNode, depth, options }) { - let type - - try { - type = typeOf(node) - } catch (err) { - return error(err) - } - - const nodeResolver = resolvers[type] - - if (!nodeResolver) { - return error(new Error('Unkown node type ' + type)) - } - - const resolveDeep = createResolver(dag, options, depth, node) - - return nodeResolver(cid, node, name, path, pathRest, resolveDeep, dag, parentNode, depth, options) - } -} - -function typeOf (node) { - if (Buffer.isBuffer(node)) { - return 'raw' - } else if (Buffer.isBuffer(node.data)) { - return UnixFS.unmarshal(node.data).type - } else { - return 'object' - } -} - -function identity (o) { - return o -} diff --git a/src/resolvers/dag-cbor.js b/src/resolvers/dag-cbor.js new file mode 100644 index 00000000..9f95ace0 --- /dev/null +++ b/src/resolvers/dag-cbor.js @@ -0,0 +1,55 @@ +'use strict' + +const CID = require('cids') +const errCode = require('err-code') + +const resolve = async (cid, name, path, toResolve, resolve, depth, ipld) => { + let node = await ipld.get(cid) + let subObject = node + let subPath = path + + while (toResolve.length) { + const prop = toResolve[0] + + if (prop in subObject) { + // remove the bit of the path we have resolved + toResolve.shift() + subPath = 
`${subPath}/${prop}` + + if (CID.isCID(subObject[prop])) { + return { + entry: { + name, + path, + cid, + node, + depth + }, + next: { + cid: subObject[prop], + name: prop, + path: subPath, + toResolve + } + } + } + + subObject = subObject[prop] + } else { + // cannot resolve further + throw errCode(new Error(`No property named ${prop} found in cbor node ${cid.toBaseEncodedString()}`), 'ERR_NO_PROP') + } + } + + return { + entry: { + name, + path, + cid, + node, + depth + } + } +} + +module.exports = resolve diff --git a/src/resolvers/index.js b/src/resolvers/index.js new file mode 100644 index 00000000..a3418dc0 --- /dev/null +++ b/src/resolvers/index.js @@ -0,0 +1,21 @@ +'use strict' + +const errCode = require('err-code') + +const resolvers = { + 'dag-pb': require('./unixfs-v1'), + raw: require('./raw'), + 'dag-cbor': require('./dag-cbor') +} + +const resolve = (cid, name, path, toResolve, depth, ipld) => { + const resolver = resolvers[cid.codec] + + if (!resolver) { + throw errCode(new Error(`No resolver for codec ${cid.codec}`), 'ERR_NO_RESOLVER') + } + + return resolver(cid, name, path, toResolve, resolve, depth, ipld) +} + +module.exports = resolve diff --git a/src/resolvers/raw.js b/src/resolvers/raw.js new file mode 100644 index 00000000..6dc7f68a --- /dev/null +++ b/src/resolvers/raw.js @@ -0,0 +1,37 @@ +'use strict' + +const errCode = require('err-code') +const extractDataFromBlock = require('../utils/extract-data-from-block') +const validateOffsetAndLength = require('../utils/validate-offset-and-length') + +const rawContent = (node) => { + return function * (options = {}) { + const { + offset, + length + } = validateOffsetAndLength(node.length, options.offset, options.length) + + yield extractDataFromBlock(node, 0, offset, offset + length) + } +} + +const resolve = async (cid, name, path, toResolve, resolve, depth, ipld) => { + if (toResolve.length) { + throw errCode(new Error(`No link named ${path} found in raw node ${cid.toBaseEncodedString()}`), 'ERR_NOT_FOUND') + } + + const buf = await ipld.get(cid) + + return { + entry: { + name, + path, + cid, + node: buf, + content: rawContent(buf), + depth + } + } +} + +module.exports = resolve diff --git a/src/resolvers/unixfs-v1/content/directory.js b/src/resolvers/unixfs-v1/content/directory.js new file mode 100644 index 00000000..df909e14 --- /dev/null +++ b/src/resolvers/unixfs-v1/content/directory.js @@ -0,0 +1,17 @@ +'use strict' + +const directoryContent = (cid, node, unixfs, path, resolve, depth, ipld) => { + return async function * (options = {}) { + const offset = options.offset || 0 + const length = options.length || node.Links.length + const links = node.Links.slice(offset, length) + + for (const link of links) { + const result = await resolve(link.Hash, link.Name, `${path}/${link.Name}`, [], depth + 1, ipld) + + yield result.entry + } + } +} + +module.exports = directoryContent diff --git a/src/resolvers/unixfs-v1/content/file.js b/src/resolvers/unixfs-v1/content/file.js new file mode 100644 index 00000000..da4cd9da --- /dev/null +++ b/src/resolvers/unixfs-v1/content/file.js @@ -0,0 +1,83 @@ +'use strict' + +const extractDataFromBlock = require('../../../utils/extract-data-from-block') +const validateOffsetAndLength = require('../../../utils/validate-offset-and-length') +const UnixFS = require('ipfs-unixfs') +const errCode = require('err-code') + +async function * emitBytes (ipld, node, start, end, streamPosition = 0) { + // a `raw` node + if (Buffer.isBuffer(node)) { + const buf = extractDataFromBlock(node, 
streamPosition, start, end) + + if (buf.length) { + yield buf + } + + streamPosition += buf.length + + return streamPosition + } + + let file + + try { + file = UnixFS.unmarshal(node.Data) + } catch (err) { + throw errCode(err, 'ERR_NOT_UNIXFS') + } + + // might be a unixfs `raw` node or have data on intermediate nodes + const nodeHasData = Boolean(file.data && file.data.length) + + if (nodeHasData) { + const buf = extractDataFromBlock(file.data, streamPosition, start, end) + + if (buf.length) { + yield buf + } + + streamPosition += file.data.length + } + + let childStart = streamPosition + + // work out which child nodes contain the requested data + for (let i = 0; i < node.Links.length; i++) { + const childLink = node.Links[i] + const childEnd = streamPosition + file.blockSizes[i] + + if ((start >= childStart && start < childEnd) || // child has offset byte + (end > childStart && end <= childEnd) || // child has end byte + (start < childStart && end > childEnd)) { // child is between offset and end bytes + const child = await ipld.get(childLink.Hash) + + for await (const buf of emitBytes(ipld, child, start, end, streamPosition)) { + streamPosition += buf.length + + yield buf + } + } + + streamPosition = childEnd + childStart = childEnd + 1 + } +} + +const fileContent = (cid, node, unixfs, path, resolve, depth, ipld) => { + return (options = {}) => { + const fileSize = unixfs.fileSize() + + const { + offset, + length + } = validateOffsetAndLength(fileSize, options.offset, options.length) + + const start = offset + const end = offset + length + + return emitBytes(ipld, node, start, end) + } +} + +module.exports = fileContent diff --git a/src/resolvers/unixfs-v1/content/hamt-sharded-directory.js b/src/resolvers/unixfs-v1/content/hamt-sharded-directory.js new file mode 100644 index 00000000..965e1b47 --- /dev/null +++ b/src/resolvers/unixfs-v1/content/hamt-sharded-directory.js @@ -0,0 +1,30 @@ +'use strict' + +const hamtShardedDirectoryContent = (cid, node, unixfs, path, resolve, depth, ipld) => { + return (options = {}) => { + return listDirectory(node, path, resolve, depth, ipld, options) + } +} + +async function * listDirectory (node, path, resolve, depth, ipld, options) { + const links = node.Links + + for (const link of links) { + const name = link.Name.substring(2) + + if (name) { + const result = await resolve(link.Hash, name, `${path}/${name}`, [], depth + 1, ipld) + + yield result.entry + } else { + // descend into subshard + node = await ipld.get(link.Hash) + + for await (const file of listDirectory(node, path, resolve, depth, ipld, options)) { + yield file + } + } + } +} + +module.exports = hamtShardedDirectoryContent diff --git a/src/resolvers/unixfs-v1/content/raw.js b/src/resolvers/unixfs-v1/content/raw.js new file mode 100644 index 00000000..cbef4af6 --- /dev/null +++ b/src/resolvers/unixfs-v1/content/raw.js @@ -0,0 +1,19 @@ +'use strict' + +const extractDataFromBlock = require('../../../utils/extract-data-from-block') +const validateOffsetAndLength = require('../../../utils/validate-offset-and-length') + +const rawContent = (cid, node, unixfs, path, resolve, depth, ipld) => { + return function * (options = {}) { + const size = node.length + + const { + offset, + length + } = validateOffsetAndLength(size, options.offset, options.length) + + yield extractDataFromBlock(unixfs.data, 0, offset, offset + length) + } +} + +module.exports = rawContent diff --git a/src/resolvers/unixfs-v1/index.js b/src/resolvers/unixfs-v1/index.js new file mode 100644 index 00000000..49dc33a9 --- 
/dev/null +++ b/src/resolvers/unixfs-v1/index.js @@ -0,0 +1,82 @@ +'use strict' + +const errCode = require('err-code') +const UnixFS = require('ipfs-unixfs') +const findShardCid = require('../../utils/find-cid-in-shard') + +const findLinkCid = (node, name) => { + const link = node.Links.find(link => link.Name === name) + + return link && link.Hash +} + +const contentExporters = { + raw: require('./content/file'), + file: require('./content/file'), + directory: require('./content/directory'), + 'hamt-sharded-directory': require('./content/hamt-sharded-directory'), + metadata: (cid, node, unixfs, path, resolve, depth, ipld) => {}, + symlink: (cid, node, unixfs, path, resolve, depth, ipld) => {} +} + +const unixFsResolver = async (cid, name, path, toResolve, resolve, depth, ipld) => { + const node = await ipld.get(cid) + let unixfs + let next + + if (!name) { + name = cid.toBaseEncodedString() + } + + try { + unixfs = UnixFS.unmarshal(node.Data) + } catch (err) { + // non-UnixFS dag-pb node? It could happen. + throw errCode(err, 'ERR_NOT_UNIXFS') + } + + if (!path) { + path = name + } + + if (toResolve.length) { + let linkCid + + if (unixfs && unixfs.type === 'hamt-sharded-directory') { + // special case - unixfs v1 hamt shards + linkCid = await findShardCid(node, toResolve[0], ipld) + } else { + linkCid = findLinkCid(node, toResolve[0]) + } + + if (!linkCid) { + throw errCode(new Error(`file does not exist`), 'ERR_NOT_FOUND') + } + + // remove the path component we have resolved + const nextName = toResolve.shift() + const nextPath = `${path}/${nextName}` + + next = { + cid: linkCid, + toResolve, + name: nextName, + path: nextPath + } + } + + return { + entry: { + name, + path, + cid, + node, + content: contentExporters[unixfs.type](cid, node, unixfs, path, resolve, depth, ipld), + unixfs, + depth + }, + next + } +} + +module.exports = unixFsResolver diff --git a/src/extract-data-from-block.js b/src/utils/extract-data-from-block.js similarity index 100% rename from src/extract-data-from-block.js rename to src/utils/extract-data-from-block.js diff --git a/src/utils/find-cid-in-shard.js b/src/utils/find-cid-in-shard.js new file mode 100644 index 00000000..11d6c095 --- /dev/null +++ b/src/utils/find-cid-in-shard.js @@ -0,0 +1,101 @@ +'use strict' + +const Bucket = require('hamt-sharding/src/bucket') +const DirSharded = require('ipfs-unixfs-importer/src/dir-sharded') + +const addLinksToHamtBucket = (links, bucket, rootBucket) => { + return Promise.all( + links.map(link => { + if (link.Name.length === 2) { + const pos = parseInt(link.Name, 16) + + return bucket._putObjectAt(pos, new Bucket({ + hashFn: DirSharded.hashFn + }, bucket, pos)) + } + + return rootBucket.put(link.Name.substring(2), true) + }) + ) +} + +const toPrefix = (position) => { + return position + .toString('16') + .toUpperCase() + .padStart(2, '0') + .substring(0, 2) +} + +const toBucketPath = (position) => { + let bucket = position.bucket + const path = [] + + while (bucket._parent) { + path.push(bucket) + + bucket = bucket._parent + } + + path.push(bucket) + + return path.reverse() +} + +const findShardCid = async (node, name, ipld, context) => { + if (!context) { + context = { + rootBucket: new Bucket({ + hashFn: DirSharded.hashFn + }), + hamtDepth: 1 + } + + context.lastBucket = context.rootBucket + } + + await addLinksToHamtBucket(node.Links, context.lastBucket, context.rootBucket) + + const position = await context.rootBucket._findNewBucketAndPos(name) + let prefix = toPrefix(position.pos) + const bucketPath = 
toBucketPath(position) + + if (bucketPath.length > (context.hamtDepth)) { + context.lastBucket = bucketPath[context.hamtDepth] + + prefix = toPrefix(context.lastBucket._posAtParent) + } + + const link = node.Links.find(link => { + const entryPrefix = link.Name.substring(0, 2) + const entryName = link.Name.substring(2) + + if (entryPrefix !== prefix) { + // not the entry or subshard we're looking for + return + } + + if (entryName && entryName !== name) { + // not the entry we're looking for + return + } + + return true + }) + + if (!link) { + return null + } + + if (link.Name.substring(2) === name) { + return link.Hash + } + + context.hamtDepth++ + + node = await ipld.get(link.Hash) + + return findShardCid(node, name, ipld, context) +} + +module.exports = findShardCid diff --git a/src/utils/validate-offset-and-length.js b/src/utils/validate-offset-and-length.js new file mode 100644 index 00000000..d1eb4185 --- /dev/null +++ b/src/utils/validate-offset-and-length.js @@ -0,0 +1,36 @@ +'use strict' + +const errCode = require('err-code') + +const validateOffsetAndLength = (size, offset, length) => { + if (!offset) { + offset = 0 + } + + if (offset < 0) { + throw errCode(new Error('Offset must be greater than or equal to 0'), 'ERR_INVALID_PARAMS') + } + + if (offset > size) { + throw errCode(new Error('Offset must be less than the file size'), 'ERR_INVALID_PARAMS') + } + + if (!length && length !== 0) { + length = size - offset + } + + if (length < 0) { + throw errCode(new Error('Length must be greater than or equal to 0'), 'ERR_INVALID_PARAMS') + } + + if (offset + length > size) { + length = size - offset + } + + return { + offset, + length + } +} + +module.exports = validateOffsetAndLength diff --git a/test/exporter-sharded.spec.js b/test/exporter-sharded.spec.js index 57bb18bf..fcfab689 100644 --- a/test/exporter-sharded.spec.js +++ b/test/exporter-sharded.spec.js @@ -7,13 +7,11 @@ const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') const UnixFS = require('ipfs-unixfs') -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const collect = require('pull-stream/sinks/collect') -const CID = require('cids') -const waterfall = require('async/waterfall') -const parallel = require('async/parallel') -const randomBytes = require('./helpers/random-bytes') +const mh = require('multihashes') +const mc = require('multicodec') +const all = require('async-iterator-all') +const last = require('async-iterator-last') +const randomBytes = require('async-iterator-buffer-stream') const exporter = require('../src') const importer = require('ipfs-unixfs-importer') const { @@ -28,30 +26,24 @@ describe('exporter sharded', function () { let ipld - const createShard = (numFiles, callback) => { - createShardWithFileNames(numFiles, (index) => `file-${index}`, callback) + const createShard = (numFiles) => { + return createShardWithFileNames(numFiles, (index) => `file-${index}`) } - const createShardWithFileNames = (numFiles, fileName, callback) => { + const createShardWithFileNames = (numFiles, fileName) => { const files = new Array(numFiles).fill(0).map((_, index) => ({ path: fileName(index), content: Buffer.from([0, 1, 2, 3, 4, index]) })) - createShardWithFiles(files, callback) + return createShardWithFiles(files) } - const createShardWithFiles = (files, callback) => { - pull( - values(files), - importer(ipld, { - shardSplitThreshold: SHARD_SPLIT_THRESHOLD, - wrap: true - }), - collect((err, files) => { - callback(err, files ? 
new CID(files.pop().multihash) : undefined) - }) - ) + const createShardWithFiles = async (files) => { + return (await last(importer(files, ipld, { + shardSplitThreshold: SHARD_SPLIT_THRESHOLD, + wrapWithDirectory: true + }))).cid } before((done) => { @@ -64,350 +56,158 @@ describe('exporter sharded', function () { }) }) - it('exports a sharded directory', (done) => { + it('exports a sharded directory', async () => { const files = {} - let directory for (let i = 0; i < (SHARD_SPLIT_THRESHOLD + 1); i++) { files[`file-${Math.random()}.txt`] = { - content: randomBytes(100) + content: Buffer.concat(await all(randomBytes(100))) } } - waterfall([ - (cb) => pull( - pull.values( - Object.keys(files).map(path => ({ - path, - content: files[path].content - })) - ), - importer(ipld, { - wrap: true, - shardSplitThreshold: SHARD_SPLIT_THRESHOLD - }), - collect(cb) - ), - (imported, cb) => { - directory = new CID(imported.pop().multihash) - - // store the CIDs, we will validate them later - imported.forEach(imported => { - files[imported.path].cid = new CID(imported.multihash) - }) - - ipld.get(directory, cb) - }, - ({ value, cid }, cb) => { - const dir = UnixFS.unmarshal(value.data) - - expect(dir.type).to.equal('hamt-sharded-directory') - - pull( - exporter(directory, ipld), - collect(cb) - ) - }, - (exported, cb) => { - const dir = exported.shift() - - expect(dir.cid.equals(directory)).to.be.true() - expect(exported.length).to.equal(Object.keys(files).length) - - parallel( - exported.map(exported => (cb) => { - pull( - exported.content, - collect((err, bufs) => { - if (err) { - cb(err) - } - - // validate the CID - expect(files[exported.name].cid.equals(exported.cid)).to.be.true() - - // validate the exported file content - expect(files[exported.name].content).to.deep.equal(bufs[0]) - - cb() - }) - ) - }), - cb - ) - } - ], done) - }) + const imported = await all(importer(Object.keys(files).map(path => ({ + path, + content: files[path].content + })), ipld, { + wrapWithDirectory: true, + shardSplitThreshold: SHARD_SPLIT_THRESHOLD + })) - it('exports all the files from a sharded directory with maxDepth', (done) => { - const files = {} - let dirCid + const dirCid = imported.pop().cid - for (let i = 0; i < (SHARD_SPLIT_THRESHOLD + 1); i++) { - files[`file-${Math.random()}.txt`] = { - content: randomBytes(100) - } - } + // store the CIDs, we will validate them later + imported.forEach(imported => { + files[imported.path].cid = imported.cid + }) - waterfall([ - (cb) => pull( - pull.values( - Object.keys(files).map(path => ({ - path, - content: files[path].content - })) - ), - importer(ipld, { - wrap: true, - shardSplitThreshold: SHARD_SPLIT_THRESHOLD - }), - collect(cb) - ), - (imported, cb) => { - dirCid = new CID(imported.pop().multihash) - - pull( - exporter(dirCid, ipld, { - maxDepth: 1 - }), - collect(cb) - ) - }, - (exported, cb) => { - const dir = exported.shift() - - expect(dir.cid.equals(dirCid)).to.be.true() - expect(exported.length).to.equal(Object.keys(files).length) - - cb() - } - ], done) - }) + const dir = await ipld.get(dirCid) + const dirMetadata = UnixFS.unmarshal(dir.Data) - it('exports all files from a sharded directory with subshards', (done) => { - waterfall([ - (cb) => createShard(31, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}`, ipld), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(32) + expect(dirMetadata.type).to.equal('hamt-sharded-directory') - const dir = exported.shift() + const exported = await exporter(dirCid, 
ipld) - expect(dir.type).to.equal('dir') + expect(exported.cid.equals(dirCid)).to.be.true() - exported.forEach(file => expect(file.type).to.equal('file')) + const dirFiles = await all(exported.content()) + expect(dirFiles.length).to.equal(Object.keys(files).length) - cb() - } - ], done) - }) + for (let i = 0; i < dirFiles.length; i++) { + const dirFile = dirFiles[i] + const data = Buffer.concat(await all(dirFile.content())) - it('exports one file from a sharded directory', (done) => { - waterfall([ - (cb) => createShard(31, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/file-14`, ipld), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) + // validate the CID + expect(files[dirFile.name].cid.equals(dirFile.cid)).to.be.true() + + // validate the exported file content + expect(files[dirFile.name].content).to.deep.equal(data) + } + }) - const file = exported.shift() + it('exports all files from a sharded directory with subshards', async () => { + const numFiles = 31 + const dirCid = await createShard(numFiles) + const exported = await exporter(dirCid, ipld) + const files = await all(exported.content()) + expect(files.length).to.equal(numFiles) - expect(file.name).to.deep.equal('file-14') + expect(exported.unixfs.type).to.equal('hamt-sharded-directory') - cb() - } - ], done) + files.forEach(file => expect(file.unixfs.type).to.equal('file')) }) - it('exports one file from a sharded directory sub shard', (done) => { - waterfall([ - (cb) => createShard(31, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/file-30`, ipld), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) + it('exports one file from a sharded directory', async () => { + const dirCid = await createShard(31) + const exported = await exporter(`/ipfs/${dirCid.toBaseEncodedString()}/file-14`, ipld) - const file = exported.shift() + expect(exported.name).to.equal('file-14') + }) - expect(file.name).to.deep.equal('file-30') + it('exports one file from a sharded directory sub shard', async () => { + const dirCid = await createShard(31) + const exported = await exporter(`/ipfs/${dirCid.toBaseEncodedString()}/file-30`, ipld) - cb() - } - ], done) + expect(exported.name).to.deep.equal('file-30') }) - it('exports one file from a shard inside a shard inside a shard', (done) => { - waterfall([ - (cb) => createShard(2568, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/file-2567`, ipld), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) + it('exports one file from a shard inside a shard inside a shard', async () => { + const dirCid = await createShard(2568) + const exported = await exporter(`/ipfs/${dirCid.toBaseEncodedString()}/file-2567`, ipld) - const file = exported.shift() + expect(exported.name).to.deep.equal('file-2567') + }) - expect(file.name).to.deep.equal('file-2567') + it('extracts a deep folder from the sharded directory', async () => { + const dirCid = await createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`) + const exported = await exporter(`/ipfs/${dirCid.toBaseEncodedString()}/foo/bar/baz`, ipld) - cb() - } - ], done) + expect(exported.name).to.deep.equal('baz') }) - it('uses maxDepth to only extract a deep folder from the sharded directory', (done) => { - waterfall([ - (cb) => createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`, cb), - (dir, cb) => { - pull( - 
exporter(`/ipfs/${dir.toBaseEncodedString()}/foo/bar/baz`, ipld, { - maxDepth: 3 - }), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) - - const entry = exported.pop() - - expect(entry.name).to.deep.equal('baz') - - cb() - } - ], done) - }) + it('extracts an intermediate folder from the sharded directory', async () => { + const dirCid = await createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`) + const exported = await exporter(`/ipfs/${dirCid.toBaseEncodedString()}/foo/bar`, ipld) - it('uses maxDepth to only extract an intermediate folder from the sharded directory', (done) => { - waterfall([ - (cb) => createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/foo/bar/baz`, ipld, { - maxDepth: 2 - }), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) - - const entry = exported.pop() - - expect(entry.name).to.deep.equal('bar') - - cb() - } - ], done) + expect(exported.name).to.deep.equal('bar') }) - it('uses fullPath extract all intermediate entries from the sharded directory', (done) => { - waterfall([ - (cb) => createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/foo/bar/baz/file-1`, ipld, { - fullPath: true - }), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(5) - - expect(exported[1].name).to.equal('foo') - expect(exported[2].name).to.equal('bar') - expect(exported[3].name).to.equal('baz') - expect(exported[4].name).to.equal('file-1') - - cb() - } - ], done) + it('uses .path to extract all intermediate entries from the sharded directory', async () => { + const dirCid = await createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`) + const exported = await all(exporter.path(`/ipfs/${dirCid.toBaseEncodedString()}/foo/bar/baz/file-1`, ipld)) + + expect(exported.length).to.equal(5) + + expect(exported[0].name).to.equal(dirCid.toBaseEncodedString()) + expect(exported[1].name).to.equal('foo') + expect(exported[1].path).to.equal(`${dirCid.toBaseEncodedString()}/foo`) + expect(exported[2].name).to.equal('bar') + expect(exported[2].path).to.equal(`${dirCid.toBaseEncodedString()}/foo/bar`) + expect(exported[3].name).to.equal('baz') + expect(exported[3].path).to.equal(`${dirCid.toBaseEncodedString()}/foo/bar/baz`) + expect(exported[4].name).to.equal('file-1') + expect(exported[4].path).to.equal(`${dirCid.toBaseEncodedString()}/foo/bar/baz/file-1`) }) - it('uses fullPath extract all intermediate entries from the sharded directory as well as the contents', (done) => { - waterfall([ - (cb) => createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`, cb), - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/foo/bar/baz`, ipld, { - fullPath: true - }), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(35) - - expect(exported[1].name).to.equal('foo') - expect(exported[2].name).to.equal('bar') - expect(exported[3].name).to.equal('baz') - expect(exported[4].name).to.equal('file-14') - - exported.slice(4).forEach(file => expect(file.type).to.equal('file')) - - cb() - } - ], done) + it('uses .path to extract all intermediate entries from the sharded directory as well as the contents', async () => { + const dirCid = await createShardWithFileNames(31, (index) => `/foo/bar/baz/file-${index}`) + const exported = await 
all(exporter.path(`/ipfs/${dirCid.toBaseEncodedString()}/foo/bar/baz`, ipld)) + + expect(exported.length).to.equal(4) + + expect(exported[1].name).to.equal('foo') + expect(exported[2].name).to.equal('bar') + expect(exported[3].name).to.equal('baz') + + const files = await all(exported[3].content()) + + expect(files.length).to.equal(31) + + files.forEach(file => { + expect(file.unixfs.type).to.equal('file') + }) }) - it('exports a file from a sharded directory inside a regular directory inside a sharded directory', (done) => { - waterfall([ - (cb) => createShard(15, cb), - (dir, cb) => { - DAGNode.create(new UnixFS('directory').marshal(), [ - new DAGLink('shard', 5, dir) - ], cb) - }, - (node, cb) => { - ipld.put(node, { - version: 0, - format: 'dag-pb', - hashAlg: 'sha2-256' - }, cb) - }, - (cid, cb) => { - DAGNode.create(new UnixFS('hamt-sharded-directory').marshal(), [ - new DAGLink('75normal-dir', 5, cid) - ], cb) - }, - (node, cb) => { - ipld.put(node, { - version: 1, - format: 'dag-pb', - hashAlg: 'sha2-256' - }, cb) - }, - (dir, cb) => { - pull( - exporter(`/ipfs/${dir.toBaseEncodedString()}/normal-dir/shard/file-1`, ipld), - collect(cb) - ) - }, - (exported, cb) => { - expect(exported.length).to.equal(1) - - const entry = exported.pop() - - expect(entry.name).to.deep.equal('file-1') - - cb() - } - ], done) + it('exports a file from a sharded directory inside a regular directory inside a sharded directory', async () => { + const dirCid = await createShard(15) + + const node = await DAGNode.create(new UnixFS('directory').marshal(), [ + new DAGLink('shard', 5, dirCid) + ]) + const nodeCid = await ipld.put(node, mc.DAG_PB, { + cidVersion: 0, + hashAlg: mh.names['sha2-256'] + }) + + const shardNode = await DAGNode.create(new UnixFS('hamt-sharded-directory').marshal(), [ + new DAGLink('75normal-dir', 5, nodeCid) + ]) + const shardNodeCid = await ipld.put(shardNode, mc.DAG_PB, { + cidVersion: 1, + hashAlg: mh.names['sha2-256'] + }) + + const exported = await exporter(`/ipfs/${shardNodeCid.toBaseEncodedString()}/normal-dir/shard/file-1`, ipld) + + expect(exported.name).to.deep.equal('file-1') }) }) diff --git a/test/exporter-subtree.spec.js b/test/exporter-subtree.spec.js index a4577efb..2d1e37b7 100644 --- a/test/exporter-subtree.spec.js +++ b/test/exporter-subtree.spec.js @@ -6,11 +6,11 @@ chai.use(require('dirty-chai')) const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') -const CID = require('cids') -const pull = require('pull-stream') -const randomBytes = require('./helpers/random-bytes') -const waterfall = require('async/waterfall') const importer = require('ipfs-unixfs-importer') +const mc = require('multicodec') +const all = require('async-iterator-all') +const last = require('async-iterator-last') +const randomBytes = require('async-iterator-buffer-stream') const ONE_MEG = Math.pow(1024, 2) @@ -29,185 +29,117 @@ describe('exporter subtree', () => { }) }) - it('exports a file 2 levels down', (done) => { - const content = randomBytes(ONE_MEG) - - waterfall([ - (cb) => pull( - pull.values([{ - path: './200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './level-1/200Bytes.txt', - content - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(`${cid.toBaseEncodedString()}/level-1/200Bytes.txt`, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - files.forEach(file => 
expect(file).to.have.property('cid')) - - expect(files.length).to.equal(1) - expect(files[0].path).to.equal('200Bytes.txt') - fileEql(files[0], content, cb) - } - ], done) + it('exports a file 2 levels down', async () => { + const content = Buffer.concat(await all(randomBytes(ONE_MEG))) + + const imported = await last(importer([{ + path: './200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './level-1/200Bytes.txt', + content + }], ipld)) + + const exported = await exporter(`${imported.cid.toBaseEncodedString()}/level-1/200Bytes.txt`, ipld) + + expect(exported).to.have.property('cid') + expect(exported.name).to.equal('200Bytes.txt') + expect(exported.path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1/200Bytes.txt`) + + const data = Buffer.concat(await all(exported.content())) + expect(data).to.deep.equal(content) }) - it('exports a directory 1 level down', (done) => { - const content = randomBytes(ONE_MEG) - - waterfall([ - (cb) => pull( - pull.values([{ - path: './200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './level-1/200Bytes.txt', - content - }, { - path: './level-1/level-2' - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(`${cid.toBaseEncodedString()}/level-1`, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - expect(files.length).to.equal(3) - expect(files[0].path).to.equal('level-1') - expect(files[1].path).to.equal('level-1/200Bytes.txt') - expect(files[2].path).to.equal('level-1/level-2') - fileEql(files[1], content, cb) - } - ], done) + it('exports a directory 1 level down', async () => { + const content = Buffer.concat(await all(randomBytes(ONE_MEG))) + const imported = await last(importer([{ + path: './200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './level-1/200Bytes.txt', + content + }, { + path: './level-1/level-2' + }], ipld)) + + const exported = await exporter(`${imported.cid.toBaseEncodedString()}/level-1`, ipld) + const files = await all(exported.content()) + + expect(files.length).to.equal(2) + expect(files[0].name).to.equal('200Bytes.txt') + expect(files[0].path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1/200Bytes.txt`) + + expect(files[1].name).to.equal('level-2') + expect(files[1].path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1/level-2`) + + const data = Buffer.concat(await all(files[0].content())) + expect(data).to.deep.equal(content) }) - it('export a non existing file from a directory', (done) => { - waterfall([ - (cb) => pull( - pull.values([{ - path: '/derp/200Bytes.txt', - content: randomBytes(ONE_MEG) - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(`${cid.toBaseEncodedString()}/doesnotexist`, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - expect(files.length).to.equal(0) - cb() - } - ], done) + it('exports a non existing file from a directory', async () => { + const imported = await last(importer([{ + path: '/derp/200Bytes.txt', + content: randomBytes(ONE_MEG) + }], ipld)) + + try { + await exporter(`${imported.cid.toBaseEncodedString()}/doesnotexist`, ipld) + } catch (err) { + expect(err.code).to.equal('ERR_NOT_FOUND') + } }) - it('exports starting from non-protobuf node', (done) => { - const content = randomBytes(ONE_MEG) - - waterfall([ - 
(cb) => pull( - pull.values([{ - path: './level-1/200Bytes.txt', - content - }]), - importer(ipld, { - wrapWithDirectory: true - }), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => ipld.put({ a: { file: cid } }, { format: 'dag-cbor' }, cb), - (cborNodeCid, cb) => pull( - exporter(`${cborNodeCid.toBaseEncodedString()}/a/file/level-1/200Bytes.txt`, ipld), - pull.collect(cb) - ), - (files, cb) => { - expect(files.length).to.equal(1) - expect(files[0].path).to.equal('200Bytes.txt') - fileEql(files[0], content, cb) + it('exports starting from non-protobuf node', async () => { + const content = Buffer.concat(await all(randomBytes(ONE_MEG))) + + const imported = await last(importer([{ + path: './level-1/200Bytes.txt', + content + }], ipld, { + wrapWithDirectory: true + })) + + const cborNodeCid = await ipld.put({ + a: { + file: imported.cid } - ], done) + }, mc.DAG_CBOR) + + const exported = await exporter(`${cborNodeCid.toBaseEncodedString()}/a/file/level-1/200Bytes.txt`, ipld) + + expect(exported.name).to.equal('200Bytes.txt') + expect(exported.path).to.equal(`${cborNodeCid.toBaseEncodedString()}/a/file/level-1/200Bytes.txt`) + + const data = Buffer.concat(await all(exported.content())) + expect(data).to.deep.equal(content) }) - it('exports all components of a path', (done) => { - const content = randomBytes(ONE_MEG) - - waterfall([ - (cb) => pull( - pull.values([{ - path: './200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './level-1/200Bytes.txt', - content - }, { - path: './level-1/level-2' - }, { - path: './level-1/level-2/200Bytes.txt', - content - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(`${cid.toBaseEncodedString()}/level-1/level-2/200Bytes.txt`, ipld, { - fullPath: true - }), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - expect(files.length).to.equal(4) - expect(files[0].path).to.equal(cid.toBaseEncodedString()) - expect(files[0].name).to.equal(cid.toBaseEncodedString()) - expect(files[1].path).to.equal(`${cid.toBaseEncodedString()}/level-1`) - expect(files[1].name).to.equal('level-1') - expect(files[2].path).to.equal(`${cid.toBaseEncodedString()}/level-1/level-2`) - expect(files[2].name).to.equal('level-2') - expect(files[3].path).to.equal(`${cid.toBaseEncodedString()}/level-1/level-2/200Bytes.txt`) - expect(files[3].name).to.equal('200Bytes.txt') - - cb() - } - ], done) + it('uses .path to export all components of a path', async () => { + const content = Buffer.concat(await all(randomBytes(ONE_MEG))) + + const imported = await last(importer([{ + path: './200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './level-1/200Bytes.txt', + content + }, { + path: './level-1/level-2' + }, { + path: './level-1/level-2/200Bytes.txt', + content + }], ipld)) + + const exported = await all(exporter.path(`${imported.cid.toBaseEncodedString()}/level-1/level-2/200Bytes.txt`, ipld)) + + expect(exported.length).to.equal(4) + expect(exported[0].path).to.equal(imported.cid.toBaseEncodedString()) + expect(exported[0].name).to.equal(imported.cid.toBaseEncodedString()) + expect(exported[1].path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1`) + expect(exported[1].name).to.equal('level-1') + expect(exported[2].path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1/level-2`) + expect(exported[2].name).to.equal('level-2') + 
expect(exported[3].path).to.equal(`${imported.cid.toBaseEncodedString()}/level-1/level-2/200Bytes.txt`) + expect(exported[3].name).to.equal('200Bytes.txt') }) }) - -function fileEql (f1, f2, done) { - pull( - f1.content, - pull.collect((err, data) => { - if (err) { - return done(err) - } - - try { - if (f2) { - expect(Buffer.concat(data)).to.eql(f2) - } else { - expect(data).to.exist() - } - } catch (err) { - return done(err) - } - done() - }) - ) -} diff --git a/test/exporter.spec.js b/test/exporter.spec.js index 124b7098..48459aef 100644 --- a/test/exporter.spec.js +++ b/test/exporter.spec.js @@ -7,147 +7,73 @@ const expect = chai.expect const IPLD = require('ipld') const inMemory = require('ipld-in-memory') const UnixFS = require('ipfs-unixfs') -const pull = require('pull-stream') -const zip = require('pull-zip') const CID = require('cids') -const doUntil = require('async/doUntil') -const waterfall = require('async/waterfall') -const parallel = require('async/parallel') -const series = require('async/series') -const fs = require('fs') -const path = require('path') -const push = require('pull-pushable') -const toPull = require('stream-to-pull-stream') -const toStream = require('pull-stream-to-stream') const { DAGNode, DAGLink } = require('ipld-dag-pb') -const isNode = require('detect-node') -const randomBytes = require('./helpers/random-bytes') - +const mh = require('multihashes') +const mc = require('multicodec') const exporter = require('../src') const importer = require('ipfs-unixfs-importer') +const all = require('async-iterator-all') +const last = require('async-iterator-last') +const first = require('async-iterator-first') +const randomBytes = require('async-iterator-buffer-stream') const ONE_MEG = Math.pow(1024, 2) -const bigFile = randomBytes(ONE_MEG * 1.2) -const smallFile = randomBytes(200) describe('exporter', () => { let ipld + let bigFile + let smallFile - function dagPut (options, cb) { - if (typeof options === 'function') { - cb = options - options = {} - } + before(async () => { + bigFile = Buffer.concat(await all(randomBytes(ONE_MEG * 1.2))) + smallFile = Buffer.concat(await all(randomBytes(200))) + }) + async function dagPut (options = {}) { options.type = options.type || 'file' options.content = options.content || Buffer.from([0x01, 0x02, 0x03]) options.links = options.links || [] const file = new UnixFS(options.type, options.content) - DAGNode.create(file.marshal(), options.links, (err, node) => { - expect(err).to.not.exist() - - ipld.put(node, { - version: 0, - hashAlg: 'sha2-256', - format: 'dag-pb' - }, (err, cid) => { - cb(err, { file: file, node: node, cid: cid }) - }) + const node = await DAGNode.create(file.marshal(), options.links) + const cid = await ipld.put(node, mc.DAG_PB, { + cidVersion: 0, + hashAlg: mh.names['sha2-256'] }) - } - function addTestFile ({ file, strategy = 'balanced', path = '/foo', maxChunkSize, rawLeaves }, cb) { - pull( - pull.values([{ - path, - content: file - }]), - importer(ipld, { - strategy, - rawLeaves, - chunkerOptions: { - maxChunkSize - } - }), - pull.collect((error, nodes) => { - cb(error, nodes && nodes[0] && nodes[0].multihash) - }) - ) + return { file: file, node: node, cid: cid } } - function addAndReadTestFile ({ file, offset, length, strategy = 'balanced', path = '/foo', maxChunkSize, rawLeaves }, cb) { - addTestFile({ file, strategy, path, maxChunkSize, rawLeaves }, (error, multihash) => { - if (error) { - return cb(error) + async function addTestFile ({ file, strategy = 'balanced', path = '/foo', maxChunkSize, 
rawLeaves }) { + const result = await all(importer([{ + path, + content: file + }], ipld, { + strategy, + rawLeaves, + chunkerOptions: { + maxChunkSize } + })) - pull( - exporter(multihash, ipld, { - offset, length - }), - pull.collect((error, files) => { - if (error) { - return cb(error) - } - - readFile(files[0], cb) - }) - ) - }) + return result[0].cid } - function addTestDirectory ({ directory, strategy = 'balanced', maxChunkSize }, callback) { - const input = push() - const dirName = path.basename(directory) - - pull( - input, - pull.map((file) => { - return { - path: path.join(dirName, path.basename(file)), - content: toPull.source(fs.createReadStream(file)) - } - }), - importer(ipld, { - strategy, - maxChunkSize - }), - pull.collect(callback) - ) - - const listFiles = (directory, depth, stream, cb) => { - waterfall([ - (done) => fs.stat(directory, done), - (stats, done) => { - if (stats.isDirectory()) { - return waterfall([ - (done) => fs.readdir(directory, done), - (children, done) => { - series( - children.map(child => (next) => listFiles(path.join(directory, child), depth + 1, stream, next)), - done - ) - } - ], done) - } - - stream.push(directory) - done() - } - ], cb) - } + async function addAndReadTestFile ({ file, offset, length, strategy = 'balanced', path = '/foo', maxChunkSize, rawLeaves }) { + const cid = await addTestFile({ file, strategy, path, maxChunkSize, rawLeaves }) + const entry = await exporter(cid, ipld) - listFiles(directory, 0, input, () => { - input.end() - }) + return Buffer.concat(await all(entry.content({ + offset, length + }))) } - function checkBytesThatSpanBlocks (strategy, cb) { + async function checkBytesThatSpanBlocks (strategy) { const bytesInABlock = 262144 const bytes = Buffer.alloc(bytesInABlock + 100, 0) @@ -155,20 +81,39 @@ describe('exporter', () => { bytes[bytesInABlock] = 2 bytes[bytesInABlock + 1] = 3 - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: bytes, offset: bytesInABlock - 1, length: 3, strategy - }, (error, data) => { - if (error) { - return cb(error) - } + }) + + expect(data).to.deep.equal(Buffer.from([1, 2, 3])) + } + + async function createAndPersistNode (ipld, type, data, children) { + const file = new UnixFS(type, data ? 
Buffer.from(data) : undefined) + const links = [] - expect(data).to.deep.equal(Buffer.from([1, 2, 3])) + for (let i = 0; i < children.length; i++) { + const child = children[i] + const leaf = UnixFS.unmarshal(child.node.Data) - cb() + file.addBlockSize(leaf.fileSize()) + + links.push(new DAGLink('', child.node.size, child.cid)) + } + + const node = await DAGNode.create(file.marshal(), links) + const cid = await ipld.put(node, mc.DAG_PB, { + cidVersion: 1, + hashAlg: mh.names['sha2-256'] }) + + return { + node, + cid + } } before((done) => { @@ -181,784 +126,573 @@ describe('exporter', () => { }) }) - it('ensure hash inputs are sanitized', (done) => { - dagPut((err, result) => { - expect(err).to.not.exist() - - ipld.get(result.cid, (err, res) => { - expect(err).to.not.exist() - const unmarsh = UnixFS.unmarshal(result.node.data) - - expect(unmarsh.data).to.deep.equal(result.file.data) + it('ensure hash inputs are sanitized', async () => { + const result = await dagPut() + const node = await ipld.get(result.cid) + const unmarsh = UnixFS.unmarshal(node.Data) - pull( - exporter(result.cid, ipld), - pull.collect(onFiles) - ) + expect(unmarsh.data).to.deep.equal(result.file.data) - function onFiles (err, files) { - expect(err).to.equal(null) - expect(files).to.have.length(1) - expect(files[0]).to.have.property('cid') - expect(files[0]).to.have.property('path', result.cid.toBaseEncodedString()) - fileEql(files[0], unmarsh.data, done) - } - }) - }) - }) + const file = await exporter(result.cid, ipld) - it('exports a file with no links', (done) => { - dagPut((err, result) => { - expect(err).to.not.exist() + expect(file).to.have.property('cid') + expect(file).to.have.property('path', result.cid.toBaseEncodedString()) - pull( - zip( - pull( - ipld.getStream(result.cid), - pull.map((res) => UnixFS.unmarshal(res.value.data)) - ), - exporter(result.cid, ipld) - ), - pull.collect((err, values) => { - expect(err).to.not.exist() - const unmarsh = values[0][0] - const file = values[0][1] - - fileEql(file, unmarsh.data, done) - }) - ) - }) + const data = Buffer.concat(await all(file.content())) + expect(data).to.deep.equal(unmarsh.data) }) - it('small file in a directory with an escaped slash in the title', (done) => { + it('small file in a directory with an escaped slash in the title', async () => { const fileName = `small-\\/file-${Math.random()}.txt` const filePath = `/foo/${fileName}` - pull( - pull.values([{ - path: filePath, - content: pull.values([smallFile]) - }]), - importer(ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - - const path = `/ipfs/${new CID(files[1].multihash).toBaseEncodedString()}/${fileName}` - - pull( - exporter(path, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - expect(files[0].path).to.equal(fileName) - done() - }) - ) - }) - ) + const files = await all(importer([{ + path: filePath, + content: smallFile + }], ipld)) + + const path = `/ipfs/${files[1].cid.toBaseEncodedString()}/${fileName}` + const file = await exporter(path, ipld) + + expect(file.name).to.equal(fileName) + expect(file.path).to.equal(`${files[1].cid.toBaseEncodedString()}/${fileName}`) }) - it('small file in a directory with an square brackets in the title', (done) => { + it('small file in a directory with an square brackets in the title', async () => { const fileName = `small-[bar]-file-${Math.random()}.txt` const filePath = `/foo/${fileName}` - pull( - pull.values([{ - path: filePath, - content: pull.values([smallFile]) - }]), - 
importer(ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - - const path = `/ipfs/${new CID(files[1].multihash).toBaseEncodedString()}/${fileName}` - - pull( - exporter(path, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - expect(files[0].path).to.equal(fileName) - done() - }) - ) - }) - ) + const files = await all(importer([{ + path: filePath, + content: smallFile + }], ipld)) + + const path = `/ipfs/${files[1].cid.toBaseEncodedString()}/${fileName}` + const file = await exporter(path, ipld) + + expect(file.name).to.equal(fileName) + expect(file.path).to.equal(`${files[1].cid.toBaseEncodedString()}/${fileName}`) }) - it('exports a chunk of a file with no links', (done) => { + it('exports a chunk of a file with no links', async () => { const offset = 0 const length = 5 - dagPut({ - content: randomBytes(100) - }, (err, result) => { - expect(err).to.not.exist() - - pull( - zip( - pull( - ipld.getStream(result.cid), - pull.map((res) => UnixFS.unmarshal(res.value.data)) - ), - exporter(result.cid, ipld, { - offset, - length - }) - ), - pull.collect((err, values) => { - expect(err).to.not.exist() - - const unmarsh = values[0][0] - const file = values[0][1] - - fileEql(file, unmarsh.data.slice(offset, offset + length), done) - }) - ) + const result = await dagPut({ + content: Buffer.concat(await all(randomBytes(100))) }) + + const node = await ipld.get(result.cid) + const unmarsh = UnixFS.unmarshal(node.Data) + + const file = await exporter(result.cid, ipld) + const data = Buffer.concat(await all(file.content({ + offset, + length + }))) + + expect(data).to.deep.equal(unmarsh.data.slice(offset, offset + length)) }) - it('exports a small file with links', function (done) { - waterfall([ - (cb) => dagPut({ content: randomBytes(100) }, cb), - (file, cb) => dagPut({ - content: randomBytes(100), - links: [ - new DAGLink('', file.node.size, file.cid) - ] - }, cb), - (result, cb) => { - pull( - exporter(result.cid, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - - fileEql(files[0], result.file.data, cb) - }) - ) - } - ], done) + it('exports a small file with links', async () => { + const content = Buffer.from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + const chunk1 = new UnixFS('raw', content.slice(0, 5)) + const chunkNode1 = await DAGNode.create(chunk1.marshal()) + const chunkCid1 = await ipld.put(chunkNode1, mc.DAG_PB, { + cidVersion: 0, + hashAlg: mh.names['sha2-256'] + }) + + const chunk2 = new UnixFS('raw', content.slice(5)) + const chunkNode2 = await DAGNode.create(chunk2.marshal()) + const chunkCid2 = await ipld.put(chunkNode2, mc.DAG_PB, { + cidVersion: 0, + hashAlg: mh.names['sha2-256'] + }) + + const file = new UnixFS('file') + file.addBlockSize(5) + file.addBlockSize(5) + + const fileNode = await DAGNode.create(file.marshal(), [ + new DAGLink('', chunkNode1.size, chunkCid1), + new DAGLink('', chunkNode2.size, chunkCid2) + ]) + const fileCid = await ipld.put(fileNode, mc.DAG_PB, { + cidVersion: 0, + hashAlg: mh.names['sha2-256'] + }) + + const exported = await exporter(fileCid, ipld) + const data = Buffer.concat(await all(exported.content())) + expect(data).to.deep.equal(content) }) - it('exports a chunk of a small file with links', function (done) { + it('exports a chunk of a small file with links', async () => { const offset = 0 const length = 5 - waterfall([ - (cb) => dagPut({ content: randomBytes(100) }, cb), - (file, cb) => dagPut({ - content: randomBytes(100), - links: [ - new DAGLink('', 
file.node.size, file.cid) - ] - }, cb), - (result, cb) => { - pull( - exporter(result.cid, ipld, { - offset, - length - }), - pull.collect((err, files) => { - expect(err).to.not.exist() - - fileEql(files[0], result.file.data.slice(offset, offset + length), cb) - }) - ) - } - ], done) + const chunk = await dagPut({ content: randomBytes(100) }) + const result = await dagPut({ + content: Buffer.concat(await all(randomBytes(100))), + links: [ + new DAGLink('', chunk.node.size, chunk.cid) + ] + }) + + const file = await exporter(result.cid, ipld) + const data = Buffer.concat(await all(file.content({ + offset, + length + }))) + + expect(data).to.deep.equal(result.file.data.slice(offset, offset + length)) }) - it('exports a large file > 5mb', function (done) { + it('exports a large file > 5mb', async function () { this.timeout(30 * 1000) - waterfall([ - (cb) => addTestFile({ - file: randomBytes(ONE_MEG * 6) - }, cb), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(cid, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files: [ file ] }, cb) => { - expect(file).to.have.property('path', cid.toBaseEncodedString()) - expect(file).to.have.property('size', ONE_MEG * 6) - fileEql(file, null, cb) - } - ], done) + const cid = await addTestFile({ + file: randomBytes(ONE_MEG * 6) + }) + + const file = await exporter(cid, ipld) + + expect(file).to.have.property('path', cid.toBaseEncodedString()) + expect(file.unixfs.fileSize()).to.equal(ONE_MEG * 6) }) - it('exports a chunk of a large file > 5mb', function (done) { + it('exports a chunk of a large file > 5mb', async function () { this.timeout(30 * 1000) const offset = 0 const length = 5 - const bytes = randomBytes(ONE_MEG * 6) - - waterfall([ - (cb) => addTestFile({ - file: bytes - }, cb), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(cid, ipld, { - offset, - length - }), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files: [ file ] }, cb) => { - expect(file).to.have.property('path', cid.toBaseEncodedString()) - - pull( - file.content, - pull.collect(cb) - ) - }, - ([ buf ], cb) => { - expect(buf).to.deep.equal(bytes.slice(offset, offset + length)) - cb() - } - ], done) + const bytes = Buffer.concat(await all(randomBytes(ONE_MEG * 6))) + + const cid = await addTestFile({ + file: bytes + }) + + const file = await exporter(cid, ipld) + expect(file).to.have.property('path', cid.toBaseEncodedString()) + + const data = Buffer.concat(await all(file.content({ + offset, + length + }))) + + expect(data).to.deep.equal(bytes.slice(offset, offset + length)) }) - it('exports the right chunks of files when offsets are specified', function (done) { + it('exports the right chunks of files when offsets are specified', async function () { this.timeout(30 * 1000) const offset = 3 const data = Buffer.alloc(300 * 1024) - addAndReadTestFile({ + const fileWithNoOffset = await addAndReadTestFile({ file: data, offset: 0 - }, (err, fileWithNoOffset) => { - expect(err).to.not.exist() - - addAndReadTestFile({ - file: data, - offset - }, (err, fileWithOffset) => { - expect(err).to.not.exist() - - expect(fileWithNoOffset.length).to.equal(data.length) - expect(fileWithNoOffset.length - fileWithOffset.length).to.equal(offset) - expect(fileWithOffset.length).to.equal(data.length - offset) - expect(fileWithNoOffset.length).to.equal(fileWithOffset.length + offset) + }) - done() - }) + const fileWithOffset = await addAndReadTestFile({ + file: data, + offset }) + + 
expect(fileWithNoOffset.length).to.equal(data.length) + expect(fileWithNoOffset.length - fileWithOffset.length).to.equal(offset) + expect(fileWithOffset.length).to.equal(data.length - offset) + expect(fileWithNoOffset.length).to.equal(fileWithOffset.length + offset) }) - it('exports a zero length chunk of a large file', function (done) { + it('exports a zero length chunk of a large file', async function () { this.timeout(30 * 1000) - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: bigFile, path: '1.2MiB.txt', rawLeaves: true, length: 0 - }, (err, data) => { - expect(err).to.not.exist() - expect(data).to.eql(Buffer.alloc(0)) - done() }) - }) - it('exports a directory', function (done) { - waterfall([ - (cb) => pull( - pull.values([{ - path: './200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './dir-another' - }, { - path: './level-1/200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './level-1/level-2' - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(cid, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - files.forEach(file => expect(file).to.have.property('cid')) - - expect( - files.map((file) => file.path) - ).to.be.eql([ - cid.toBaseEncodedString(), - `${cid.toBaseEncodedString()}/200Bytes.txt`, - `${cid.toBaseEncodedString()}/dir-another`, - `${cid.toBaseEncodedString()}/level-1`, - `${cid.toBaseEncodedString()}/level-1/200Bytes.txt`, - `${cid.toBaseEncodedString()}/level-1/level-2` - ]) - - files - .filter(file => file.type === 'dir') - .forEach(dir => { - expect(dir).to.has.property('size', 0) - }) - - pull( - pull.values(files), - pull.map((file) => Boolean(file.content)), - pull.collect(cb) - ) - }, - (contents, cb) => { - expect(contents).to.be.eql([ - false, - true, - false, - false, - true, - false - ]) - cb() - } - ], done) - }) - - it('exports a directory one deep', function (done) { - waterfall([ - (cb) => pull( - pull.values([{ - path: './200Bytes.txt', - content: randomBytes(ONE_MEG) - }, { - path: './dir-another' - }, { - path: './level-1' - }]), - importer(ipld), - pull.collect(cb) - ), - (files, cb) => cb(null, files.pop().multihash), - (buf, cb) => cb(null, new CID(buf)), - (cid, cb) => pull( - exporter(cid, ipld), - pull.collect((err, files) => cb(err, { cid, files })) - ), - ({ cid, files }, cb) => { - files.forEach(file => expect(file).to.have.property('cid')) - - expect( - files.map((file) => file.path) - ).to.be.eql([ - cid.toBaseEncodedString(), - `${cid.toBaseEncodedString()}/200Bytes.txt`, - `${cid.toBaseEncodedString()}/dir-another`, - `${cid.toBaseEncodedString()}/level-1` - ]) - - pull( - pull.values(files), - pull.map((file) => Boolean(file.content)), - pull.collect(cb) - ) - }, - (contents, cb) => { - expect(contents).to.be.eql([ - false, - true, - false, - false - ]) - cb() - } - ], done) - }) + expect(data).to.eql(Buffer.alloc(0)) + }) + + it('exports a directory', async () => { + const importedDir = await last(importer([{ + path: './200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './dir-another' + }, { + path: './level-1/200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './level-1/level-2' + }], ipld)) + const dir = await exporter(importedDir.cid, ipld) + const files = await all(dir.content()) + + files.forEach(file => expect(file).to.have.property('cid')) + + expect( + files.map((file) => file.path) + ).to.be.eql([ + 
`${dir.cid.toBaseEncodedString()}/200Bytes.txt`, + `${dir.cid.toBaseEncodedString()}/dir-another`, + `${dir.cid.toBaseEncodedString()}/level-1` + ]) + + files + .filter(file => file.unixfs.type === 'dir') + .forEach(dir => { + expect(dir).to.has.property('size', 0) + }) - it('exports a small file imported with raw leaves', function (done) { + expect( + files + .map(file => file.unixfs.type === 'file') + ).to.deep.equal([ + true, + false, + false + ]) + }) + + it('exports a directory one deep', async () => { + const importedDir = await last(importer([{ + path: './200Bytes.txt', + content: randomBytes(ONE_MEG) + }, { + path: './dir-another' + }, { + path: './level-1' + }], ipld)) + + const dir = await exporter(importedDir.cid, ipld) + const files = await all(dir.content()) + + files.forEach(file => expect(file).to.have.property('cid')) + + expect( + files.map((file) => file.path) + ).to.be.eql([ + `${importedDir.cid.toBaseEncodedString()}/200Bytes.txt`, + `${importedDir.cid.toBaseEncodedString()}/dir-another`, + `${importedDir.cid.toBaseEncodedString()}/level-1` + ]) + + expect( + files + .map(file => file.unixfs.type === 'file') + ).to.deep.equal([ + true, + false, + false + ]) + }) + + it('exports a small file imported with raw leaves', async function () { this.timeout(30 * 1000) - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: smallFile, path: '200Bytes.txt', rawLeaves: true - }, (err, data) => { - expect(err).to.not.exist() - expect(data).to.eql(smallFile) - done() }) + + expect(data).to.deep.equal(smallFile) }) - it('exports a chunk of a small file imported with raw leaves', function (done) { + it('exports a chunk of a small file imported with raw leaves', async function () { this.timeout(30 * 1000) const length = 100 - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: smallFile, path: '200Bytes.txt', rawLeaves: true, length - }, (err, data) => { - expect(err).to.not.exist() - expect(data).to.eql(smallFile.slice(0, length)) - done() }) + + expect(data).to.eql(smallFile.slice(0, length)) }) - it('exports a chunk of a small file imported with raw leaves with length', function (done) { + it('exports a chunk of a small file imported with raw leaves with length', async function () { this.timeout(30 * 1000) const offset = 100 const length = 200 - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: smallFile, path: '200Bytes.txt', rawLeaves: true, offset, length - }, (err, data) => { - expect(err).to.not.exist() - expect(data).to.eql(smallFile.slice(offset)) - done() }) + + expect(data).to.eql(smallFile.slice(offset)) }) - it('exports a zero length chunk of a small file imported with raw leaves', function (done) { + it('exports a zero length chunk of a small file imported with raw leaves', async function () { this.timeout(30 * 1000) const length = 0 - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: smallFile, path: '200Bytes.txt', rawLeaves: true, length - }, (err, data) => { - expect(err).to.not.exist() - expect(data).to.eql(Buffer.alloc(0)) - done() }) + + expect(data).to.eql(Buffer.alloc(0)) }) - it('errors when exporting a chunk of a small file imported with raw leaves and negative length', function (done) { + it('errors when exporting a chunk of a small file imported with raw leaves and negative length', async function () { this.timeout(30 * 1000) const length = -100 - addAndReadTestFile({ - file: smallFile, - path: '200Bytes.txt', - rawLeaves: true, - length - }, (err, data) => { - expect(err).to.exist() + try 
{ + await addAndReadTestFile({ + file: smallFile, + path: '200Bytes.txt', + rawLeaves: true, + length + }) + throw new Error('Should not have got this far') + } catch (err) { expect(err.message).to.equal('Length must be greater than or equal to 0') - done() - }) + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } }) - it('errors when exporting a chunk of a small file imported with raw leaves and negative offset', function (done) { + it('errors when exporting a chunk of a small file imported with raw leaves and negative offset', async function () { this.timeout(30 * 1000) const offset = -100 - addAndReadTestFile({ - file: smallFile, - path: '200Bytes.txt', - rawLeaves: true, - offset - }, (err, data) => { - expect(err).to.exist() + try { + await addAndReadTestFile({ + file: smallFile, + path: '200Bytes.txt', + rawLeaves: true, + offset + }) + throw new Error('Should not have got this far') + } catch (err) { expect(err.message).to.equal('Offset must be greater than or equal to 0') - done() - }) + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } }) - it('errors when exporting a chunk of a small file imported with raw leaves and offset greater than file size', function (done) { + it('errors when exporting a chunk of a small file imported with raw leaves and offset greater than file size', async function () { this.timeout(30 * 1000) const offset = 201 - addAndReadTestFile({ - file: smallFile, - path: '200Bytes.txt', - rawLeaves: true, - offset - }, (err, data) => { - expect(err).to.exist() + try { + await addAndReadTestFile({ + file: smallFile, + path: '200Bytes.txt', + rawLeaves: true, + offset + }) + throw new Error('Should not have got this far') + } catch (err) { expect(err.message).to.equal('Offset must be less than the file size') - done() - }) + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } }) - it('exports a large file > 1mb imported with raw leaves', function (done) { - waterfall([ - (cb) => pull( - pull.values([{ - path: '1.2MiB.txt', - content: pull.values([bigFile]) - }]), - importer(ipld, { - rawLeaves: true - }), - pull.collect(cb) - ), - (files, cb) => { - expect(files.length).to.equal(1) - - pull( - exporter(files[0].multihash, ipld), - pull.collect(cb) - ) - }, - (files, cb) => { - fileEql(files[0], bigFile, done) - } - ], done) - }) + it('exports a large file > 1mb imported with raw leaves', async () => { + const imported = await first(importer([{ + path: '1.2MiB.txt', + content: bigFile + }], ipld, { + rawLeaves: true + })) - it('returns an empty stream for dir', (done) => { - const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' + const file = await exporter(imported.cid, ipld) + const data = Buffer.concat(await all(file.content())) - pull( - exporter(hash, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - expect(files[0].content).to.not.exist() - done() - }) - ) + expect(data).to.deep.equal(bigFile) + }) + + it('returns an empty stream for dir', async () => { + const imported = await first(importer([{ + path: 'empty' + }], ipld)) + const dir = await exporter(imported.cid, ipld) + const files = await all(dir.content()) + expect(files.length).to.equal(0) }) - it('reads bytes with an offset', (done) => { - addAndReadTestFile({ + it('reads bytes with an offset', async () => { + const data = await addAndReadTestFile({ file: Buffer.from([0, 1, 2, 3]), offset: 1 - }, (error, data) => { - expect(error).to.not.exist() - expect(data).to.deep.equal(Buffer.from([1, 2, 3])) - - done() }) - }) - it('reads bytes with a negative offset', (done) => { 
- addAndReadTestFile({ - file: Buffer.from([0, 1, 2, 3]), - offset: -1 - }, (error, data) => { - expect(error).to.be.ok() - expect(error.message).to.contain('Offset must be greater than or equal to 0') + expect(data).to.deep.equal(Buffer.from([1, 2, 3])) + }) - done() - }) + it('errors when reading bytes with a negative offset', async () => { + try { + await addAndReadTestFile({ + file: Buffer.from([0, 1, 2, 3]), + offset: -1 + }) + throw new Error('Should not have got this far') + } catch (err) { + expect(err.message).to.contain('Offset must be greater than or equal to 0') + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } }) - it('reads bytes with an offset and a length', (done) => { - addAndReadTestFile({ + it('reads bytes with an offset and a length', async () => { + const data = await addAndReadTestFile({ file: Buffer.from([0, 1, 2, 3]), offset: 0, length: 1 - }, (error, data) => { - expect(error).to.not.exist() - expect(data).to.deep.equal(Buffer.from([0])) - - done() }) + + expect(data).to.deep.equal(Buffer.from([0])) }) - it('reads bytes with a negative length', (done) => { - addAndReadTestFile({ - file: Buffer.from([0, 1, 2, 3, 4]), - offset: 2, - length: -1 - }, (error, data) => { - expect(error).to.be.ok() - expect(error.message).to.contain('Length must be greater than or equal to 0') + it('reads returns an empty buffer when offset is equal to the file size', async () => { + const data = await addAndReadTestFile({ + file: Buffer.from([0, 1, 2, 3]), + offset: 4 + }) - done() + expect(data).to.be.empty() + }) + + it('reads returns an empty buffer when length is zero', async () => { + const data = await addAndReadTestFile({ + file: Buffer.from([0, 1, 2, 3]), + length: 0 }) + + expect(data).to.be.empty() + }) + + it('errors when reading bytes with a negative length', async () => { + try { + await addAndReadTestFile({ + file: Buffer.from([0, 1, 2, 3, 4]), + offset: 2, + length: -1 + }) + } catch (err) { + expect(err.message).to.contain('Length must be greater than or equal to 0') + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } + }) + + it('errors when reading bytes that start after the file ends', async () => { + try { + await addAndReadTestFile({ + file: Buffer.from([0, 1, 2, 3, 4]), + offset: 200 + }) + } catch (err) { + expect(err.message).to.contain('Offset must be less than the file size') + expect(err.code).to.equal('ERR_INVALID_PARAMS') + } }) - it('reads bytes with an offset and a length', (done) => { - addAndReadTestFile({ + it('reads bytes with an offset and a length', async () => { + const data = await addAndReadTestFile({ file: Buffer.from([0, 1, 2, 3, 4]), offset: 1, length: 4 - }, (error, data) => { - expect(error).to.not.exist() - expect(data).to.deep.equal(Buffer.from([1, 2, 3, 4])) - - done() }) + + expect(data).to.deep.equal(Buffer.from([1, 2, 3, 4])) }) - it('reads files that are split across lots of nodes', function (done) { + it('reads files that are split across lots of nodes', async function () { this.timeout(30 * 1000) - addAndReadTestFile({ + const data = await addAndReadTestFile({ file: bigFile, offset: 0, length: bigFile.length, maxChunkSize: 1024 - }, (error, data) => { - expect(error).to.not.exist() - expect(data).to.deep.equal(bigFile) - - done() }) + + expect(data).to.deep.equal(bigFile) }) - it('reads files in multiple steps that are split across lots of nodes in really small chunks', function (done) { + it('reads files in multiple steps that are split across lots of nodes in really small chunks', async function () { this.timeout(600 * 1000) 
let results = [] let chunkSize = 1024 let offset = 0 - addTestFile({ + const cid = await addTestFile({ file: bigFile, maxChunkSize: 1024 - }, (error, multihash) => { - expect(error).to.not.exist() - - doUntil( - (cb) => { - waterfall([ - (next) => { - pull( - exporter(multihash, ipld, { - offset, - length: chunkSize - }), - pull.collect(next) - ) - }, - (files, next) => readFile(files[0], next) - ], cb) - }, - (result) => { - results.push(result) - - offset += result.length - - return offset >= bigFile.length - }, - (error) => { - expect(error).to.not.exist() - - const buffer = Buffer.concat(results) - - expect(buffer).to.deep.equal(bigFile) - - done() - } - ) }) - }) + const file = await exporter(cid, ipld) - it('reads bytes with an offset and a length that span blocks using balanced layout', (done) => { - checkBytesThatSpanBlocks('balanced', done) - }) + while (offset < bigFile.length) { + const result = Buffer.concat(await all(file.content({ + offset, + length: chunkSize + }))) + results.push(result) - it('reads bytes with an offset and a length that span blocks using flat layout', (done) => { - checkBytesThatSpanBlocks('flat', done) + offset += result.length + } + + const buffer = Buffer.concat(results) + + expect(buffer).to.deep.equal(bigFile) }) - it('reads bytes with an offset and a length that span blocks using trickle layout', (done) => { - checkBytesThatSpanBlocks('trickle', done) + it('reads bytes with an offset and a length that span blocks using balanced layout', async () => { + await checkBytesThatSpanBlocks('balanced') }) - it('exports a directory containing an empty file whose content gets turned into a ReadableStream', function (done) { - if (!isNode) { - return this.skip() - } + it('reads bytes with an offset and a length that span blocks using flat layout', async () => { + await checkBytesThatSpanBlocks('flat') + }) - // replicates the behaviour of ipfs.files.get - waterfall([ - (cb) => addTestDirectory({ - directory: path.join(__dirname, 'fixtures', 'dir-with-empty-files') - }, cb), - (result, cb) => { - const dir = result.pop() - - pull( - exporter(dir.multihash, ipld), - pull.map((file) => { - if (file.content) { - file.content = toStream.source(file.content) - file.content.pause() - } - - return file - }), - pull.collect((error, files) => { - if (error) { - return cb(error) - } - - series( - files - .filter(file => Boolean(file.content)) - .map(file => { - return (done) => { - if (file.content) { - file.content - .pipe(toStream.sink(pull.collect((error, bufs) => { - expect(error).to.not.exist() - expect(bufs.length).to.equal(1) - expect(bufs[0].length).to.equal(0) - - done() - }))) - } - } - }), - cb - ) - }) - ) - } - ], done) + it('reads bytes with an offset and a length that span blocks using trickle layout', async () => { + await checkBytesThatSpanBlocks('trickle') }) - it('fails on non existent hash', (done) => { + it('fails on non existent hash', async () => { // This hash doesn't exist in the repo - const hash = 'QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKj3' + const hash = 'bafybeidu2qqwriogfndznz32swi5r4p2wruf6ztu5k7my53tsezwhncs5y' - pull( - exporter(hash, ipld), - pull.collect((err, files) => { - expect(err).to.exist() - done() - }) - ) + try { + await exporter(hash, ipld) + } catch (err) { + expect(err.code).to.equal('ERR_NOT_FOUND') + } }) - it('exports file with data on internal and leaf nodes', function (done) { - waterfall([ - (cb) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [], cb), - (leaf, cb) => createAndPersistNode(ipld, 
'file', [0x00, 0x01, 0x02, 0x03], [ - leaf - ], cb), - (file, cb) => { - pull( - exporter(file.cid, ipld), - pull.asyncMap((file, cb) => readFile(file, cb)), - pull.through(buffer => { - expect(buffer).to.deep.equal(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07])) - }), - pull.collect(cb) - ) - } - ], done) + it('exports file with data on internal and leaf nodes', async () => { + const leaf = await createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], []) + const node = await createAndPersistNode(ipld, 'file', [0x00, 0x01, 0x02, 0x03], [ + leaf + ]) + + const file = await exporter(node.cid, ipld) + const data = Buffer.concat(await all(file.content())) + + expect(data).to.deep.equal(Buffer.from([0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07])) }) - it('exports file with data on some internal and leaf nodes', function (done) { + it('exports file with data on some internal and leaf nodes', async () => { // create a file node with three children: // where: // i = internal node without data @@ -969,197 +703,198 @@ describe('exporter', () => { // l d i // | \ // l l - waterfall([ - (cb) => { - // create leaves - parallel([ - (next) => createAndPersistNode(ipld, 'raw', [0x00, 0x01, 0x02, 0x03], [], next), - (next) => createAndPersistNode(ipld, 'raw', [0x08, 0x09, 0x10, 0x11], [], next), - (next) => createAndPersistNode(ipld, 'raw', [0x12, 0x13, 0x14, 0x15], [], next) - ], cb) - }, - (leaves, cb) => { - parallel([ - (next) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [leaves[1]], next), - (next) => createAndPersistNode(ipld, 'raw', null, [leaves[2]], next) - ], (error, internalNodes) => { - if (error) { - return cb(error) - } - - createAndPersistNode(ipld, 'file', null, [ - leaves[0], - internalNodes[0], - internalNodes[1] - ], cb) - }) - }, - (file, cb) => { - pull( - exporter(file.cid, ipld), - pull.asyncMap((file, cb) => readFile(file, cb)), - pull.through(buffer => { - expect(buffer).to.deep.equal( - Buffer.from([ - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, - 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15 - ]) - ) - }), - pull.collect(cb) - ) - } - ], done) + const leaves = await Promise.all([ + createAndPersistNode(ipld, 'raw', [0x00, 0x01, 0x02, 0x03], []), + createAndPersistNode(ipld, 'raw', [0x08, 0x09, 0x10, 0x11], []), + createAndPersistNode(ipld, 'raw', [0x12, 0x13, 0x14, 0x15], []) + ]) + + const internalNodes = await Promise.all([ + createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [leaves[1]]), + createAndPersistNode(ipld, 'raw', null, [leaves[2]]) + ]) + + const node = await createAndPersistNode(ipld, 'file', null, [ + leaves[0], + internalNodes[0], + internalNodes[1] + ]) + + const file = await exporter(node.cid, ipld) + const data = Buffer.concat(await all(file.content())) + + expect(data).to.deep.equal( + Buffer.from([ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, + 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15 + ]) + ) }) - it('exports file with data on internal and leaf nodes with an offset that only fetches data from leaf nodes', function (done) { - waterfall([ - (cb) => createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], [], cb), - (leaf, cb) => createAndPersistNode(ipld, 'file', [0x00, 0x01, 0x02, 0x03], [ - leaf - ], cb), - (file, cb) => { - pull( - exporter(file.cid, ipld, { - offset: 4 - }), - pull.asyncMap((file, cb) => readFile(file, cb)), - pull.through(buffer => { - expect(buffer).to.deep.equal(Buffer.from([0x04, 0x05, 0x06, 0x07])) - }), - pull.collect(cb) - ) - } - ], done) + it('exports file with 
data on internal and leaf nodes with an offset that only fetches data from leaf nodes', async () => { + const leaf = await createAndPersistNode(ipld, 'raw', [0x04, 0x05, 0x06, 0x07], []) + const node = await createAndPersistNode(ipld, 'file', [0x00, 0x01, 0x02, 0x03], [ + leaf + ]) + + const file = await exporter(node.cid, ipld) + const data = Buffer.concat(await all(file.content({ + offset: 4 + }))) + + expect(data).to.deep.equal(Buffer.from([0x04, 0x05, 0x06, 0x07])) }) - it('exports file with data on leaf nodes without emitting empty buffers', function (done) { + it('exports file with data on leaf nodes without emitting empty buffers', async function () { this.timeout(30 * 1000) - pull( - pull.values([{ - path: '200Bytes.txt', - content: pull.values([bigFile]) - }]), - importer(ipld, { - rawLeaves: true - }), - pull.collect(collected) - ) + const imported = await first(importer([{ + path: '200Bytes.txt', + content: bigFile + }], ipld, { + rawLeaves: true + })) - function collected (err, files) { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - - pull( - exporter(files[0].multihash, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - - pull( - files[0].content, - pull.collect((error, buffers) => { - expect(error).to.not.exist() - - buffers.forEach(buffer => { - expect(buffer.length).to.not.equal(0) - }) - - done() - }) - ) - }) - ) - } + const file = await exporter(imported.cid, ipld) + const buffers = await all(file.content()) + + buffers.forEach(buffer => { + expect(buffer.length).to.not.equal(0) + }) }) - it('exports a raw leaf', (done) => { - pull( - pull.values([{ - path: '200Bytes.txt', - content: pull.values([smallFile]) - }]), - importer(ipld, { - rawLeaves: true - }), - pull.collect(collected) - ) + it('exports a raw leaf', async () => { + const imported = await first(importer([{ + path: '200Bytes.txt', + content: smallFile + }], ipld, { + rawLeaves: true + })) - function collected (err, files) { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - - pull( - exporter(files[0].multihash, ipld), - pull.collect((err, files) => { - expect(err).to.not.exist() - expect(files.length).to.equal(1) - expect(CID.isCID(files[0].cid)).to.be.true() - fileEql(files[0], smallFile, done) - }) - ) + const file = await exporter(imported.cid, ipld) + expect(CID.isCID(file.cid)).to.be.true() + + const data = Buffer.concat(await all(file.content())) + expect(data).to.deep.equal(smallFile) + }) + + it('errors when exporting a non-existent key from a cbor node', async () => { + const cborNodeCid = await ipld.put({ + foo: 'bar' + }, mc.DAG_CBOR) + + try { + await exporter(`${cborNodeCid.toBaseEncodedString()}/baz`, ipld) + } catch (err) { + expect(err.code).to.equal('ERR_NO_PROP') } }) -}) -function fileEql (actual, expected, done) { - readFile(actual, (error, data) => { - if (error) { - return done(error) + it('exports a cbor node', async () => { + const node = { + foo: 'bar' } + const cborNodeCid = await ipld.put(node, mc.DAG_CBOR) + const exported = await exporter(`${cborNodeCid.toBaseEncodedString()}`, ipld) + + expect(exported.node).to.deep.equal(node) + }) + + it('errors when exporting a node with no resolver', async () => { + const cid = new CID(1, 'git-raw', new CID('zdj7WkRPAX9o9nb9zPbXzwG7JEs78uyhwbUs8JSUayB98DWWY').multihash) + try { - if (expected) { - expect(data).to.eql(expected) - } else { - expect(data).to.exist() - } + await exporter(`${cid.toBaseEncodedString()}`, ipld) } catch (err) { - return 
done(err) + expect(err.code).to.equal('ERR_NO_RESOLVER') } + }) + + it('errors if we try to export links from inside a raw node', async () => { + const cid = await ipld.put(Buffer.from([0, 1, 2, 3, 4]), mc.RAW) - done() + try { + await exporter(`${cid.toBaseEncodedString()}/lol`, ipld) + } catch (err) { + expect(err.code).to.equal('ERR_NOT_FOUND') + } }) -} - -function readFile (file, done) { - pull( - file.content, - pull.collect((error, data) => { - if (error) { - return done(error) - } - done(null, Buffer.concat(data)) - }) - ) -} + it('errors we export a non-unixfs dag-pb node', async () => { + const cid = await ipld.put(await DAGNode.create(Buffer.from([0, 1, 2, 3, 4])), mc.DAG_PB) + + try { + await exporter(cid, ipld) + } catch (err) { + expect(err.code).to.equal('ERR_NOT_UNIXFS') + } + }) -function createAndPersistNode (ipld, type, data, children, callback) { - const file = new UnixFS(type, data ? Buffer.from(data) : undefined) - const links = [] + it('errors we export a unixfs node that has a non-unixfs/dag-pb child', async () => { + const cborNodeCid = await ipld.put({ + foo: 'bar' + }, mc.DAG_CBOR) - children.forEach(child => { - const leaf = UnixFS.unmarshal(child.node.data) + const file = new UnixFS('file') + file.addBlockSize(100) - file.addBlockSize(leaf.fileSize()) + const cid = await ipld.put(await DAGNode.create(file.marshal(), [ + new DAGLink('', 100, cborNodeCid) + ]), mc.DAG_PB) - links.push(new DAGLink('', child.node.size, child.cid)) - }) + const exported = await exporter(cid, ipld) - DAGNode.create(file.marshal(), links, (error, node) => { - if (error) { - return callback(error) + try { + await all(exported.content()) + } catch (err) { + expect(err.code).to.equal('ERR_NOT_UNIXFS') } + }) - ipld.put(node, { - version: 1, - hashAlg: 'sha2-256', - format: 'dag-pb' - }, (error, cid) => callback(error, { - node, - cid - })) + it('exports a node with depth', async () => { + const imported = await all(importer([{ + path: '/foo/bar/baz.txt', + content: Buffer.from('hello world') + }], ipld)) + + const exported = await exporter(imported[0].cid, ipld) + + expect(exported.depth).to.equal(0) + }) + + it('exports a node recursively with depth', async () => { + const dir = await last(importer([{ + path: '/foo/bar/baz.txt', + content: Buffer.from('hello world') + }, { + path: '/foo/qux.txt', + content: Buffer.from('hello world') + }, { + path: '/foo/bar/quux.txt', + content: Buffer.from('hello world') + }], ipld)) + + const exported = await all(exporter.recursive(dir.cid, ipld)) + const dirCid = dir.cid.toBaseEncodedString() + + expect(exported[0].depth).to.equal(0) + expect(exported[0].name).to.equal(dirCid) + + expect(exported[1].depth).to.equal(1) + expect(exported[1].name).to.equal('bar') + expect(exported[1].path).to.equal(`${dirCid}/bar`) + + expect(exported[2].depth).to.equal(2) + expect(exported[2].name).to.equal('baz.txt') + expect(exported[2].path).to.equal(`${dirCid}/bar/baz.txt`) + + expect(exported[3].depth).to.equal(2) + expect(exported[3].name).to.equal('quux.txt') + expect(exported[3].path).to.equal(`${dirCid}/bar/quux.txt`) + + expect(exported[4].depth).to.equal(1) + expect(exported[4].name).to.equal('qux.txt') + expect(exported[4].path).to.equal(`${dirCid}/qux.txt`) }) -} +}) diff --git a/test/helpers/create-shard.js b/test/helpers/create-shard.js new file mode 100644 index 00000000..8cea7702 --- /dev/null +++ b/test/helpers/create-shard.js @@ -0,0 +1,33 @@ +'use strict' + +const importer = require('ipfs-unixfs-importer') + +const SHARD_SPLIT_THRESHOLD = 10 + 
+
+const createShard = (numFiles, ipld) => {
+  return createShardWithFileNames(numFiles, (index) => `file-${index}`, ipld)
+}
+
+const createShardWithFileNames = (numFiles, fileName, ipld) => {
+  const files = new Array(numFiles).fill(0).map((_, index) => ({
+    path: fileName(index),
+    content: Buffer.from([0, 1, 2, 3, 4, index])
+  }))
+
+  return createShardWithFiles(files, ipld)
+}
+
+const createShardWithFiles = async (files, ipld) => {
+  let last
+
+  for await (const imported of importer(files, ipld, {
+    shardSplitThreshold: SHARD_SPLIT_THRESHOLD,
+    wrap: true
+  })) {
+    last = imported
+  }
+
+  return last.cid
+}
+
+module.exports = createShard
diff --git a/test/helpers/random-bytes.js b/test/helpers/random-bytes.js
deleted file mode 100644
index 1c10604a..00000000
--- a/test/helpers/random-bytes.js
+++ /dev/null
@@ -1,22 +0,0 @@
-'use strict'
-
-const crypto = require('crypto')
-const MAX_BYTES = 65536
-
-// One day this will be merged: https://github.com/crypto-browserify/randombytes/pull/16
-module.exports = function randomBytes (num) {
-  num = parseInt(num)
-  const bytes = Buffer.allocUnsafe(num)
-
-  for (let offset = 0; offset < num; offset += MAX_BYTES) {
-    let size = MAX_BYTES
-
-    if ((offset + size) > num) {
-      size = num - offset
-    }
-
-    crypto.randomFillSync(bytes, offset, size)
-  }
-
-  return bytes
-}
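As a usage note for the new `test/helpers/create-shard.js` helper: below is a minimal sketch of how a spec might build a HAMT-sharded directory with it and then walk the shard with the async-iterator exporter. The require path for the helper and the `listShardedDir` wrapper are illustrative assumptions, and `ipld` is assumed to be the in-memory IPLD instance the specs construct in their `before` hooks.

```js
// Sketch only – names marked as assumptions above are not part of the patch
const createShard = require('./helpers/create-shard') // assumed require path from a file in test/
const exporter = require('../src')
const all = require('async-iterator-all')

async function listShardedDir (ipld) {
  // 15 files exceeds SHARD_SPLIT_THRESHOLD (10), so the importer
  // produces a hamt-sharded-directory, as the sharded specs rely on
  const cid = await createShard(15, ipld)

  // the exporter resolves the shard CID to a single directory entry
  const dir = await exporter(cid, ipld)

  // for directories and hamt shards, content() yields the child entries
  return all(dir.content())
}
```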