diff --git a/package.json b/package.json index dcd9edfceb..3306250b31 100644 --- a/package.json +++ b/package.json @@ -93,15 +93,17 @@ "async": "^2.5.0", "bl": "^1.2.1", "boom": "^5.2.0", - "debug": "^3.0.1", "cids": "^0.5.1", + "debug": "^3.0.1", "file-type": "^6.1.0", "filesize": "^3.5.10", + "fnv1a": "^1.0.1", "fsm-event": "^2.1.0", "glob": "^7.1.2", "hapi": "^16.5.2", "hapi-set-header": "^1.0.2", "hoek": "^4.2.0", + "interface-datastore": "^0.3.0", "ipfs-api": "^14.3.5", "ipfs-bitswap": "~0.17.2", "ipfs-block": "~0.6.0", @@ -126,6 +128,7 @@ "libp2p-websockets": "~0.10.1", "lodash.flatmap": "^4.5.0", "lodash.get": "^4.4.2", + "lodash.set": "^4.3.0", "lodash.sortby": "^4.7.0", "lodash.values": "^4.3.0", "mafmt": "^3.0.1", @@ -139,6 +142,7 @@ "peer-id": "~0.10.1", "peer-info": "~0.11.0", "promisify-es6": "^1.0.3", + "protocol-buffers": "^3.2.1", "pull-file": "^1.0.0", "pull-paramap": "^1.2.2", "pull-pushable": "^2.1.1", @@ -216,4 +220,4 @@ "Łukasz Magiera ", "ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ " ] -} \ No newline at end of file +} diff --git a/src/cli/commands/pin.js b/src/cli/commands/pin.js new file mode 100644 index 0000000000..d7a68fb023 --- /dev/null +++ b/src/cli/commands/pin.js @@ -0,0 +1,15 @@ +'use strict' + +module.exports = { + command: 'pin', + + description: 'Pin and unpin objects to local storage.', + + builder (yargs) { + return yargs + .commandDir('pin') + }, + + handler (argv) { + } +} diff --git a/src/cli/commands/pin/add.js b/src/cli/commands/pin/add.js new file mode 100644 index 0000000000..ac7b90ae92 --- /dev/null +++ b/src/cli/commands/pin/add.js @@ -0,0 +1,28 @@ +'use strict' + +module.exports = { + command: 'add ', + + describe: 'Pins objects to local storage.', + + builder: { + recursive: { + type: 'boolean', + alias: 'r', + default: true, + describe: 'Recursively pin the object linked to by the specified object(s).' + } + }, + + handler (argv) { + const paths = argv['ipfs-path'].split(' ') + const recursive = argv.recursive + const type = recursive ? 
'recursive' : 'direct' + argv.ipfs.pin.add(paths[0], { recursive }, (err, results) => { + if (err) { throw err } + results.forEach((res) => { + console.log(`pinned ${res.hash} ${type}ly`) + }) + }) + } +} diff --git a/src/cli/commands/pin/ls.js b/src/cli/commands/pin/ls.js new file mode 100644 index 0000000000..9a855d56b8 --- /dev/null +++ b/src/cli/commands/pin/ls.js @@ -0,0 +1,43 @@ +'use strict' + +module.exports = { + command: 'ls', + + describe: 'List objects pinned to local storage.', + + builder: { + path: { + type: 'string', + describe: 'List pinned state of specific .' + }, + type: { + type: 'string', + alias: 't', + default: 'all', + describe: ('The type of pinned keys to list. ' + + 'Can be "direct", "indirect", "recursive", or "all".') + }, + quiet: { + type: 'boolean', + alias: 'q', + default: false, + describe: 'Write just hashes of objects.' + } + }, + + handler: (argv) => { + const paths = argv.path && argv.path.split(' ') + const type = argv.type + const quiet = argv.quiet + argv.ipfs.pin.ls(paths, { type }, (err, results) => { + if (err) { throw err } + results.forEach((res) => { + let line = res.hash + if (!quiet) { + line += ` ${res.type}` + } + console.log(line) + }) + }) + } +} diff --git a/src/cli/commands/pin/rm.js b/src/cli/commands/pin/rm.js new file mode 100644 index 0000000000..e0a0dab148 --- /dev/null +++ b/src/cli/commands/pin/rm.js @@ -0,0 +1,35 @@ +'use strict' + +const utils = require('../../utils') +const debug = require('debug') +const log = debug('cli:pin') +log.error = debug('cli:pin:error') + +module.exports = { + command: 'rm ', + + describe: 'Removes the pinned object from local storage.', + + builder: { + recursive: { + type: 'boolean', + alias: 'r', + default: true, + describe: 'Recursively unpin the objects linked to by the specified object(s).' 
+ } + }, + + handler: (argv) => { + const paths = argv['ipfs-path'].split(' ') + const recursive = argv.recursive + utils.getIPFS((err, ipfs) => { + if (err) { throw err } + ipfs.pin.rm(paths, { recursive }, (err, results) => { + if (err) { throw err } + results.forEach((res) => { + console.log(`unpinned ${res.hash}`) + }) + }) + }) + } +} diff --git a/src/core/boot.js b/src/core/boot.js index 5ba2e3b5b6..960127afaa 100644 --- a/src/core/boot.js +++ b/src/core/boot.js @@ -28,6 +28,7 @@ module.exports = (self) => { series([ (cb) => self._repo.open(cb), + (cb) => self.pin.load(cb), (cb) => self.preStart(cb), (cb) => { self.state.initialized() diff --git a/src/core/components/files.js b/src/core/components/files.js index 6a9f5fbced..8cab30b759 100644 --- a/src/core/components/files.js +++ b/src/core/components/files.js @@ -74,6 +74,11 @@ module.exports = function files (self) { if (a.path > b.path) return -1 return 0 }), + pull.asyncMap((file, cb) => { + self.pin.add(file.hash, (err) => { + cb(err, file) + }) + }), pull.collect(callback) ) }), diff --git a/src/core/components/index.js b/src/core/components/index.js index e4fb45464f..fb86d1f6d8 100644 --- a/src/core/components/index.js +++ b/src/core/components/index.js @@ -16,6 +16,7 @@ exports.dag = require('./dag') exports.libp2p = require('./libp2p') exports.swarm = require('./swarm') exports.ping = require('./ping') +exports.pin = require('./pin') exports.files = require('./files') exports.bitswap = require('./bitswap') exports.pubsub = require('./pubsub') diff --git a/src/core/components/key-set.js b/src/core/components/key-set.js new file mode 100644 index 0000000000..7c322b609e --- /dev/null +++ b/src/core/components/key-set.js @@ -0,0 +1,31 @@ +'use strict' + +const multihashes = require('multihashes') + +module.exports = function KeySet (keys) { + // Buffers with identical data are still different objects, so + // they need to be cast to strings to prevent duplicates in Sets + this.keys = {} + this.add = 
(key) => { + this.keys[multihashes.toB58String(key)] = key + } + this.delete = (key) => { + delete this.keys[multihashes.toB58String(key)] + } + this.clear = () => { + this.keys = {} + } + this.has = (key) => { + return (multihashes.toB58String(key) in this.keys) + } + this.toArray = () => { + return Object.keys(this.keys).map((hash) => { + return this.keys[hash] + }) + } + this.toStringArray = () => { + return Object.keys(this.keys) + } + keys = keys || [] + keys.forEach(this.add) +} diff --git a/src/core/components/pin-set.js b/src/core/components/pin-set.js new file mode 100644 index 0000000000..db91428833 --- /dev/null +++ b/src/core/components/pin-set.js @@ -0,0 +1,277 @@ +'use strict' + +const multihashes = require('multihashes') +const CID = require('cids') +const protobuf = require('protocol-buffers') +const crypto = require('crypto') +const fnv1a = require('fnv1a') +const dagPB = require('ipld-dag-pb') +const DAGNode = dagPB.DAGNode +const DAGLink = dagPB.DAGLink +const varint = require('varint') +const once = require('once') + +const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' +const emptyKey = multihashes.fromB58String(emptyKeyHash) +const defaultFanout = 256 +const maxItems = 8192 + +// Protobuf interface +const pbSchema = ( + // from go-ipfs/pin/internal/pb/header.proto + 'message Set { ' + + // 1 for now + 'optional uint32 version = 1; ' + + // how many of the links are subtrees + 'optional uint32 fanout = 2; ' + + // hash seed for subtree selection, a random number + 'optional fixed32 seed = 3; ' + + '}' +) +const pb = protobuf(pbSchema) +function readHeader (rootNode) { + // rootNode.data should be a buffer of the format: + // < varint(headerLength) | header | itemData... 
> + const rootData = rootNode.data + const hdrLength = varint.decode(rootData) + const vBytes = varint.decode.bytes + if (vBytes <= 0) { + return { err: 'Invalid Set header length' } + } + if (vBytes + hdrLength > rootData.length) { + return { err: 'Impossibly large set header length' } + } + const hdrSlice = rootData.slice(vBytes, hdrLength + vBytes) + const header = pb.Set.decode(hdrSlice) + if (header.version !== 1) { + return { err: 'Unsupported Set version: ' + header.version } + } + if (header.fanout > rootNode.links.length) { + return { err: 'Impossibly large fanout' } + } + return { + header: header, + data: rootData.slice(hdrLength + vBytes) + } +} + +exports = module.exports = function (dag) { + const pinSet = { + // should this be part of `object` API? + hasChild: (root, childhash, callback, _links, _checked, _seen) => { + // callback (err, has) + callback = once(callback) + if (callback.called) { return } + if (typeof childhash === 'object') { + childhash = multihashes.toB58String(childhash) + } + _links = _links || root.links.length + _checked = _checked || 0 + _seen = _seen || {} + + if (!root.links.length && _links === _checked) { + // all nodes have been checked + return callback(null, false) + } + root.links.forEach((link) => { + const bs58link = multihashes.toB58String(link.multihash) + if (bs58link === childhash) { + return callback(null, true) + } + dag.get(new CID(link.multihash), (err, res) => { + if (err) { + return callback(err) + } + // don't check the same links twice + if (bs58link in _seen) { return } + _seen[bs58link] = true + + _checked++ + _links += res.value.links.length + pinSet.hasChild(res.value, childhash, callback, _links, _checked, _seen) + }) + }) + }, + + storeSet: (keys, logInternalKey, callback) => { + // callback (err, rootNode) + callback = once(callback) + const items = keys.map((key) => { + return { + key: key, + data: null + } + }) + pinSet.storeItems(items, logInternalKey, (err, rootNode) => { + if (err) { return 
callback(err) } + const opts = { cid: new CID(rootNode.multihash) } + dag.put(rootNode, opts, (err, cid) => { + if (err) { return callback(err) } + logInternalKey(rootNode.multihash) + callback(null, rootNode) + }) + }) + }, + + storeItems: (items, logInternalKey, callback, _subcalls, _done) => { + // callback (err, rootNode) + callback = once(callback) + const seed = crypto.randomBytes(4).readUInt32LE(0, true) + const pbHeader = pb.Set.encode({ + version: 1, + fanout: defaultFanout, + seed: seed + }) + let rootData = Buffer.concat([ + new Buffer(varint.encode(pbHeader.length)), pbHeader + ]) + let rootLinks = [] + for (let i = 0; i < defaultFanout; i++) { + rootLinks.push(new DAGLink('', 1, emptyKey)) + } + logInternalKey(emptyKey) + + if (items.length <= maxItems) { + // the items will fit in a single root node + const itemLinks = [] + const itemData = [] + const indices = [] + for (let i = 0; i < items.length; i++) { + itemLinks.push(new DAGLink('', 1, items[i].key)) + itemData.push(items[i].data || new Buffer(0)) + indices.push(i) + } + indices.sort((a, b) => { + const x = Buffer.compare(itemLinks[a].multihash, itemLinks[b].multihash) + if (x) { return x } + return (a < b ? 
-1 : 1) + }) + const sortedLinks = indices.map((i) => { return itemLinks[i] }) + const sortedData = indices.map((i) => { return itemData[i] }) + rootLinks = rootLinks.concat(sortedLinks) + rootData = Buffer.concat([rootData].concat(sortedData)) + DAGNode.create(rootData, rootLinks, (err, rootNode) => { + if (err) { return callback(err) } + return callback(null, rootNode) + }) + } else { + // need to split up the items into multiple root nodes + // (using go-ipfs "wasteful but simple" approach for consistency) + _subcalls = _subcalls || 0 + _done = _done || 0 + const hashed = {} + const hashFn = (seed, key) => { + const buf = new Buffer(4) + buf.writeUInt32LE(seed, 0) + const data = Buffer.concat([ + buf, new Buffer(multihashes.toB58String(key)) + ]) + return fnv1a(data.toString('binary')) + } + // items will be distributed among `defaultFanout` bins + for (let i = 0; i < items.length; i++) { + let h = hashFn(seed, items[i].key) % defaultFanout + hashed[h] = hashed[h] || [] + hashed[h].push(items[i]) + } + const storeItemsCb = (err, child) => { + if (callback.called) { return } + if (err) { + return callback(err) + } + dag.put(child, (err) => { + if (callback.called) { return } + if (err) { + return callback(err) + } + logInternalKey(child.multihash) + rootLinks[this.h] = new DAGLink( + '', child.size, child.multihash + ) + _done++ + if (_done === _subcalls) { + // all finished + DAGNode.create(rootData, rootLinks, (err, rootNode) => { + if (err) { return callback(err) } + return callback(null, rootNode) + }) + } + }) + } + _subcalls += Object.keys(hashed).length + for (let h in hashed) { + if (hashed.hasOwnProperty(h)) { + pinSet.storeItems( + hashed[h], + logInternalKey, + storeItemsCb.bind({h: h}), + _subcalls, + _done + ) + } + } + } + }, + + loadSet: (rootNode, name, logInternalKey, callback) => { + // callback (err, keys) + callback = once(callback) + const link = rootNode.links.filter(l => l.name === name).pop() + if (!link) { return callback('No link found 
with name ' + name) } + logInternalKey(link.multihash) + dag.get(new CID(link.multihash), (err, res) => { + if (err) { return callback(err) } + const keys = [] + const walkerFn = (link) => { + keys.push(link.multihash) + } + pinSet.walkItems(res.value, walkerFn, logInternalKey, (err) => { + if (err) { return callback(err) } + return callback(null, keys) + }) + }) + }, + + walkItems: (node, walkerFn, logInternalKey, callback) => { + // callback (err) + callback = once(callback) + const h = readHeader(node) + if (h.err) { return callback(h.err) } + const fanout = h.header.fanout + let subwalkCount = 0 + let finishedCount = 0 + + const walkCb = (err) => { + if (err) { return callback(err) } + finishedCount++ + if (subwalkCount === finishedCount) { + return callback() + } + } + + for (let i = 0; i < node.links.length; i++) { + const link = node.links[i] + if (i >= fanout) { + // item link + walkerFn(link, i, h.data) + } else { + // fanout link + logInternalKey(link.multihash) + if (!emptyKey.equals(link.multihash)) { + subwalkCount++ + dag.get(new CID(link.multihash), (err, res) => { + if (err) { return callback(err) } + pinSet.walkItems( + res.value, walkerFn, logInternalKey, walkCb + ) + }) + } + } + } + if (!subwalkCount) { + return callback() + } + } + } + return pinSet +} diff --git a/src/core/components/pin.js b/src/core/components/pin.js new file mode 100644 index 0000000000..adeda6e61f --- /dev/null +++ b/src/core/components/pin.js @@ -0,0 +1,529 @@ +'use strict' + +const dagPB = require('ipld-dag-pb') +const DAGNode = dagPB.DAGNode +const DAGLink = dagPB.DAGLink +const CID = require('cids') +const pinSet = require('./pin-set') +const KeySet = require('./key-set') +const promisify = require('promisify-es6') +const multihashes = require('multihashes') +const Key = require('interface-datastore').Key +const _ = require('lodash') +const each = require('async/each') +const waterfall = require('async/waterfall') +const until = require('async/until') +const once = 
require('once') + +const keyString = multihashes.toB58String + +module.exports = function pin (self) { + let directPins = new KeySet() + let recursivePins = new KeySet() + let internalPins = new KeySet() + + // temp - should use /local/pins to be consistent with go repo, but + // the pin set in datastore in test/go-ipfs-repo doesn't deserialize + const pinDataStoreKey = new Key('/local/pins/js') + + const repo = self._repo + const dag = self.dag + + function normalizeHashes (hashes, callback) { + // try to accept a variety of hash options including + // multihash Buffers, base58 strings, and ipfs path + // strings, either individually or as an array + if (Buffer.isBuffer(hashes) || typeof hashes.forEach !== 'function') { + hashes = [hashes] + } + const normalized = { + hashes: [], + update: (multihash, cb) => { + try { + multihashes.validate(multihash) + } catch (err) { return cb(err) } + + normalized.hashes.push(multihash) + cb() + } + } + each(hashes, (hash, cb) => { + if (typeof hash === 'string') { + // example: '/ipfs/QmRootHash/links/by/name' + const matched = hash.match(/^(?:\/ipfs\/)?([^/]+(?:\/[^/]+)*)\/?$/) + if (!matched) { + return cb(new Error('invalid ipfs ref path')) + } + const split = matched[1].split('/') + const rootHash = multihashes.fromB58String(split[0]) + const links = split.slice(1, split.length) + if (!links.length) { + normalized.update(rootHash, cb) + } else { + // recursively follow named links to the target + const pathFn = (err, obj) => { + if (err) { return cb(err) } + if (links.length) { + const linkName = links.shift() + const nextLink = obj.links.filter((link) => { + return (link.name === linkName) + }) + if (!nextLink.length) { + return cb(new Error( + `no link named ${linkName} under ${obj.toJSON().Hash}` + )) + } + const nextHash = nextLink[0].multihash + self.object.get(nextHash, pathFn) + } else { + normalized.update(obj.multihash, cb) + } + } + self.object.get(rootHash, pathFn) + } + } else { + normalized.update(hash, cb) + 
} + }, (err) => { + if (err) { return callback(err) } + return callback(null, normalized.hashes) + }) + } + + function getRecursive (multihash, callback) { + // gets flat array of all DAGNodes in tree given by multihash + // (should this be part of dag.js API? it was in ipfs-merkle-dag) + callback = once(callback) + dag.get(new CID(multihash), (err, res) => { + if (err) { return callback(err) } + const links = res.value.links + const nodes = [res.value] + // leaf case + if (!links.length) { + return callback(null, nodes) + } + // branch case + links.forEach(link => { + getRecursive(link.multihash, (err, subNodes) => { + if (err) { return callback(err) } + nodes.push(subNodes) + if (nodes.length === links.length + 1) { + return callback(null, _.flattenDeep(nodes)) + } + }) + }) + }) + } + + const pin = { + types: { + direct: 'direct', + recursive: 'recursive', + indirect: 'indirect', + internal: 'internal', + all: 'all' + }, + + clear: () => { + directPins.clear() + recursivePins.clear() + internalPins.clear() + }, + + set: pinSet(dag), + + add: promisify((hashes, options, callback) => { + // callback (err, pinset) + if (typeof options === 'function') { + callback = options + options = null + } + callback = once(callback) + const recursive = !options || options.recursive !== false + normalizeHashes(hashes, (err, mhs) => { + if (err) { return callback(err) } + const result = { + // async result queue + payload: [], + update: (hash) => { + result.payload.push({hash}) + if (result.payload.length === mhs.length) { + pin.flush((err, root) => { + if (err) { return callback(err) } + return callback(null, result.payload) + }) + } + } + } + mhs.forEach((multihash) => { + if (recursive) { + if (recursivePins.has(multihash)) { + // it's already pinned recursively + result.update(keyString(multihash)) + return + } + + // recursive pin should replace direct pin + directPins.delete(multihash) + + // entire graph of nested links should be + // pinned, so make sure we have all the 
objects + getRecursive(multihash, (err) => { + if (err) { return callback(err) } + // found all objects, we can add the pin + recursivePins.add(multihash) + result.update(keyString(multihash)) + }) + } else { + if (recursivePins.has(multihash)) { + // recursive supersedes direct, can't have both + return callback( + `${keyString(multihash)} already pinned recursively` + ) + } + if (directPins.has(multihash)) { + // already directly pinned + result.update(keyString(multihash)) + return + } + // make sure we have the object + dag.get(new CID(multihash), (err, res) => { + if (err) { return callback(err) } + // found the object, we can add the pin + directPins.add(multihash) + result.update(keyString(multihash)) + }) + } + }) + }) + }), + + rm: promisify((hashes, options, callback) => { + // callback (err) + let recursive = true + if (typeof options === 'function') { + callback = options + } else if (options && options.recursive === false) { + recursive = false + } + callback = once(callback) + normalizeHashes(hashes, (err, mhs) => { + if (err) { return callback(err) } + const result = { + // async result queue + payload: [], + update: (hash) => { + result.payload.push({hash}) + if (result.payload.length === mhs.length) { + pin.flush((err, root) => { + if (err) { return callback(err) } + return callback(null, result.payload) + }) + } + } + } + mhs.forEach((multihash) => { + pin.isPinnedWithType(multihash, pin.types.all, (err, pinned, reason) => { + if (err) { return callback(err) } + if (!pinned) { return callback(new Error('not pinned')) } + switch (reason) { + case (pin.types.recursive): + if (recursive) { + recursivePins.delete(multihash) + return result.update(keyString(multihash)) + } + return callback(new Error( + `${keyString(multihash)} is pinned recursively` + )) + case (pin.types.direct): + directPins.delete(multihash) + return result.update(keyString(multihash)) + default: + return callback(new Error( + `${keyString(multihash)} is pinned indirectly under 
${reason}` + )) + } + }) + }) + }) + }), + + ls: promisify((hashes, options, callback) => { + // callback (err, pinset) + let type = pin.types.all + if (typeof hashes === 'function') { + callback = hashes + options = null + hashes = null + } + if (typeof options === 'function') { + callback = options + } + if (hashes && hashes.type) { + options = hashes + hashes = null + } + if (options && options.type) { + type = options.type.toLowerCase() + } + callback = once(callback) + if (Object.keys(pin.types).indexOf(type) < 0) { + return callback(new Error( + `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` + )) + } + if (hashes) { + normalizeHashes(hashes, (err, mhs) => { + if (err) { return callback(err) } + const result = { + // async result queue + payload: [], + update: (item) => { + result.payload.push(item) + if (result.payload.length === mhs.length) { + return callback(null, result.payload) + } + } + } + mhs.forEach((multihash) => { + pin.isPinnedWithType(multihash, type, (err, pinned, reason) => { + if (err) { return callback(err) } + if (!pinned) { + return callback(new Error( + `Path ${keyString(multihash)} is not pinned` + )) + } + switch (reason) { + case pin.types.direct: + case pin.types.recursive: + result.update({ + hash: keyString(multihash), + type: reason + }) + break + default: + result.update({ + hash: keyString(multihash), + type: `${pin.types.indirect} through ${reason}` + }) + } + }) + }) + }) + } else { + const result = [] + if (type === pin.types.direct || type === pin.types.all) { + pin.directKeyStrings().forEach((hash) => { + result.push({ + type: pin.types.direct, + hash: hash + }) + }) + } + if (type === pin.types.recursive || type === pin.types.all) { + pin.recursiveKeyStrings().forEach((hash) => { + result.push({ + type: pin.types.recursive, + hash: hash + }) + }) + } + if (type === pin.types.indirect || type === pin.types.all) { + pin.getIndirectKeys((err, hashes) => { + if (err) { return callback(err) } + 
hashes.forEach((hash) => { + result.push({ + type: pin.types.indirect, + hash: hash + }) + }) + return callback(null, result) + }) + } else { + return callback(null, result) + } + } + }), + + isPinned: (multihash, callback) => { + // callback (err, pinned, reason) + pin.isPinnedWithType(multihash, pin.types.all, callback) + }, + + isPinnedWithType: (multihash, pinType, callback) => { + // callback (err, pinned, reason) + + // recursive + if ((pinType === pin.types.recursive || pinType === pin.types.all) && + recursivePins.has(multihash)) { + return callback(null, true, pin.types.recursive) + } + if ((pinType === pin.types.recursive)) { + return callback(null, false) + } + // direct + if ((pinType === pin.types.direct || pinType === pin.types.all) && + directPins.has(multihash)) { + return callback(null, true, pin.types.direct) + } + if ((pinType === pin.types.direct)) { + return callback(null, false) + } + if ((pinType === pin.types.internal || pinType === pin.types.all) && + internalPins.has(multihash)) { + return callback(null, true, pin.types.internal) + } + if ((pinType === pin.types.internal)) { + return callback(null, false) + } + + // indirect (default) + // check each recursive key to see if multihash is under it + const rKeys = pin.recursiveKeys() + let found = false + until( + // search until multihash was found or no more keys to check + () => (found || !rKeys.length), + (cb) => { + const key = rKeys.pop() + dag.get(new CID(key), (err, res) => { + if (err) { return cb(err) } + pin.set.hasChild(res.value, multihash, (err, has) => { + if (err) { return cb(err) } + found = has + // if found, return the hash of the parent recursive pin + cb(null, found ? 
keyString(res.value.multihash) : null) + }) + }) + }, + (err, result) => { + if (err) { return callback(err) } + return callback(null, found, result) + } + ) + }, + + directKeys: () => { + return directPins.toArray() + }, + + directKeyStrings: () => { + return directPins.toStringArray() + }, + + recursiveKeys: () => { + return recursivePins.toArray() + }, + + recursiveKeyStrings: () => { + return recursivePins.toStringArray() + }, + + getIndirectKeys: (callback) => { + // callback (err, keys) + const indirectKeys = new KeySet() + const rKeys = pin.recursiveKeys() + if (!rKeys.length) { + return callback(null, []) + } + each(rKeys, (multihash, cb) => { + getRecursive(multihash, (err, nodes) => { + if (err) { return cb(err) } + nodes.forEach((node) => { + const mh = node.multihash + if (!directPins.has(mh) && !recursivePins.has(mh)) { + // not already pinned recursively or directly + indirectKeys.add(mh) + } + }) + cb() + }) + }, (err) => { + if (err) { return callback(err) } + callback(null, indirectKeys.toStringArray()) + }) + }, + + internalKeys: () => { + return internalPins.toArray() + }, + + internalKeyStrings: () => { + return internalPins.toStringArray() + }, + + // encodes and writes pin key sets to the datastore + // each key set will be stored as a DAG node, and a root node will link to both + flush: promisify((callback) => { + // callback (err, root) + const newInternalPins = new KeySet() + const logInternalKey = (mh) => newInternalPins.add(mh) + const handle = { + put: (k, v, cb) => { + handle[k] = v + cb() + } + } + waterfall([ + // create link to direct keys node + (cb) => pin.set.storeSet(pin.directKeys(), logInternalKey, cb), + (dRoot, cb) => DAGLink.create(pin.types.direct, dRoot.size, dRoot.multihash, cb), + (dLink, cb) => handle.put('dLink', dLink, cb), + // create link to recursive keys node + (cb) => pin.set.storeSet(pin.recursiveKeys(), logInternalKey, cb), + (rRoot, cb) => DAGLink.create(pin.types.recursive, rRoot.size, rRoot.multihash, cb), + 
(rLink, cb) => handle.put('rLink', rLink, cb), + // the pin-set nodes link to an empty node, so make sure it's added to dag + (cb) => DAGNode.create(new Buffer(0), cb), + (empty, cb) => dag.put(empty, {cid: new CID(empty.multihash)}, cb), + // create root node with links to direct and recursive nodes + (cid, cb) => DAGNode.create(new Buffer(0), [handle.dLink, handle.rLink], cb), + (root, cb) => handle.put('root', root, cb), + // add the root node to dag + (cb) => dag.put(handle.root, {cid: new CID(handle.root.multihash)}, cb), + // update the internal pin set + (cid, cb) => cb(null, logInternalKey(handle.root.multihash)), + // save serialized root to datastore under a consistent key + (_, cb) => repo.closed ? repo.datastore.open(cb) : cb(null, null), // temp + (_, cb) => repo.datastore.put(pinDataStoreKey, handle.root.serialized, cb) + ], (err, result) => { + if (err) { return callback(err) } + internalPins = newInternalPins + return callback(null, handle.root) + }) + }), + + load: promisify((callback) => { + // callback (err) + const newInternalPins = new KeySet() + const logInternalKey = (mh) => newInternalPins.add(mh) + const handle = { + put: (k, v, cb) => { + handle[k] = v + cb() + } + } + waterfall([ + (cb) => repo.closed ? repo.datastore.open(cb) : cb(null, null), // temp + (_, cb) => repo.datastore.has(pinDataStoreKey, cb), + (has, cb) => has ? 
cb() : cb('break'), + (cb) => repo.datastore.get(pinDataStoreKey, cb), + (serialized, cb) => dagPB.util.deserialize(serialized, cb), + (root, cb) => handle.put('root', root, cb), + (cb) => pin.set.loadSet(handle.root, pin.types.recursive, logInternalKey, cb), + (rKeys, cb) => handle.put('rKeys', rKeys, cb), + (cb) => pin.set.loadSet(handle.root, pin.types.direct, logInternalKey, cb) + ], (err, dKeys) => { + if (err && err !== 'break') { return callback(err) } + if (dKeys) { + directPins = new KeySet(dKeys) + recursivePins = new KeySet(handle.rKeys) + logInternalKey(handle.root.multihash) + internalPins = newInternalPins + } + return callback() + }) + }) + } + return pin +} diff --git a/src/core/index.js b/src/core/index.js index 6f01d7f2ef..ba368cfbb4 100644 --- a/src/core/index.js +++ b/src/core/index.js @@ -93,6 +93,7 @@ class IPFS extends EventEmitter { this.swarm = components.swarm(this) this.files = components.files(this) this.bitswap = components.bitswap(this) + this.pin = components.pin(this) this.ping = components.ping(this) this.pubsub = components.pubsub(this) this.dht = components.dht(this) diff --git a/src/http/api/resources/index.js b/src/http/api/resources/index.js index 671a349952..bb01daef00 100644 --- a/src/http/api/resources/index.js +++ b/src/http/api/resources/index.js @@ -5,6 +5,7 @@ exports.id = require('./id') exports.bootstrap = require('./bootstrap') exports.repo = require('./repo') exports.object = require('./object') +exports.pin = require('./pin') exports.config = require('./config') exports.block = require('./block') exports.swarm = require('./swarm') diff --git a/src/http/api/resources/pin.js b/src/http/api/resources/pin.js new file mode 100644 index 0000000000..c942cc1e11 --- /dev/null +++ b/src/http/api/resources/pin.js @@ -0,0 +1,83 @@ +'use strict' + +const _ = require('lodash') +const debug = require('debug') +const log = debug('jsipfs:http-api:pin') +log.error = debug('jsipfs:http-api:pin:error') + +exports = module.exports + 
+exports.ls = (request, reply) => { + const ipfs = request.server.app.ipfs + const types = ipfs.pin.types + const path = request.query.arg + const type = request.query.type || types.all + ipfs.pin.ls(path, { type }, (err, result) => { + if (err) { + log.error(err) + return reply({ + Message: `Failed to list pins: ${err.message}`, + Code: 0 + }).code(500) + } + + return reply({ + Keys: _.mapValues( + _.keyBy(result, obj => obj.hash), + obj => ({Type: obj.type}) + ) + }) + }) +} + +exports.add = { + // main route handler which is called after `parseArgs`, + // but only if the args were valid + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + const path = request.query.arg + const recursive = request.query.recursive !== 'false' + const onError = (err, code) => { + log.error(err) + return reply({ + Message: `Failed to add pin: ${err.message}`, + Code: 0 + }).code(code) + } + if (!path) { + return onError(new Error("Argument 'ipfs-path' is required"), 400) + } + ipfs.pin.add(path, { recursive }, (err, result) => { + if (err) { return onError(err, 500) } + return reply({ + Pins: result.map(obj => obj.hash) + }) + }) + } +} + +exports.rm = { + // main route handler which is called after `parseArgs`, + // but only if the args were valid + handler: (request, reply) => { + const ipfs = request.server.app.ipfs + const path = request.query.arg + const recursive = request.query.recursive !== 'false' + const onError = (err, code) => { + log.error(err) + return reply({ + Message: `Failed to remove pin: ${err.message}`, + Code: 0 + }).code(code) + } + if (!path) { + return onError(new Error("Argument 'ipfs-path' is required"), 400) + } + ipfs.pin.rm(path, { recursive }, (err, result) => { + if (err) { return onError(err, 500) } + return reply({ + Pins: result.map(obj => obj.hash) + }) + }) + } +} diff --git a/src/http/api/routes/index.js b/src/http/api/routes/index.js index c317db6de7..acaacbdae6 100644 --- a/src/http/api/routes/index.js +++ 
b/src/http/api/routes/index.js @@ -6,6 +6,7 @@ module.exports = (server) => { require('./bootstrap')(server) require('./block')(server) require('./object')(server) + require('./pin')(server) // require('./repo')(server) require('./config')(server) require('./swarm')(server) diff --git a/src/http/api/routes/pin.js b/src/http/api/routes/pin.js new file mode 100644 index 0000000000..2e3cf185b6 --- /dev/null +++ b/src/http/api/routes/pin.js @@ -0,0 +1,31 @@ +'use strict' + +const resources = require('./../resources') + +module.exports = (server) => { + const api = server.select('API') + + api.route({ + method: '*', + path: '/api/v0/pin/add', + config: { + handler: resources.pin.add.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pin/rm', + config: { + handler: resources.pin.rm.handler + } + }) + + api.route({ + method: '*', + path: '/api/v0/pin/ls', + config: { + handler: resources.pin.ls + } + }) +} diff --git a/test/cli/pin.js b/test/cli/pin.js new file mode 100644 index 0000000000..1484575f44 --- /dev/null +++ b/test/cli/pin.js @@ -0,0 +1,100 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const runOnAndOff = require('../utils/on-and-off') + +// use a tree of ipfs objects for recursive tests: +// root +// |`leaf +// `branch +// `subLeaf + +const keys = { + root: 'QmWQwS2Xh1SFGMPzUVYQ52b7RC7fTfiaPHm3ZyTRZuHmer', + leaf: 'QmaZoTQ6wFe7EtvaePBUeXavfeRqCAq3RUMomFxBpZLrLA', + branch: 'QmNxjjP7dtx6pzxWGBRCrgmjX3JqKL7uF2Kjx7ExiZDbSB', + subLeaf: 'QmUzzznkyQL7FjjBztG3D1tTjBuxeArLceDZnuSowUggXL' +} + +describe('pin', () => runOnAndOff((thing) => { + const filesDir = 'test/test-data/tree/' + + let ipfs + + before(() => { + ipfs = thing.ipfs + + return ipfs(`object put ${filesDir + 'root.json'}`) + .then(() => ipfs(`object put ${filesDir + 'root.json'}`)) + .then(() => ipfs(`object put ${filesDir + 'leaf.json'}`)) + .then(() => ipfs(`object put ${filesDir + 'branch.json'}`)) + .then(() => ipfs(`object put ${filesDir + 
'subLeaf.json'}`)) + }) + + it('add (recursively by default)', () => { + return ipfs(`pin add ${keys.root}`).then((out) => { + expect(out).to.eql(`pinned ${keys.root} recursively\n`) + }) + }) + + it('add (direct)', () => { + return ipfs(`pin add ${keys.leaf} --recursive false`).then((out) => { + expect(out).to.eql(`pinned ${keys.leaf} directly\n`) + }) + }) + + it('ls (recursive)', () => { + return ipfs(`pin ls --path ${keys.root}`).then((out) => { + expect(out).to.eql(`${keys.root} recursive\n`) + }) + }) + + it('ls (direct)', () => { + return ipfs(`pin ls --path ${keys.leaf}`).then((out) => { + expect(out).to.eql(`${keys.leaf} direct\n`) + }) + }) + + it('ls (indirect)', () => { + return ipfs(`pin ls --path ${keys.subLeaf}`).then((out) => { + expect(out).to.eql(`${keys.subLeaf} indirect through ${keys.root}\n`) + }) + }) + + it('ls (all)', () => { + return ipfs('pin ls').then((out) => { + expect(out).to.include(`${keys.leaf} direct\n`) + expect(out).to.include(`${keys.root} recursive\n`) + expect(out).to.include(`${keys.branch} indirect\n`) + expect(out).to.include(`${keys.subLeaf} indirect\n`) + }) + }) + +// it('ls (quiet)', () => { +// return ipfs('pin ls --quiet').then((out) => { +// expect(out).to.include(`${keys.leaf}\n`) +// expect(out).to.include(`${keys.root}\n`) +// expect(out).to.include(`${keys.branch}\n`) +// expect(out).to.include(`${keys.subLeaf}\n`) +// }) +// }) + + it('rm (recursively by default)', () => { + return ipfs(`pin rm ${keys.root}`).then((out) => { + expect(out).to.equal(`unpinned ${keys.root}\n`) + }) + }) + + it('rm (direct)', () => { + return ipfs(`pin rm --recursive false ${keys.leaf}`).then((out) => { + expect(out).to.equal(`unpinned ${keys.leaf}\n`) + }) + }) + + it('confirm removal', () => { + return ipfs('pin ls').then((out) => { + expect(out).to.equal('') + }) + }) +})) diff --git a/test/core/interface/interface.spec.js b/test/core/interface/interface.spec.js index 23e7dab09f..037d3701ce 100644 --- 
a/test/core/interface/interface.spec.js +++ b/test/core/interface/interface.spec.js @@ -6,6 +6,7 @@ const isNode = require('detect-node') describe('interface-ipfs-core tests', () => { require('./block') + require('./pin') require('./config') require('./files') require('./generic') diff --git a/test/core/interface/pin.js b/test/core/interface/pin.js new file mode 100644 index 0000000000..90bfc8f2e9 --- /dev/null +++ b/test/core/interface/pin.js @@ -0,0 +1,19 @@ +/* eslint-env mocha */ +'use strict' + +const test = require('interface-ipfs-core') +const IPFSFactory = require('../../utils/ipfs-factory-instance') + +let factory + +const common = { + setup: function (cb) { + factory = new IPFSFactory() + cb(null, factory) + }, + teardown: function (cb) { + factory.dismantle(cb) + } +} + +test.pin(common) diff --git a/test/http-api/spec/pin.js b/test/http-api/spec/pin.js new file mode 100644 index 0000000000..7016ce1110 --- /dev/null +++ b/test/http-api/spec/pin.js @@ -0,0 +1,147 @@ +/* eslint-env mocha */ +'use strict' + +const expect = require('chai').expect +const fs = require('fs') +const FormData = require('form-data') +const streamToPromise = require('stream-to-promise') +const each = require('async/each') + +// use a tree of ipfs objects for recursive tests: +// root +// |`leaf +// `branch +// `subLeaf + +const keys = { + root: 'QmWQwS2Xh1SFGMPzUVYQ52b7RC7fTfiaPHm3ZyTRZuHmer', + leaf: 'QmaZoTQ6wFe7EtvaePBUeXavfeRqCAq3RUMomFxBpZLrLA', + branch: 'QmNxjjP7dtx6pzxWGBRCrgmjX3JqKL7uF2Kjx7ExiZDbSB', + subLeaf: 'QmUzzznkyQL7FjjBztG3D1tTjBuxeArLceDZnuSowUggXL' +} + +module.exports = (http) => { + describe('pin', () => { + let api + + before((done) => { + // add test tree to repo + api = http.api.server.select('API') + const putFile = (filename, cb) => { + const filePath = `test/test-data/tree/${filename}.json` + const form = new FormData() + form.append('file', fs.createReadStream(filePath)) + const headers = form.getHeaders() + streamToPromise(form).then((payload) => { + 
api.inject({ + method: 'POST', + url: '/api/v0/object/put', + headers: headers, + payload: payload + }, (res) => { + expect(res.statusCode).to.equal(200) + cb() + }) + }) + } + each(Object.keys(keys), putFile, (err) => { + expect(err).to.not.exist() + done() + }) + }) + + describe('/pin/add', () => { + it('pins object recursively by default', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pin/add?arg=${keys.root}` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result).to.deep.equal({Pins: [keys.root]}) + done() + }) + }) + }) + + describe('/pin/add (direct)', () => { + it('pins object directly if specified', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pin/add?arg=${keys.leaf}&recursive=false` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result).to.deep.equal({Pins: [keys.leaf]}) + done() + }) + }) + }) + + describe('/pin/ls (with path)', () => { + it('finds specified pinned object', (done) => { + api.inject({ + method: 'GET', + url: `/api/v0/pin/ls?arg=/ipfs/${keys.root}/branch/subLeaf` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result.Keys[keys.subLeaf].Type) + .to.equal(`indirect through ${keys.root}`) + done() + }) + }) + }) + + describe('/pin/ls (without path or type)', () => { + it('finds all pinned objects', (done) => { + api.inject({ + method: 'GET', + url: ('/api/v0/pin/ls') + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result.Keys[keys.root].Type).to.equal('recursive') + expect(res.result.Keys[keys.leaf].Type).to.equal('direct') + expect(res.result.Keys[keys.branch].Type).to.equal('indirect') + expect(res.result.Keys[keys.subLeaf].Type).to.equal('indirect') + done() + }) + }) + }) + + describe('/pin/rm (direct)', () => { + it('unpins only directly pinned objects if specified', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pin/rm?arg=${keys.leaf}&recursive=false` + }, (res) => { + expect(res.statusCode).to.equal(200) + 
expect(res.result).to.deep.equal({Pins: [keys.leaf]}) + + api.inject({ + method: 'POST', + url: `/api/v0/pin/rm?arg=${keys.root}&recursive=false` + }, (res) => { + expect(res.statusCode).to.equal(500) + expect(res.result.Message).to.equal( + 'Failed to remove pin: ' + + 'QmWQwS2Xh1SFGMPzUVYQ52b7RC7fTfiaPHm3ZyTRZuHmer ' + + 'is pinned recursively' + ) + done() + }) + }) + }) + }) + + describe('/pin/rm', () => { + it('unpins recursively by default', (done) => { + api.inject({ + method: 'POST', + url: `/api/v0/pin/rm?arg=${keys.root}` + }, (res) => { + expect(res.statusCode).to.equal(200) + expect(res.result).to.deep.equal({Pins: [keys.root]}) + done() + }) + }) + }) + }) +} diff --git a/test/test-data/tree/branch.json b/test/test-data/tree/branch.json new file mode 100644 index 0000000000..459498c85c --- /dev/null +++ b/test/test-data/tree/branch.json @@ -0,0 +1 @@ +{"Links":[{"Name":"subLeaf","Hash":"QmUzzznkyQL7FjjBztG3D1tTjBuxeArLceDZnuSowUggXL","Size":15}],"Data":"\u0008\u0001"} diff --git a/test/test-data/tree/leaf.json b/test/test-data/tree/leaf.json new file mode 100644 index 0000000000..547be2cd24 --- /dev/null +++ b/test/test-data/tree/leaf.json @@ -0,0 +1 @@ +{"Links":[],"Data":"\u0008\u0002\u0012\u0004leaf\u0018\u0004"} diff --git a/test/test-data/tree/root.json b/test/test-data/tree/root.json new file mode 100644 index 0000000000..22f1229788 --- /dev/null +++ b/test/test-data/tree/root.json @@ -0,0 +1 @@ +{"Links":[{"Name":"leaf","Hash":"QmaZoTQ6wFe7EtvaePBUeXavfeRqCAq3RUMomFxBpZLrLA","Size":12},{"Name":"branch","Hash":"QmNxjjP7dtx6pzxWGBRCrgmjX3JqKL7uF2Kjx7ExiZDbSB","Size":68}],"Data":"\u0008\u0001"} diff --git a/test/test-data/tree/subLeaf.json b/test/test-data/tree/subLeaf.json new file mode 100644 index 0000000000..d77789e6f0 --- /dev/null +++ b/test/test-data/tree/subLeaf.json @@ -0,0 +1 @@ +{"Links":[],"Data":"\u0008\u0002\u0012\u0007subLeaf\u0018\u0007"}