diff --git a/package.json b/package.json index 3a2ff65..6ae6bad 100644 --- a/package.json +++ b/package.json @@ -43,6 +43,7 @@ "interface-datastore": "^5.1.1", "it-glob": "0.0.13", "it-map": "^1.0.5", + "it-parallel-batch": "^1.0.9", "mkdirp": "^1.0.4" }, "devDependencies": { diff --git a/src/index.js b/src/index.js index 7cb2572..e130e98 100644 --- a/src/index.js +++ b/src/index.js @@ -12,6 +12,7 @@ const { Adapter, Key, Errors } = require('interface-datastore') const map = require('it-map') +const parallel = require('it-parallel-batch') const noop = () => {} const fsAccess = promisify(fs.access || noop) @@ -25,6 +26,11 @@ const fsUnlink = promisify(fs.unlink || noop) * @typedef {import('interface-datastore').KeyQuery} KeyQuery */ +/** + * @template TEntry + * @typedef {import('interface-store').AwaitIterable} AwaitIterable + */ + /** * Write a file atomically * @@ -62,7 +68,7 @@ async function writeFile (path, contents) { class FsDatastore extends Adapter { /** * @param {string} location - * @param {{ createIfMissing?: boolean; errorIfExists?: boolean; extension?: string; } | undefined} [opts] + * @param {{ createIfMissing?: boolean, errorIfExists?: boolean, extension?: string, putManyConcurrency?: number } | undefined} [opts] */ constructor (location, opts) { super() @@ -71,7 +77,10 @@ class FsDatastore extends Adapter { this.opts = Object.assign({}, { createIfMissing: true, errorIfExists: false, - extension: '.data' + extension: '.data', + deleteManyConcurrency: 50, + getManyConcurrency: 50, + putManyConcurrency: 50 }, opts) } @@ -175,6 +184,23 @@ class FsDatastore extends Adapter { } } + /** + * @param {AwaitIterable} source + * @returns {AsyncIterable} + */ + async * putMany (source) { + yield * parallel( + map(source, ({ key, value }) => { + return async () => { + await this.put(key, value) + + return { key, value } + } + }), + this.opts.putManyConcurrency + ) + } + /** * Read from the file system without extension. * @@ -215,6 +241,38 @@ class FsDatastore extends Adapter { return data } + /** + * @param {AwaitIterable} source + * @returns {AsyncIterable} + */ + async * getMany (source) { + yield * parallel( + map(source, key => { + return async () => { + return this.get(key) + } + }), + this.opts.getManyConcurrency + ) + } + + /** + * @param {AwaitIterable} source + * @returns {AsyncIterable} + */ + async * deleteMany (source) { + yield * parallel( + map(source, key => { + return async () => { + await this.delete(key) + + return key + } + }), + this.opts.deleteManyConcurrency + ) + } + /** * Check for the existence of the given key. * @@ -223,6 +281,7 @@ class FsDatastore extends Adapter { */ async has (key) { const parts = this._encode(key) + try { await fsAccess(parts.file) } catch (err) {