From f7e4047938e74ffb6341dba215473c13f40adcab Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 1 Oct 2016 07:52:30 +0100 Subject: [PATCH] feat: blockstore gets blockBlobs instead of blocks (the difference is that now it receives the key in which it should store it --- package.json | 4 +- src/stores/blockstore.js | 95 ++++++++++++++++++++++++++-------------- test/blockstore-test.js | 30 +++++-------- 3 files changed, 75 insertions(+), 54 deletions(-) diff --git a/package.json b/package.json index f96c0b12..99abddb7 100644 --- a/package.json +++ b/package.json @@ -44,7 +44,7 @@ "dependencies": { "babel-runtime": "^6.11.6", "base32.js": "^0.1.0", - "ipfs-block": "^0.3.0", + "ipfs-block": "^0.4.0", "lock": "^0.1.3", "multihashes": "^0.2.2", "pull-defer": "^0.2.2", @@ -67,4 +67,4 @@ "nginnever ", "npmcdn-to-unpkg-bot " ] -} \ No newline at end of file +} diff --git a/src/stores/blockstore.js b/src/stores/blockstore.js index 54ce0df9..4f1765dd 100644 --- a/src/stores/blockstore.js +++ b/src/stores/blockstore.js @@ -1,23 +1,23 @@ 'use strict' const Block = require('ipfs-block') -const pull = require('pull-stream') const Lock = require('lock') const base32 = require('base32.js') const path = require('path') -const pullWrite = require('pull-write') const parallel = require('run-parallel') +const pull = require('pull-stream') +const pullWrite = require('pull-write') const pullDefer = require('pull-defer/source') const PREFIX_LENGTH = 5 +const EXTENSION = 'data' exports = module.exports function multihashToPath (multihash) { - const extension = 'data' const encoder = new base32.Encoder() const hash = encoder.write(multihash).finalize() - const filename = `${hash}.${extension}` + const filename = `${hash}.${EXTENSION}` const folder = filename.slice(0, PREFIX_LENGTH) return path.join(folder, filename) @@ -27,17 +27,19 @@ exports.setUp = (basePath, BlobStore, locks) => { const store = new BlobStore(basePath + '/blocks') const lock = new Lock() - function writeBlock (block, callback) { - if (!block || !block.data) { + // blockBlob is an object with: + // { data: <>, key: <> } + function writeBlock (blockBlob, callback) { + if (!blockBlob || !blockBlob.data) { return callback(new Error('Invalid block')) } - const key = multihashToPath(block.key()) + const key = multihashToPath(blockBlob.key) lock(key, (release) => { pull( pull.values([ - block.data + blockBlob.data ]), store.write(key, release(released)) ) @@ -84,35 +86,32 @@ exports.setUp = (basePath, BlobStore, locks) => { return deferred }, - // returns a pull-stream to write blocks into - // TODO use a more explicit name, given that getStream is just for - // one block, multiple blocks should have different naming + /* + * putStream - write multiple blocks + * + * returns a pull-stream that expects blockBlobs + * + * NOTE: blockBlob is a { data: <>, key: <> } and not a + * ipfs-block instance. This is because Block instances support + * several types of hashing and it is up to the BlockService + * to understand the right one to use (given the CID) + */ + // TODO + // consider using a more explicit name, this can cause some confusion + // since the natural association is + // getStream - createReadStream - read one + // putStream - createWriteStream - write one + // where in fact it is: + // getStream - createReadStream - read one (the same) + // putStream - createFilesWriteStream = write several + // putStream () { let ended = false let written = [] let push = null - const sink = pullWrite((blocks, cb) => { - const tasks = blocks.map((block) => { - return (cb) => { - writeBlock(block, (err, meta) => { - if (err) { - return cb(err) - } - - if (push) { - const read = push - push = null - read(null, meta) - return cb() - } - - written.push(meta) - cb() - }) - } - }) - + const sink = pullWrite((blockBlobs, cb) => { + const tasks = writeTasks(blockBlobs) parallel(tasks, cb) }, null, 100, (err) => { ended = err || true @@ -121,7 +120,6 @@ exports.setUp = (basePath, BlobStore, locks) => { } }) - // TODO ??Why does a putStream need to be a source as well?? const source = (end, cb) => { if (end) { ended = end @@ -137,7 +135,36 @@ exports.setUp = (basePath, BlobStore, locks) => { push = cb } - return { source: source, sink: sink } + /* + * Creates individual tasks to write each block blob that can be + * exectured in parallel + */ + function writeTasks (blockBlobs) { + return blockBlobs.map((blockBlob) => { + return (cb) => { + writeBlock(blockBlob, (err, meta) => { + if (err) { + return cb(err) + } + + if (push) { + const read = push + push = null + read(null, meta) + return cb() + } + + written.push(meta) + cb() + }) + } + }) + } + + return { + source: source, + sink: sink + } }, has (key, callback) { diff --git a/test/blockstore-test.js b/test/blockstore-test.js index 7252e7b8..68726e79 100644 --- a/test/blockstore-test.js +++ b/test/blockstore-test.js @@ -12,15 +12,15 @@ module.exports = (repo) => { describe('blockstore', () => { const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data' - const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.data' - const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`))) describe('.putStream', () => { it('simple', (done) => { const b = new Block('hello world') pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect((err, meta) => { expect(err).to.not.exist @@ -43,13 +43,17 @@ module.exports = (repo) => { } pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) pull( - pull.values([b]), + pull.values([ + { data: b.data, key: b.key() } + ]), repo.blockstore.putStream(), pull.collect(finish) ) @@ -59,6 +63,9 @@ module.exports = (repo) => { parallel(_.range(50).map(() => (cb) => { pull( pull.values(blockCollection), + pull.map((b) => { + return { data: b.data, key: b.key() } + }), repo.blockstore.putStream(), pull.collect((err, meta) => { expect(err).to.not.exist @@ -69,19 +76,6 @@ module.exports = (repo) => { }), done) }) - it('custom extension', function (done) { - const b = new Block('hello world 2') - pull( - pull.values([b]), - repo.blockstore.putStream(), - pull.collect((err, meta) => { - expect(err).to.not.exist - expect(meta[0].key).to.be.eql(helloIpldKey) - done() - }) - ) - }) - it('returns an error on invalid block', (done) => { pull( pull.values(['hello']),