Skip to content

Commit

Permalink
feat: no optional extension + simplify some of blockstore code
Browse files Browse the repository at this point in the history
  • Loading branch information
daviddias committed Sep 30, 2016
1 parent ca51a92 commit 311551a
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 131 deletions.
155 changes: 84 additions & 71 deletions src/stores/blockstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ const pull = require('pull-stream')
const Lock = require('lock')
const base32 = require('base32.js')
const path = require('path')
const write = require('pull-write')
const pullWrite = require('pull-write')
const parallel = require('run-parallel')
const defer = require('pull-defer/source')
const pullDefer = require('pull-defer/source')

const PREFIX_LENGTH = 5

exports = module.exports

function multihashToPath (multihash, extension) {
extension = extension || 'data'
function multihashToPath (multihash) {
const extension = 'data'
const encoder = new base32.Encoder()
const hash = encoder.write(multihash).finalize()
const filename = `${hash}.${extension}`
Expand All @@ -27,82 +27,105 @@ exports.setUp = (basePath, BlobStore, locks) => {
const store = new BlobStore(basePath + '/blocks')
const lock = new Lock()

function writeBlock (block, cb) {
function writeBlock (block, callback) {
if (!block || !block.data) {
return cb(new Error('Invalid block'))
return callback(new Error('Invalid block'))
}

const key = multihashToPath(block.key, block.extension)

lock(key, (release) => pull(
pull.values([block.data]),
store.write(key, release((err) => {
if (err) {
return cb(err)
}
cb(null, {key})
}))
))
const key = multihashToPath(block.key())

lock(key, (release) => {
pull(
pull.values([
block.data
]),
store.write(key, release(released))
)
})

// called once the lock is released
function released (err) {
if (err) {
return callback(err)
}
callback(null, { key: key })
}
}

return {
getStream (key, extension) {
// returns a pull-stream of one block being read
getStream (key) {
if (!key) {
return pull.error(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
const deferred = defer()
const blockPath = multihashToPath(key)
const deferred = pullDefer()

lock(p, (release) => {
const ext = extension === 'data' ? 'protobuf' : extension
lock(blockPath, (release) => {
pull(
store.read(p),
pull.collect(release((err, data) => {
if (err) {
return deferred.abort(err)
}

deferred.resolve(pull.values([
new Block(Buffer.concat(data), ext)
]))
}))
store.read(blockPath),
pull.collect(release(released))
)
})

function released (err, data) {
if (err) {
return deferred.abort(err)
}

deferred.resolve(
pull.values([
new Block(Buffer.concat(data))
])
)
}

return deferred
},

// returns a pull-stream to write blocks into
// TODO use a more explicit name, given that getStream is just for
// one block, multiple blocks should have different naming
putStream () {
let ended = false
let written = []
let push = null

const sink = write((blocks, cb) => {
parallel(blocks.map((block) => (cb) => {
writeBlock(block, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}), cb)
const sink = pullWrite((blocks, cb) => {
const tasks = blocks.map((block) => {
return (cb) => {
writeBlock(block, (err, meta) => {
if (err) {
return cb(err)
}

if (push) {
const read = push
push = null
read(null, meta)
return cb()
}

written.push(meta)
cb()
})
}
})

parallel(tasks, cb)
}, null, 100, (err) => {
ended = err || true
if (push) push(ended)
if (push) {
push(ended)
}
})

// TODO ??Why does a putStream need to be a source as well??
const source = (end, cb) => {
if (end) ended = end
if (end) {
ended = end
}
if (ended) {
return cb(ended)
}
Expand All @@ -114,35 +137,25 @@ exports.setUp = (basePath, BlobStore, locks) => {
push = cb
}

return {source, sink}
return { source: source, sink: sink }
},

has (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

has (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.exists(p, cb)
const blockPath = multihashToPath(key)
store.exists(blockPath, callback)
},

delete (key, extension, cb) {
if (typeof extension === 'function') {
cb = extension
extension = undefined
}

delete (key, callback) {
if (!key) {
return cb(new Error('Invalid key'))
return callback(new Error('Invalid key'))
}

const p = multihashToPath(key, extension)
store.remove(p, cb)
const blockPath = multihashToPath(key)
store.remove(blockPath, callback)
}
}
}
34 changes: 22 additions & 12 deletions src/stores/locks.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@ exports.setUp = (basePath, BlobStore) => {
lock (callback) {
function createLock () {
pull(
pull.values([new Buffer('LOCK')]),
pull.values([
new Buffer('LOCK')
]),
store.write(lockFile, callback)
)
}

function doesExist (err, exists) {
if (err) return callback(err)
if (err) {
return callback(err)
}

if (exists) {
// default 100ms
Expand All @@ -37,16 +41,22 @@ exports.setUp = (basePath, BlobStore) => {

unlock (callback) {
series([
(cb) => store.remove(lockFile, cb),
(cb) => store.exists(lockFile, (err, exists) => {
if (err) return cb(err)

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
(cb) => {
store.remove(lockFile, cb)
},
(cb) => {
store.exists(lockFile, (err, exists) => {
if (err) {
return cb(err)
}

if (exists) {
return cb(new Error('failed to remove lock'))
}

cb()
})
}
], callback)
}
}
Expand Down
61 changes: 13 additions & 48 deletions test/blockstore-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ const _ = require('lodash')
module.exports = (repo) => {
describe('blockstore', () => {
const helloKey = 'CIQLS/CIQLSTJHXGJU2PQIUUXFFV62PWV7VREE57RXUU4A52IIR55M4LX432I.data'
const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.ipld'

const helloIpldKey = 'CIQO2/CIQO2EUTF47PSTAHSL54KUTDS2AAN2DH4URM7H5KRATUGQFCM4OUIQI.data'

const blockCollection = _.range(100).map((i) => new Block(new Buffer(`hello-${i}-${Math.random()}`)))

describe('.putStream', () => {
Expand Down Expand Up @@ -68,7 +70,7 @@ module.exports = (repo) => {
})

it('custom extension', function (done) {
const b = new Block('hello world 2', 'ipld')
const b = new Block('hello world 2')
pull(
pull.values([b]),
repo.blockstore.putStream(),
Expand Down Expand Up @@ -97,10 +99,10 @@ module.exports = (repo) => {
const b = new Block('hello world')

pull(
repo.blockstore.getStream(b.key),
repo.blockstore.getStream(b.key()),
pull.collect((err, data) => {
expect(err).to.not.exist
expect(data[0]).to.be.eql(b)
expect(data[0].key()).to.be.eql(b.key())

done()
})
Expand All @@ -111,30 +113,17 @@ module.exports = (repo) => {
parallel(_.range(20 * 100).map((i) => (cb) => {
const j = i % blockCollection.length
pull(
repo.blockstore.getStream(blockCollection[j].key),
repo.blockstore.getStream(blockCollection[j].key()),
pull.collect((err, meta) => {
expect(err).to.not.exist
expect(meta).to.be.eql([blockCollection[j]])
expect(meta[0].key())
.to.be.eql(blockCollection[j].key())
cb()
})
)
}), done)
})

it('custom extension', (done) => {
const b = new Block('hello world 2', 'ipld')

pull(
repo.blockstore.getStream(b.key, b.extension),
pull.collect((err, data) => {
expect(err).to.not.exist
expect(data[0]).to.be.eql(b)

done()
})
)
})

it('returns an error on invalid block', (done) => {
pull(
repo.blockstore.getStream(),
Expand All @@ -150,17 +139,7 @@ module.exports = (repo) => {
it('existing block', (done) => {
const b = new Block('hello world')

repo.blockstore.has(b.key, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
})
})

it('with extension', (done) => {
const b = new Block('hello world')

repo.blockstore.has(b.key, 'data', (err, exists) => {
repo.blockstore.has(b.key(), (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(true)
done()
Expand All @@ -170,7 +149,7 @@ module.exports = (repo) => {
it('non existent block', (done) => {
const b = new Block('wooot')

repo.blockstore.has(b.key, (err, exists) => {
repo.blockstore.has(b.key(), (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
Expand All @@ -182,24 +161,10 @@ module.exports = (repo) => {
it('simple', (done) => {
const b = new Block('hello world')

repo.blockstore.delete(b.key, (err) => {
expect(err).to.not.exist

repo.blockstore.has(b.key, (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
})
})
})

it('custom extension', (done) => {
const b = new Block('hello world', 'ipld')

repo.blockstore.delete(b.key, b.extension, (err) => {
repo.blockstore.delete(b.key(), (err) => {
expect(err).to.not.exist

repo.blockstore.has(b.key, b.extension, (err, exists) => {
repo.blockstore.has(b.key(), (err, exists) => {
expect(err).to.not.exist
expect(exists).to.equal(false)
done()
Expand Down

0 comments on commit 311551a

Please sign in to comment.