feat: blockstore gets blockBlobs instead of blocks (the difference is that now it receives the key in which it should store it)
daviddias committed Oct 26, 2016
1 parent 311551a commit f7e4047
Showing 3 changed files with 75 additions and 54 deletions.
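
In short, the blockstore no longer derives the storage key from an ipfs-block instance itself; callers now hand it a plain blockBlob of the shape { data: <Buffer>, key: <multihash> }. A minimal sketch of the new calling convention, assuming a repo instance that has already been set up as in the tests further down (variable names are illustrative):

const pull = require('pull-stream')
const Block = require('ipfs-block')

const b = new Block(new Buffer('hello world'))

pull(
  // before this commit the stream consumed Block instances directly;
  // now it consumes blockBlobs that carry both the data and the key
  pull.values([{ data: b.data, key: b.key() }]),
  repo.blockstore.putStream(), // 'repo' is assumed to be an ipfs-repo set up as in the tests
  pull.collect((err, meta) => {
    // meta[0].key is the path the blob was written to
  })
)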
4 changes: 2 additions & 2 deletions package.json
@@ -44,7 +44,7 @@
"dependencies": {
"babel-runtime": "^6.11.6",
"base32.js": "^0.1.0",
"ipfs-block": "^0.3.0",
"ipfs-block": "^0.4.0",
"lock": "^0.1.3",
"multihashes": "^0.2.2",
"pull-defer": "^0.2.2",
@@ -67,4 +67,4 @@
"nginnever <ginneversource@gmail.com>",
"npmcdn-to-unpkg-bot <npmcdn-to-unpkg-bot@users.noreply.github.com>"
]
}
}
95 changes: 61 additions & 34 deletions src/stores/blockstore.js
@@ -1,23 +1,23 @@
'use strict'

const Block = require('ipfs-block')
const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
const pullWrite = require('pull-write')
const parallel = require('run-parallel')
const pull = require('pull-stream')
const pullWrite = require('pull-write')
const pullDefer = require('pull-defer/source')

const PREFIX_LENGTH = 5
const EXTENSION = 'data'

exports = module.exports

function multihashToPath (multihash) {
const extension = 'data'
const encoder = new base32.Encoder()
const hash = encoder.write(multihash).finalize()
const filename = `${hash}.${extension}`
const filename = `${hash}.${EXTENSION}`
const folder = filename.slice(0, PREFIX_LENGTH)

return path.join(folder, filename)
@@ -27,17 +27,19 @@ exports.setUp = (basePath, BlobStore, locks) => {
const store = new BlobStore(basePath + '/blocks')
const lock = new Lock()

function writeBlock (block, callback) {
if (!block || !block.data) {
// blockBlob is an object with:
// { data: <>, key: <> }
function writeBlock (blockBlob, callback) {
if (!blockBlob || !blockBlob.data) {
return callback(new Error('Invalid block'))
}

const key = multihashToPath(block.key())
const key = multihashToPath(blockBlob.key)

lock(key, (release) => {
pull(
pull.values([
block.data
blockBlob.data
]),
store.write(key, release(released))
)
@@ -84,35 +86,32 @@ exports.setUp = (basePath, BlobStore, locks) => {
return deferred
},

// returns a pull-stream to write blocks into
// TODO use a more explicit name, given that getStream is just for
// one block, multiple blocks should have different naming
/*
* putStream - write multiple blocks
*
* returns a pull-stream that expects blockBlobs
*
* NOTE: blockBlob is a { data: <>, key: <> } and not a
* ipfs-block instance. This is because Block instances support
* several types of hashing and it is up to the BlockService
* to understand the right one to use (given the CID)
*/
// TODO
// consider using a more explicit name, this can cause some confusion
// since the natural association is
// getStream - createReadStream - read one
// putStream - createWriteStream - write one
// where in fact it is:
// getStream - createReadStream - read one (the same)
// putStream - createFilesWriteStream - write several
//
putStream () {
let ended = false
let written = []
let push = null

const sink = pullWrite((blocks, cb) => {
const tasks = blocks.map((block) => {
return (cb) => {
writeBlock(block, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}
})

const sink = pullWrite((blockBlobs, cb) => {
const tasks = writeTasks(blockBlobs)
parallel(tasks, cb)
}, null, 100, (err) => {
ended = err || true
@@ -121,7 +120,6 @@ exports.setUp = (basePath, BlobStore, locks) => {
}
})

// TODO ??Why does a putStream need to be a source as well??
const source = (end, cb) => {
if (end) {
ended = end
@@ -137,7 +135,36 @@ exports.setUp = (basePath, BlobStore, locks) => {
push = cb
}

return { source: source, sink: sink }
/*
* Creates individual write tasks, one per block blob, so that they can be
* executed in parallel
*/
function writeTasks (blockBlobs) {
return blockBlobs.map((blockBlob) => {
return (cb) => {
writeBlock(blockBlob, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}
})
}

return {
source: source,
sink: sink
}
},

has (key, callback) {
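
Taken together, the reworked putStream is a duplex pull-stream: the sink consumes blockBlobs (batched through pull-write and written with run-parallel), while the source emits one metadata object per written blob. A hedged usage sketch, assuming repo is an ipfs-repo instance set up as in the tests below:

const pull = require('pull-stream')
const Block = require('ipfs-block')

const blobs = ['a', 'b', 'c'].map((s) => {
  const block = new Block(new Buffer(s))
  // the key decides where each blob ends up on disk
  return { data: block.data, key: block.key() }
})

pull(
  pull.values(blobs),
  repo.blockstore.putStream(), // the duplex { source, sink } pair returned above
  pull.collect((err, meta) => {
    // each meta entry describes one written blob, e.g. meta[0].key
  })
)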
30 changes: 12 additions & 18 deletions test/blockstore-test.js
@@ -12,15 +12,15 @@ module.exports = (repo) => {
describe('blockstore', () => {
const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data'

const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.data'

const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`)))

describe('.putStream', () => {
it('simple', (done) => {
const b = new Block('hello world')
pull(
pull.values([b]),
pull.values([
{ data: b.data, key: b.key() }
]),
repo.blockstore.putStream(),
pull.collect((err, meta) => {
expect(err).to.not.exist
@@ -43,13 +43,17 @@
}

pull(
pull.values([b]),
pull.values([
{ data: b.data, key: b.key() }
]),
repo.blockstore.putStream(),
pull.collect(finish)
)

pull(
pull.values([b]),
pull.values([
{ data: b.data, key: b.key() }
]),
repo.blockstore.putStream(),
pull.collect(finish)
)
@@ -59,6 +63,9 @@
parallel(_.range(50).map(() => (cb) => {
pull(
pull.values(blockCollection),
pull.map((b) => {
return { data: b.data, key: b.key() }
}),
repo.blockstore.putStream(),
pull.collect((err, meta) => {
expect(err).to.not.exist
@@ -69,19 +76,6 @@
}), done)
})

it('custom extension', function (done) {
const b = new Block('hello world 2')
pull(
pull.values([b]),
repo.blockstore.putStream(),
pull.collect((err, meta) => {
expect(err).to.not.exist
expect(meta[0].key).to.be.eql(helloIpldKey)
done()
})
)
})

it('returns an error on invalid block', (done) => {
pull(
pull.values(['hello']),
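
The tests now build the { data: ..., key: ... } blob inline each time; a small hedged helper (the name blockToBlob is illustrative, not part of the codebase) expresses the same Block-to-blob mapping as a reusable pull-stream through, reusing the blockCollection, repo, and done already in scope in the test above:

const pull = require('pull-stream')

// maps ipfs-block instances to the blockBlobs the blockstore now expects
function blockToBlob () {
  return pull.map((b) => {
    return { data: b.data, key: b.key() }
  })
}

pull(
  pull.values(blockCollection),
  blockToBlob(),
  repo.blockstore.putStream(),
  pull.collect((err) => done(err))
)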
