From 51439486d5fcd719b9af9182b35565e87da96c99 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 27 Jul 2023 14:27:02 +0100 Subject: [PATCH] feat: add tiered blockstore (#238) To enable use cases where blocks may be stored in more than one location, add a `TieredBlockstore` class to `blockstore-core` similar to the`TieredDatastore` class found in `datastore-core`. --- packages/blockstore-core/README.md | 20 ++- packages/blockstore-core/package.json | 12 +- packages/blockstore-core/src/index.ts | 2 + packages/blockstore-core/src/tiered.ts | 146 +++++++++++++++++++ packages/blockstore-core/test/tiered.spec.ts | 68 +++++++++ 5 files changed, 246 insertions(+), 2 deletions(-) create mode 100644 packages/blockstore-core/src/tiered.ts create mode 100644 packages/blockstore-core/test/tiered.spec.ts diff --git a/packages/blockstore-core/README.md b/packages/blockstore-core/README.md index bde24348..f55d6b17 100644 --- a/packages/blockstore-core/README.md +++ b/packages/blockstore-core/README.md @@ -16,6 +16,7 @@ - [BaseBlockstore](#baseblockstore) - [MemoryBlockstore](#memoryblockstore) - [BlackHoleBlockstore](#blackholeblockstore) + - [TieredBlockstore](#tieredblockstore) - [API Docs](#api-docs) - [License](#license) - [Contribute](#contribute) @@ -38,7 +39,8 @@ Loading this module through a script tag will make it's exports available as `Bl - Base: [`src/base`](src/base.ts) - Memory: [`src/memory`](src/memory.ts) -- BlackHole: ['src/blackhole](src/blackhole.ts) +- BlackHole: ['src/black-hole](src/black-hole.ts) +- Tiered: ['src/tiered](src/tiered.ts) ## Usage @@ -82,6 +84,22 @@ import { BlackHoleBlockstore } from 'blockstore-core/black-hole' const store = new BlackHoleBlockstore() ``` +### TieredBlockstore + +A tiered blockstore wraps one or more blockstores and will query each in parallel to retrieve a block - the operation will succeed if any wrapped store has the block. + +Writes are invoked on all wrapped blockstores. + +```js +import { TieredBlockstore } from 'blockstore-core/tiered' + +const store = new TieredBlockstore([ + store1, + store2, + // ...etc +]) +``` + ## API Docs - diff --git a/packages/blockstore-core/package.json b/packages/blockstore-core/package.json index c13d0ba7..7d818092 100644 --- a/packages/blockstore-core/package.json +++ b/packages/blockstore-core/package.json @@ -66,6 +66,10 @@ "./memory": { "types": "./dist/src/memory.d.ts", "import": "./dist/src/memory.js" + }, + "./tiered": { + "types": "./dist/src/tiered.d.ts", + "import": "./dist/src/tiered.js" } }, "eslintConfig": { @@ -175,10 +179,16 @@ "docs": "aegir docs" }, "dependencies": { + "@libp2p/logger": "^2.0.0", "err-code": "^3.0.1", "interface-blockstore": "^5.0.0", "interface-store": "^5.0.0", - "multiformats": "^11.0.2" + "it-drain": "^3.0.1", + "it-filter": "^3.0.0", + "it-merge": "^3.0.1", + "it-pushable": "^3.0.0", + "multiformats": "^11.0.2", + "uint8arrays": "^4.0.2" }, "devDependencies": { "aegir": "^39.0.9", diff --git a/packages/blockstore-core/src/index.ts b/packages/blockstore-core/src/index.ts index c8961594..69fa16c8 100644 --- a/packages/blockstore-core/src/index.ts +++ b/packages/blockstore-core/src/index.ts @@ -2,6 +2,8 @@ import * as ErrorsImport from './errors.js' export { BaseBlockstore } from './base.js' export { MemoryBlockstore } from './memory.js' +export { BlackHoleBlockstore } from './black-hole.js' +export { TieredBlockstore } from './tiered.js' export const Errors = { ...ErrorsImport diff --git a/packages/blockstore-core/src/tiered.ts b/packages/blockstore-core/src/tiered.ts new file mode 100644 index 00000000..ce5c39f1 --- /dev/null +++ b/packages/blockstore-core/src/tiered.ts @@ -0,0 +1,146 @@ +import { logger } from '@libp2p/logger' +import drain from 'it-drain' +import filter from 'it-filter' +import merge from 'it-merge' +import { pushable } from 'it-pushable' +import { BaseBlockstore } from './base.js' +import * as Errors from './errors.js' +import type { Blockstore, Pair } from 'interface-blockstore' +import type { AbortOptions, AwaitIterable } from 'interface-store' +import type { CID } from 'multiformats/cid' + +const log = logger('blockstore:core:tiered') + +/** + * A blockstore that can combine multiple stores. Puts and deletes + * will write through to all blockstores. Has and get will + * try each store sequentially. getAll will use every store but also + * deduplicate any yielded pairs. + */ +export class TieredBlockstore extends BaseBlockstore { + private readonly stores: Blockstore[] + + constructor (stores: Blockstore[]) { + super() + + this.stores = stores.slice() + } + + async put (key: CID, value: Uint8Array, options?: AbortOptions): Promise { + try { + await Promise.all(this.stores.map(async store => { await store.put(key, value, options) })) + return key + } catch (err: any) { + throw Errors.putFailedError(err) + } + } + + async get (key: CID, options?: AbortOptions): Promise { + for (const store of this.stores) { + try { + const res = await store.get(key, options) + if (res != null) return res + } catch (err) { + log.error(err) + } + } + throw Errors.notFoundError() + } + + async has (key: CID, options?: AbortOptions): Promise { + for (const s of this.stores) { + if (await s.has(key, options)) { + return true + } + } + + return false + } + + async delete (key: CID, options?: AbortOptions): Promise { + try { + await Promise.all(this.stores.map(async store => { await store.delete(key, options) })) + } catch (err: any) { + throw Errors.deleteFailedError(err) + } + } + + async * putMany (source: AwaitIterable, options: AbortOptions = {}): AsyncIterable { + let error: Error | undefined + const pushables = this.stores.map(store => { + const source = pushable({ + objectMode: true + }) + + drain(store.putMany(source, options)) + .catch(err => { + // store threw while putting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const pair of source) { + if (error != null) { + throw error + } + + pushables.forEach(p => p.push(pair)) + + yield pair.cid + } + } finally { + pushables.forEach(p => p.end()) + } + } + + async * deleteMany (source: AwaitIterable, options: AbortOptions = {}): AsyncIterable { + let error: Error | undefined + const pushables = this.stores.map(store => { + const source = pushable({ + objectMode: true + }) + + drain(store.deleteMany(source, options)) + .catch(err => { + // store threw while deleting, make sure we bubble the error up + error = err + }) + + return source + }) + + try { + for await (const key of source) { + if (error != null) { + throw error + } + + pushables.forEach(p => p.push(key)) + + yield key + } + } finally { + pushables.forEach(p => p.end()) + } + } + + async * getAll (options?: AbortOptions): AwaitIterable { // eslint-disable-line require-yield + // deduplicate yielded pairs + const seen = new Set() + + yield * filter(merge(...this.stores.map(s => s.getAll(options))), (pair) => { + const cidStr = pair.cid.toString() + + if (seen.has(cidStr)) { + return false + } + + seen.add(cidStr) + + return true + }) + } +} diff --git a/packages/blockstore-core/test/tiered.spec.ts b/packages/blockstore-core/test/tiered.spec.ts new file mode 100644 index 00000000..9f15b6f4 --- /dev/null +++ b/packages/blockstore-core/test/tiered.spec.ts @@ -0,0 +1,68 @@ +/* eslint-env mocha */ + +import { expect } from 'aegir/chai' +import { interfaceBlockstoreTests } from 'interface-blockstore-tests' +import { CID } from 'multiformats/cid' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { MemoryBlockstore } from '../src/memory.js' +import { TieredBlockstore } from '../src/tiered.js' +import type { Blockstore } from 'interface-blockstore' + +describe('Tiered', () => { + describe('all stores', () => { + const ms: Blockstore[] = [] + let store: TieredBlockstore + beforeEach(() => { + ms.push(new MemoryBlockstore()) + ms.push(new MemoryBlockstore()) + store = new TieredBlockstore(ms) + }) + + it('put', async () => { + const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') + const v = uint8ArrayFromString('world') + await store.put(k, v) + const res = await Promise.all([ms[0].get(k), ms[1].get(k)]) + res.forEach((val) => { + expect(val).to.be.eql(v) + }) + }) + + it('get and has, where available', async () => { + const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') + const v = uint8ArrayFromString('world') + await ms[1].put(k, v) + const val = await store.get(k) + expect(val).to.be.eql(v) + const exists = await store.has(k) + expect(exists).to.be.eql(true) + }) + + it('has - key not found', async () => { + expect(await store.has(CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnA'))).to.be.eql(false) + }) + + it('has and delete', async () => { + const k = CID.parse('QmTp9VkYvnHyrqKQuFPiuZkiX9gPcqj6x5LJ1rmWuSySnL') + const v = uint8ArrayFromString('world') + await store.put(k, v) + let res = await Promise.all([ms[0].has(k), ms[1].has(k)]) + expect(res).to.be.eql([true, true]) + await store.delete(k) + res = await Promise.all([ms[0].has(k), ms[1].has(k)]) + expect(res).to.be.eql([false, false]) + }) + }) + + describe('inteface-datastore-single', () => { + interfaceBlockstoreTests({ + setup () { + return new TieredBlockstore([ + new MemoryBlockstore(), + new MemoryBlockstore() + ]) + }, + teardown () { } + }) + }) +})