Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
feat: parallel block writes (#97)
Browse files Browse the repository at this point in the history
* feat: parallel block writes

When using the `putMany` datastore method, write blocks in parallel
up to a configurable limit.

In local testing this almost doubles the throughput of this module
for multi-block writes.

* chore: use parallel for gets and deletes too
  • Loading branch information
achingbrain authored Jul 23, 2021
1 parent 5a1373a commit ceb5cfd
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"interface-datastore": "^5.1.1",
"it-glob": "0.0.13",
"it-map": "^1.0.5",
"it-parallel-batch": "^1.0.9",
"mkdirp": "^1.0.4"
},
"devDependencies": {
Expand Down
63 changes: 61 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const {
Adapter, Key, Errors
} = require('interface-datastore')
const map = require('it-map')
const parallel = require('it-parallel-batch')

const noop = () => {}
const fsAccess = promisify(fs.access || noop)
Expand All @@ -25,6 +26,11 @@ const fsUnlink = promisify(fs.unlink || noop)
* @typedef {import('interface-datastore').KeyQuery} KeyQuery
*/

/**
* @template TEntry
* @typedef {import('interface-store').AwaitIterable<TEntry>} AwaitIterable
*/

/**
* Write a file atomically
*
Expand Down Expand Up @@ -62,7 +68,7 @@ async function writeFile (path, contents) {
class FsDatastore extends Adapter {
/**
* @param {string} location
* @param {{ createIfMissing?: boolean; errorIfExists?: boolean; extension?: string; } | undefined} [opts]
* @param {{ createIfMissing?: boolean, errorIfExists?: boolean, extension?: string, putManyConcurrency?: number } | undefined} [opts]
*/
constructor (location, opts) {
super()
Expand All @@ -71,7 +77,10 @@ class FsDatastore extends Adapter {
this.opts = Object.assign({}, {
createIfMissing: true,
errorIfExists: false,
extension: '.data'
extension: '.data',
deleteManyConcurrency: 50,
getManyConcurrency: 50,
putManyConcurrency: 50
}, opts)
}

Expand Down Expand Up @@ -175,6 +184,23 @@ class FsDatastore extends Adapter {
}
}

/**
* @param {AwaitIterable<Pair>} source
* @returns {AsyncIterable<Pair>}
*/
async * putMany (source) {
yield * parallel(
map(source, ({ key, value }) => {
return async () => {
await this.put(key, value)

return { key, value }
}
}),
this.opts.putManyConcurrency
)
}

/**
* Read from the file system without extension.
*
Expand Down Expand Up @@ -215,6 +241,38 @@ class FsDatastore extends Adapter {
return data
}

/**
* @param {AwaitIterable<Key>} source
* @returns {AsyncIterable<Uint8Array>}
*/
async * getMany (source) {
yield * parallel(
map(source, key => {
return async () => {
return this.get(key)
}
}),
this.opts.getManyConcurrency
)
}

/**
* @param {AwaitIterable<Key>} source
* @returns {AsyncIterable<Key>}
*/
async * deleteMany (source) {
yield * parallel(
map(source, key => {
return async () => {
await this.delete(key)

return key
}
}),
this.opts.deleteManyConcurrency
)
}

/**
* Check for the existence of the given key.
*
Expand All @@ -223,6 +281,7 @@ class FsDatastore extends Adapter {
*/
async has (key) {
const parts = this._encode(key)

try {
await fsAccess(parts.file)
} catch (err) {
Expand Down

0 comments on commit ceb5cfd

Please sign in to comment.