Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

Commit

Permalink
Refactor fs datastore to us async/await
Browse files Browse the repository at this point in the history
  • Loading branch information
Zane Starr committed Mar 11, 2019
1 parent 45ec48b commit 724c3e8
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 161 deletions.
9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,15 @@
"homepage": "https://github.com/ipfs/js-datastore-fs#readme",
"dependencies": {
"async": "^2.6.1",
"datastore-core": "~0.6.0",
"datastore-core": "git://github.com:zcstarr/js-datastore-core.git",
"fast-write-atomic": "~0.2.0",
"glob": "^7.1.3",
"graceful-fs": "^4.1.11",
"interface-datastore": "~0.6.0",
"mkdirp": "~0.5.1",
"pull-stream": "^3.6.9"
"interface-datastore": "git://github.com/ipfs/interface-datastore.git#refactor/async-iterators",
"mkdirp": "~0.5.1"
},
"devDependencies": {
"aegir": "^15.3.1",
"aegir": "^18.2.0",
"chai": "^4.2.0",
"cids": "~0.5.5",
"dirty-chai": "^2.0.1",
Expand Down
178 changes: 81 additions & 97 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
/* :: import type {Batch, Query, QueryResult, Callback} from 'interface-datastore' */

const fs = require('graceful-fs')
const pull = require('pull-stream')
const glob = require('glob')
const setImmediate = require('async/setImmediate')
const waterfall = require('async/series')
const each = require('async/each')
const mkdirp = require('mkdirp')
const writeFile = require('fast-write-atomic')
const promisify = require('util').promisify
const writeFile = promisify(require('fast-write-atomic'))
const path = require('path')

const asyncFilter = require('interface-datastore').utils.asyncFilter
const asyncSort = require('interface-datastore').utils.asyncSort
const filter = require('interface-datastore').utils.filter
const take = require('interface-datastore').utils.take
const map = require('interface-datastore').utils.map
const sortAll = require('interface-datastore').utils.sortAll
const IDatastore = require('interface-datastore')

const asyncMkdirp = promisify(require('mkdirp'))
const fsAccess = promisify(fs.access)
const fsReadFile = promisify(fs.readFile)
const fsUnlink = promisify(fs.unlink)

const Key = IDatastore.Key
const Errors = IDatastore.Errors

Expand Down Expand Up @@ -57,9 +62,8 @@ class FsDatastore {
}
}

open (callback /* : Callback<void> */) /* : void */ {
open () /* : void */ {
this._openOrCreate()
setImmediate(callback)
}

/**
Expand Down Expand Up @@ -150,104 +154,97 @@ class FsDatastore {
*
* @param {Key} key
* @param {Buffer} val
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
putRaw (key /* : Key */, val /* : Buffer */, callback /* : Callback<void> */) /* : void */ {
async putRaw (key /* : Key */, val /* : Buffer */) /* : void */ {
const parts = this._encode(key)
const file = parts.file.slice(0, -this.opts.extension.length)
waterfall([
(cb) => mkdirp(parts.dir, { fs: fs }, cb),
(cb) => writeFile(file, val, cb)
], (err) => callback(err))
await asyncMkdirp(parts.dir, { fs: fs })
await writeFile(file, val)
}

/**
* Store the given value under the key.
*
* @param {Key} key
* @param {Buffer} val
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
put (key /* : Key */, val /* : Buffer */, callback /* : Callback<void> */) /* : void */ {
async put (key /* : Key */, val /* : Buffer */) /* : void */ {
const parts = this._encode(key)
waterfall([
(cb) => mkdirp(parts.dir, { fs: fs }, cb),
(cb) => writeFile(parts.file, val, cb)
], (err) => {
if (err) {
return callback(Errors.dbWriteFailedError(err))
}
callback()
})
try {
await asyncMkdirp(parts.dir, { fs: fs })
await writeFile(parts.file, val)
} catch (err) {
throw Errors.dbWriteFailedError(err)
}
}

/**
* Read from the file system without extension.
*
* @param {Key} key
* @param {function(Error, Buffer)} callback
* @returns {void}
* @returns {Promise<Buffer>}
*/
getRaw (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ {
async getRaw (key /* : Key */) /* : void */ {
const parts = this._encode(key)
let file = parts.file
file = file.slice(0, -this.opts.extension.length)
fs.readFile(file, (err, data) => {
if (err) {
return callback(Errors.notFoundError(err))
}
callback(null, data)
})
let data
try {
data = await fsReadFile(file)
} catch (err) {
throw Errors.notFoundError(err)
}
return data
}

/**
* Read from the file system.
*
* @param {Key} key
* @param {function(Error, Buffer)} callback
* @returns {void}
* @returns {Promise<Buffer>}
*/
get (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ {
async get (key /* : Key */) /* : void */ {
const parts = this._encode(key)
fs.readFile(parts.file, (err, data) => {
if (err) {
return callback(Errors.notFoundError(err))
}
callback(null, data)
})
let data
try {
data = await fsReadFile(parts.file)
} catch (err) {
throw Errors.notFoundError(err)
}
return data
}

/**
* Check for the existence of the given key.
*
* @param {Key} key
* @param {function(Error, bool)} callback
* @returns {void}
* @returns {Promise<bool>}
*/
has (key /* : Key */, callback /* : Callback<bool> */) /* : void */ {
async has (key /* : Key */) /* : void */ {
const parts = this._encode(key)
fs.access(parts.file, err => {
callback(null, !err)
})
try {
await fsAccess(parts.file)
} catch (err) {
return false
}
return true
}

/**
* Delete the record under the given key.
*
* @param {Key} key
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
delete (key /* : Key */, callback /* : Callback<void> */) /* : void */ {
async delete (key /* : Key */) /* : void */ {
const parts = this._encode(key)
fs.unlink(parts.file, (err) => {
if (err) {
return callback(Errors.dbDeleteFailedError(err))
}
callback()
})
try {
await fsUnlink(parts.file)
} catch (err) {
throw Errors.dbDeleteFailedError(err)
}
}

/**
Expand All @@ -265,15 +262,9 @@ class FsDatastore {
delete (key /* : Key */) /* : void */ {
deletes.push(key)
},
commit: (callback /* : (err: ?Error) => void */) => {
waterfall([
(cb) => each(puts, (p, cb) => {
this.put(p.key, p.value, cb)
}, cb),
(cb) => each(deletes, (k, cb) => {
this.delete(k, cb)
}, cb)
], (err) => callback(err))
commit: async () /* : Promise<void> */ => {
await Promise.all((puts.map((put) => this.put(put.key, put.value))))
await Promise.all((deletes.map((del) => this.delete(del))))
}
}
}
Expand All @@ -282,7 +273,7 @@ class FsDatastore {
* Query the store.
*
* @param {Object} q
* @returns {PullStream}
* @returns {Iterable}
*/
query (q /* : Query<Buffer> */) /* : QueryResult<Buffer> */ {
// glob expects a POSIX path
Expand All @@ -291,53 +282,46 @@ class FsDatastore {
.join(this.path, prefix, '*' + this.opts.extension)
.split(path.sep)
.join('/')
let tasks = [pull.values(glob.sync(pattern))]

let files = glob.sync(pattern)
let it
if (!q.keysOnly) {
tasks.push(pull.asyncMap((f, cb) => {
fs.readFile(f, (err, buf) => {
if (err) {
return cb(err)
}
cb(null, {
key: this._decode(f),
value: buf
})
})
}))
it = map(files, async (f) => {
const buf = await fsReadFile(f)
return {
key: this._decode(f),
value: buf
}
})
} else {
tasks.push(pull.map(f => ({ key: this._decode(f) })))
it = map(files, f => ({ key: this._decode(f) }))
}

if (q.filters != null) {
tasks = tasks.concat(q.filters.map(asyncFilter))
if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(it, f), it)
}

if (q.orders != null) {
tasks = tasks.concat(q.orders.map(asyncSort))
if (Array.isArray(q.orders)) {
it = q.orders.reduce((it, f) => sortAll(it, f), it)
}

if (q.offset != null) {
let i = 0
tasks.push(pull.filter(() => i++ >= q.offset))
it = filter(it, () => i++ >= q.offset)
}

if (q.limit != null) {
tasks.push(pull.take(q.limit))
it = take(it, q.limit)
}

return pull.apply(null, tasks)
return it
}

/**
* Close the store.
*
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
close (callback /* : (err: ?Error) => void */) /* : void */ {
setImmediate(callback)
}
async close () /* : Promise<void> */ { }
}

module.exports = FsDatastore
Loading

0 comments on commit 724c3e8

Please sign in to comment.