From ab2f2b9ee20fbb876b1ea9ea2b33ee35e92b4254 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Sat, 13 Apr 2019 18:04:54 +0100 Subject: [PATCH] feat: refactor to async iterators (#25) * refactor: async iterators Here's a proposal for the datastore interface that uses async/await and async iterators. See https://github.com/ipfs/js-ipfs/issues/1670 for context. License: MIT Signed-off-by: Alan Shaw * chore: use node.js 10 in CI License: MIT Signed-off-by: Alan Shaw * chore: appease linter License: MIT Signed-off-by: Alan Shaw * fix: remove unnecessary assertion License: MIT Signed-off-by: Alan Shaw --- .gitignore | 1 + .npmignore | 1 + .travis.yml | 6 +- README.md | 115 +++++------- appveyor.yml | 2 +- package.json | 5 +- src/memory.js | 83 +++------ src/tests.js | 413 ++++++++++++++++---------------------------- src/utils.js | 90 ++++------ test/memory.spec.js | 8 +- test/utils.spec.js | 121 +++++++------ 11 files changed, 323 insertions(+), 522 deletions(-) diff --git a/.gitignore b/.gitignore index 1f04af2..f8ca32b 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ logs *.log coverage +.nyc_output # Runtime data pids diff --git a/.npmignore b/.npmignore index 00a160d..7eaff2d 100644 --- a/.npmignore +++ b/.npmignore @@ -12,6 +12,7 @@ lib-cov # Coverage directory used by tools like istanbul coverage +.nyc_output # Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) .grunt diff --git a/.travis.yml b/.travis.yml index 584f308..4f7d3f1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,12 +3,8 @@ language: node_js matrix: include: - - node_js: 6 + - node_js: 10 env: CXX=g++-4.8 - - node_js: 8 - env: CXX=g++-4.8 - # - node_js: stable - # env: CXX=g++-4.8 script: - npm run lint diff --git a/README.md b/README.md index 4360323..cd66d5d 100644 --- a/README.md +++ b/README.md @@ -47,14 +47,14 @@ const ShardingStore = require('datastore-core').ShardingDatatstore const NextToLast = require('datastore-core').shard.NextToLast const fs = new FsStore('path/to/store') -ShardingStore.createOrOpen(fs, new NextToLast(2), (err, flatfs) => { - // flatfs now works like go-flatfs -}) + +// flatfs now works like go-flatfs +const flatfs = await ShardingStore.createOrOpen(fs, new NextToLast(2)) ``` ## Install -``` +```sh $ npm install interface-datastore ``` @@ -67,22 +67,21 @@ const MemoryStore = require('interface-datastore').MemoryDatastore const MountStore = require('datastore-core').MountDatastore const Key = require('interface-datastore').Key -const store = new MountStore({prefix: new Key('/a'), datastore: new MemoryStore()}) +const store = new MountStore({ prefix: new Key('/a'), datastore: new MemoryStore() }) ``` -### Testsuite +### Test suite Available under [`src/tests.js`](src/tests.js) ```js describe('mystore', () => { require('interface-datastore/src/tests')({ - setup (callback) { - callback(null, instanceOfMyStore) + async setup () { + return instanceOfMyStore }, - teardown (callback) { + async teardown () { // cleanup resources - callback() } }) }) @@ -99,12 +98,12 @@ const a = new Key('a') const b = new Key(new Buffer('hello')) ``` -The key scheme is inspired by file systems and Google App Engine key model. Keys are meant to be unique across a system. They are typical hierarchical, incorporating more and more specific namespaces. Thus keys can be deemed 'children' or 'ancestors' of other keys: +The key scheme is inspired by file systems and Google App Engine key model. Keys are meant to be unique across a system. They are typically hierarchical, incorporating more and more specific namespaces. Thus keys can be deemed 'children' or 'ancestors' of other keys: - `new Key('/Comedy')` - `new Key('/Comedy/MontyPython')` -Also, every namespace can be parametrized to embed relevant object information. For example, the Key `name` (most specific namespace) could include the object type: +Also, every namespace can be parameterized to embed relevant object information. For example, the Key `name` (most specific namespace) could include the object type: - `new Key('/Comedy/MontyPython/Actor:JohnCleese')` - `new Key('/Comedy/MontyPython/Sketch:CheeseShop')` @@ -117,90 +116,65 @@ Also, every namespace can be parametrized to embed relevant object information. These methods will be present on every datastore. `Key` always means an instance of the above mentioned Key type. Every datastore is generic over the `Value` type, though currently all backing implementations are implemented only for [`Buffer`](https://nodejs.org/docs/latest/api/buffer.html). -### `has(key, callback)` +### `has(key)` -> `Promise` - `key: Key` -- `callback: function(Error, bool)` Check for the existence of a given key ```js -store.has(new Key('awesome'), (err, exists) => { - if (err) { - throw err - } - console.log('is it there', exists) -}) +const exists = await store.has(new Key('awesome')) +console.log('is it there', exists) ``` -### `put(key, value, callback)` +### `put(key, value)` -> `Promise` - `key: Key` - `value: Value` -- `callback: function(Error)` Store a value with the given key. ```js -store.put(new Key('awesome'), new Buffer('datastores'), (err) => { - if (err) { - throw err - } - console.log('put content') -}) +await store.put(new Key('awesome'), new Buffer('datastores')) +console.log('put content') ``` -### `get(key, callback)` +### `get(key)` -> `Promise` - `key: Key` -- `callback: function(Error, Value)` Retrieve the value stored under the given key. ```js -store.get(new Key('awesome'), (err, value) => { - if (err) { - throw err - } - console.log('got content: %s', value.toString()) - // => got content: datastore -}) +const value = await store.get(new Key('awesome')) +console.log('got content: %s', value.toString()) +// => got content: datastore ``` -### `delete(key, callback)` +### `delete(key)` -> `Promise` - `key: Key` -- `callback: function(Error)` Delete the content stored under the given key. ```js -store.delete(new Key('awesome'), (err) => { - if (err) { - throw err - } - console.log('deleted awesome content :(') -}) +await store.delete(new Key('awesome')) +console.log('deleted awesome content :(') ``` -### `query(query)` +### `query(query)` -> `Iterable` - `query: Query` see below for possible values -- Returns: `pull-stream source` -Search the store for some values. Returns a [pull-stream](https://pull-stream.github.io/) with each item being a `Value`. +Search the store for some values. Returns an [Iterable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols) with each item being a `Value`. ```js // retrieve __all__ values from the store -pull( - store.query({}), - pull.collect((err, list) => { - if (err) { - console.error(err) - } - console.log('ALL THE VALUES', list) - }) -) +let list = [] +for await (const value of store.query({})) { + list.push(value) +} +console.log('ALL THE VALUES', list) ``` #### `Query` @@ -210,9 +184,9 @@ Object in the form with the following optional properties - `prefix: string` (optional) - only return values where the key starts with this prefix - `filters: Array>` (optional) - filter the results according to the these functions - `orders: Array>` (optional) - order the results according to these functions -- `limit: number` (optional) - only return this many records -- `offset: number` (optional) - skip this many records at the beginning -- `keysOnly: bool` (optional) - Only return keys, no values. +- `limit: Number` (optional) - only return this many records +- `offset: Number` (optional) - skip this many records at the beginning +- `keysOnly: Boolean` (optional) - Only return keys, no values. ### `batch()` @@ -225,13 +199,8 @@ for (let i = 0; i < 100; i++) { b.put(new Key(`hello${i}`), new Buffer(`hello world ${i}`)) } -b.commit((err) => { - if (err) { - throw err - } - console.log('put 100 values') -}) - +await b.commit() +console.log('put 100 values') ``` #### `put(key, value)` @@ -247,21 +216,15 @@ Queue a put operation to the store. Queue a delete operation to the store. -#### `commit(callback)` - -- `callback: function(Error)` +#### `commit()` -> `Promise` Write all queued operations to the underyling store. The batch object should not be used after calling this. -### `open(callback)` - -- `callback: function(Error)` +### `open()` -> `Promise` Opens the datastore, this is only needed if the store was closed before, otherwise this is taken care of by the constructor. -### `close(callback)` - -- `callback: function(Error)` +### `close()` -> `Promise` Close the datastore, this should always be called to ensure resources are cleaned up. diff --git a/appveyor.yml b/appveyor.yml index 767f5b4..9188b91 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -2,7 +2,7 @@ version: "{build}" environment: matrix: - - nodejs_version: "6" + - nodejs_version: "10" matrix: fast_finish: true diff --git a/package.json b/package.json index 0b7f774..b788430 100644 --- a/package.json +++ b/package.json @@ -34,17 +34,14 @@ }, "homepage": "https://github.com/ipfs/interface-datastore#readme", "devDependencies": { - "aegir": "^15.1.0", + "aegir": "^17.1.1", "chai": "^4.1.2", "dirty-chai": "^2.0.1", "flow-bin": "~0.83.0" }, "dependencies": { - "async": "^2.6.1", "class-is": "^1.1.0", "err-code": "^1.1.2", - "pull-defer": "~0.2.3", - "pull-stream": "^3.6.9", "uuid": "^3.2.2" }, "engines": { diff --git a/src/memory.js b/src/memory.js index 9ae1a24..b10cb42 100644 --- a/src/memory.js +++ b/src/memory.js @@ -3,11 +3,7 @@ /* :: import type {Batch, Query, QueryResult, Callback} from './' */ -const pull = require('pull-stream') -const setImmediate = require('async/setImmediate') - -const asyncFilter = require('./utils').asyncFilter -const asyncSort = require('./utils').asyncSort +const { filter, sortAll, take, map } = require('./utils') const Key = require('./key') // Errors @@ -20,42 +16,24 @@ class MemoryDatastore { this.data = {} } - open (callback /* : Callback */) /* : void */ { - setImmediate(callback) - } + async open () /* : Promise */ {} - put (key /* : Key */, val /* : Buffer */, callback /* : Callback */) /* : void */ { + async put (key /* : Key */, val /* : Buffer */) /* : Promise */ { this.data[key.toString()] = val - - setImmediate(callback) } - get (key /* : Key */, callback /* : Callback */) /* : void */ { - this.has(key, (err, exists) => { - if (err) { - return callback(err) - } - - if (!exists) { - return callback(Errors.notFoundError()) - } - - callback(null, this.data[key.toString()]) - }) + async get (key /* : Key */) /* : Promise */ { + const exists = await this.has(key) + if (!exists) throw Errors.notFoundError() + return this.data[key.toString()] } - has (key /* : Key */, callback /* : Callback */) /* : void */ { - setImmediate(() => { - callback(null, this.data[key.toString()] !== undefined) - }) + async has (key /* : Key */) /* : Promise */ { + return this.data[key.toString()] !== undefined } - delete (key /* : Key */, callback /* : Callback */) /* : void */ { + async delete (key /* : Key */) /* : Promise */ { delete this.data[key.toString()] - - setImmediate(() => { - callback() - }) } batch () /* : Batch */ { @@ -69,65 +47,54 @@ class MemoryDatastore { delete (key /* : Key */) /* : void */ { dels.push(key) }, - commit: (callback /* : Callback */) /* : void */ => { + commit: async () /* : Promise */ => { puts.forEach(v => { this.data[v[0].toString()] = v[1] }) - puts = [] + dels.forEach(key => { delete this.data[key.toString()] }) dels = [] - - setImmediate(callback) } } } - query (q /* : Query */) /* : QueryResult */ { - let tasks = [pull.keys(this.data), pull.map(k => ({ - key: new Key(k), - value: this.data[k] - }))] + query (q /* : Query */) /* : Iterator */ { + let it = Object.entries(this.data) - let filters = [] + it = map(it, entry => ({ key: new Key(entry[0]), value: entry[1] })) if (q.prefix != null) { - const prefix = q.prefix - filters.push((e, cb) => cb(null, e.key.toString().startsWith(prefix))) + it = filter(it, e => e.key.toString().startsWith(q.prefix)) } - if (q.filters != null) { - filters = filters.concat(q.filters) + if (Array.isArray(q.filters)) { + it = q.filters.reduce((it, f) => filter(it, f), it) } - tasks = tasks.concat(filters.map(f => asyncFilter(f))) - - if (q.orders != null) { - tasks = tasks.concat(q.orders.map(o => asyncSort(o))) + if (Array.isArray(q.orders)) { + it = q.orders.reduce((it, f) => sortAll(it, f), it) } if (q.offset != null) { let i = 0 - // $FlowFixMe - tasks.push(pull.filter(() => i++ >= q.offset)) + it = filter(it, () => i++ >= q.offset) } if (q.limit != null) { - tasks.push(pull.take(q.limit)) + it = take(it, q.limit) } if (q.keysOnly === true) { - tasks.push(pull.map(e => ({ key: e.key }))) + it = map(it, e => ({ key: e.key })) } - return pull.apply(null, tasks) + return it } - close (callback /* : Callback */) /* : void */ { - setImmediate(callback) - } + async close () /* : Promise */ {} } module.exports = MemoryDatastore diff --git a/src/tests.js b/src/tests.js index 90b0938..5b52009 100644 --- a/src/tests.js +++ b/src/tests.js @@ -6,11 +6,6 @@ const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect -const pull = require('pull-stream') -const series = require('async/series') -const parallel = require('async/parallel') -const map = require('async/map') -const each = require('async/each') const crypto = require('crypto') const Key = require('../src').Key @@ -23,62 +18,38 @@ type Test = { } */ -const check = (s) => { - if (s == null) { - throw new Error('missing store') - } - return s -} - module.exports = (test/* : Test */) => { - const cleanup = (store, done) => { - series([ - (cb) => check(store).close(cb), - (cb) => test.teardown(cb) - ], done) + const cleanup = async store => { + await store.close() + await test.teardown() } describe('put', () => { let store - beforeEach((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s - done() - }) + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') }) - afterEach((done) => { - cleanup(store, done) - }) + afterEach(() => cleanup(store)) - it('simple', (done) => { + it('simple', () => { const k = new Key('/z/one') - check(store).put(k, Buffer.from('one'), done) + return store.put(k, Buffer.from('one')) }) - it('parallel', (done) => { + it('parallel', async () => { const data = [] for (let i = 0; i < 100; i++) { data.push([new Key(`/z/key${i}`), Buffer.from(`data${i}`)]) } - each(data, (d, cb) => { - check(store).put(d[0], d[1], cb) - }, (err) => { - expect(err).to.not.exist() - map(data, (d, cb) => { - check(store).get(d[0], cb) - }, (err, res) => { - expect(err).to.not.exist() - res.forEach((res, i) => { - expect(res).to.be.eql(data[i][1]) - }) - done() - }) + await Promise.all(data.map(d => store.put(d[0], d[1]))) + const res = await Promise.all(data.map(d => store.get(d[0]))) + + res.forEach((res, i) => { + expect(res).to.be.eql(data[i][1]) }) }) }) @@ -86,156 +57,101 @@ module.exports = (test/* : Test */) => { describe('get', () => { let store - beforeEach((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s - done() - }) + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') }) - afterEach((done) => { - cleanup(store, done) - }) + afterEach(() => cleanup(store)) - it('simple', (done) => { + it('simple', async () => { const k = new Key('/z/one') - series([ - (cb) => check(store).put(k, Buffer.from('hello'), cb), - (cb) => check(store).get(k, (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql(Buffer.from('hello')) - cb() - }) - ], done) + await store.put(k, Buffer.from('hello')) + const res = await store.get(k) + expect(res).to.be.eql(Buffer.from('hello')) }) - it('should return error with missing key', (done) => { + it('should throw error for missing key', async () => { const k = new Key('/does/not/exist') - check(store).get(k, (err) => { - expect(err).to.exist() + + try { + await store.get(k) + } catch (err) { expect(err).to.have.property('code', 'ERR_NOT_FOUND') - done() - }) + return + } + + throw new Error('expected error to be thrown') }) }) describe('delete', () => { let store - beforeEach((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s - done() - }) + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') }) - afterEach((done) => { - cleanup(store, done) - }) + afterEach(() => cleanup(store)) - it('simple', (done) => { + it('simple', async () => { const k = new Key('/z/one') - series([ - (cb) => check(store).put(k, Buffer.from('hello'), cb), - (cb) => check(store).get(k, (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql(Buffer.from('hello')) - cb() - }), - (cb) => check(store).delete(k, cb), - (cb) => check(store).has(k, (err, exists) => { - expect(err).to.not.exist() - expect(exists).to.be.eql(false) - cb() - }) - ], done) + await store.put(k, Buffer.from('hello')) + await store.get(k) + await store.delete(k) + const exists = await store.has(k) + expect(exists).to.be.eql(false) }) - it('parallel', (done) => { + it('parallel', async () => { const data = [] for (let i = 0; i < 100; i++) { data.push([new Key(`/a/key${i}`), Buffer.from(`data${i}`)]) } - series([ - (cb) => each(data, (d, cb) => { - check(store).put(d[0], d[1], cb) - }, cb), - (cb) => map(data, (d, cb) => { - check(store).has(d[0], cb) - }, (err, res) => { - expect(err).to.not.exist() - res.forEach((res, i) => { - expect(res).to.be.eql(true) - }) - cb() - }), - (cb) => each(data, (d, cb) => { - check(store).delete(d[0], cb) - }, cb), - (cb) => map(data, (d, cb) => { - check(store).has(d[0], cb) - }, (err, res) => { - expect(err).to.not.exist() - res.forEach((res, i) => { - expect(res).to.be.eql(false) - }) - cb() - }) - ], done) + await Promise.all(data.map(d => store.put(d[0], d[1]))) + + const res0 = await Promise.all(data.map(d => store.has(d[0]))) + res0.forEach((res, i) => expect(res).to.be.eql(true)) + + await Promise.all(data.map(d => store.delete(d[0]))) + + const res1 = await Promise.all(data.map(d => store.has(d[0]))) + res1.forEach((res, i) => expect(res).to.be.eql(false)) }) }) describe('batch', () => { let store - beforeEach((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s - done() - }) + beforeEach(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') }) - afterEach((done) => { - cleanup(store, done) - }) + afterEach(() => cleanup(store)) - it('simple', (done) => { - const b = check(store).batch() - - series([ - (cb) => check(store).put(new Key('/z/old'), Buffer.from('old'), cb), - (cb) => { - b.put(new Key('/a/one'), Buffer.from('1')) - b.put(new Key('/q/two'), Buffer.from('2')) - b.put(new Key('/q/three'), Buffer.from('3')) - b.delete(new Key('/z/old')) - b.commit(cb) - }, - (cb) => map( - ['/a/one', '/q/two', '/q/three', '/z/old'], - (k, cb) => check(store).has(new Key(k), cb), - (err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([true, true, true, false]) - cb() - } - ) - ], done) + it('simple', async () => { + const b = store.batch() + + await store.put(new Key('/z/old'), Buffer.from('old')) + + b.put(new Key('/a/one'), Buffer.from('1')) + b.put(new Key('/q/two'), Buffer.from('2')) + b.put(new Key('/q/three'), Buffer.from('3')) + b.delete(new Key('/z/old')) + await b.commit() + + const keys = ['/a/one', '/q/two', '/q/three', '/z/old'] + const res = await Promise.all(keys.map(k => store.has(new Key(k)))) + + expect(res).to.be.eql([true, true, true, false]) }) - it('many (3 * 400)', function (done) { + it('many (3 * 400)', async function () { this.timeout(20 * 1000) - const b = check(store).batch() + const b = store.batch() const count = 400 for (let i = 0; i < count; i++) { b.put(new Key(`/a/hello${i}`), crypto.randomBytes(32)) @@ -243,47 +159,40 @@ module.exports = (test/* : Test */) => { b.put(new Key(`/z/hello${i}`), crypto.randomBytes(128)) } - series([ - (cb) => b.commit(cb), - (cb) => parallel([ - (cb) => pull(check(store).query({prefix: '/a'}), pull.collect(cb)), - (cb) => pull(check(store).query({prefix: '/z'}), pull.collect(cb)), - (cb) => pull(check(store).query({prefix: '/q'}), pull.collect(cb)) - ], (err, res) => { - expect(err).to.not.exist() - expect(res[0]).to.have.length(count) - expect(res[1]).to.have.length(count) - expect(res[2]).to.have.length(count) - cb() - }) - ], done) + await b.commit() + + const total = async iterable => { + let count = 0 + for await (const _ of iterable) count++ // eslint-disable-line + return count + } + + expect(await total(store.query({ prefix: '/a' }))).to.equal(count) + expect(await total(store.query({ prefix: '/z' }))).to.equal(count) + expect(await total(store.query({ prefix: '/q' }))).to.equal(count) }) }) describe('query', () => { let store - const hello = {key: new Key('/q/1hello'), value: Buffer.from('1')} - const world = {key: new Key('/z/2world'), value: Buffer.from('2')} - const hello2 = {key: new Key('/z/3hello2'), value: Buffer.from('3')} - const filter1 = (entry, cb) => { - cb(null, !entry.key.toString().endsWith('hello')) - } + const hello = { key: new Key('/q/1hello'), value: Buffer.from('1') } + const world = { key: new Key('/z/2world'), value: Buffer.from('2') } + const hello2 = { key: new Key('/z/3hello2'), value: Buffer.from('3') } - const filter2 = (entry, cb) => { - cb(null, entry.key.toString().endsWith('hello2')) - } + const filter1 = async entry => !entry.key.toString().endsWith('hello') + const filter2 = entry => entry.key.toString().endsWith('hello2') - const order1 = (res, cb) => { - cb(null, res.sort((a, b) => { + const order1 = async res => { + return res.sort((a, b) => { if (a.value.toString() < b.value.toString()) { return -1 } return 1 - })) + }) } - const order2 = (res, cb) => { - const out = res.sort((a, b) => { + const order2 = res => { + return res.sort((a, b) => { if (a.value.toString() < b.value.toString()) { return 1 } @@ -292,106 +201,86 @@ module.exports = (test/* : Test */) => { } return 0 }) - - cb(null, out) } const tests = [ ['empty', {}, [hello, world, hello2]], - ['prefix', {prefix: '/z'}, [world, hello2]], - ['1 filter', {filters: [filter1]}, [world, hello2]], - ['2 filters', {filters: [filter1, filter2]}, [hello2]], - ['limit', {limit: 1}, 1], - ['offset', {offset: 1}, 2], - ['keysOnly', {keysOnly: true}, [{key: hello.key}, {key: world.key}, {key: hello2.key}]], - ['1 order (1)', {orders: [order1]}, [hello, world, hello2]], - ['1 order (reverse 1)', {orders: [order2]}, [hello2, world, hello]] + ['prefix', { prefix: '/z' }, [world, hello2]], + ['1 filter', { filters: [filter1] }, [world, hello2]], + ['2 filters', { filters: [filter1, filter2] }, [hello2]], + ['limit', { limit: 1 }, 1], + ['offset', { offset: 1 }, 2], + ['keysOnly', { keysOnly: true }, [{ key: hello.key }, { key: world.key }, { key: hello2.key }]], + ['1 order (1)', { orders: [order1] }, [hello, world, hello2]], + ['1 order (reverse 1)', { orders: [order2] }, [hello2, world, hello]] ] - before((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s + before(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') - const b = check(store).batch() + const b = store.batch() - b.put(hello.key, hello.value) - b.put(world.key, world.value) - b.put(hello2.key, hello2.value) + b.put(hello.key, hello.value) + b.put(world.key, world.value) + b.put(hello2.key, hello2.value) - b.commit(done) - }) + return b.commit() }) - after((done) => { - cleanup(store, done) - }) + after(() => cleanup(store)) + + tests.forEach(t => it(t[0], async () => { + let res = [] + for await (const value of store.query(t[1])) res.push(value) - tests.forEach((t) => it(t[0], (done) => { - pull( - check(store).query(t[1]), - pull.collect((err, res) => { - expect(err).to.not.exist() - const expected = t[2] - if (Array.isArray(expected)) { - if (t[1].orders == null) { - expect(res).to.have.length(expected.length) - const s = (a, b) => { - if (a.key.toString() < b.key.toString()) { - return 1 - } else { - return -1 - } - } - res = res.sort(s) - const exp = expected.sort(s) - - res.forEach((r, i) => { - expect(r.key.toString()).to.be.eql(exp[i].key.toString()) - - if (r.value == null) { - expect(exp[i].value).to.not.exist() - } else { - expect(r.value.equals(exp[i].value)).to.be.eql(true) - } - }) + const expected = t[2] + if (Array.isArray(expected)) { + if (t[1].orders == null) { + expect(res).to.have.length(expected.length) + const s = (a, b) => { + if (a.key.toString() < b.key.toString()) { + return 1 } else { - expect(res).to.be.eql(t[2]) + return -1 } - } else if (typeof expected === 'number') { - expect(res).to.have.length(expected) } - done() - }) - ) + res = res.sort(s) + const exp = expected.sort(s) + + res.forEach((r, i) => { + expect(r.key.toString()).to.be.eql(exp[i].key.toString()) + + if (r.value == null) { + expect(exp[i].value).to.not.exist() + } else { + expect(r.value.equals(exp[i].value)).to.be.eql(true) + } + }) + } else { + expect(res).to.be.eql(t[2]) + } + } else if (typeof expected === 'number') { + expect(res).to.have.length(expected) + } })) }) describe('lifecycle', () => { let store - before((done) => { - test.setup((err, s) => { - if (err) { - throw err - } - store = s - done() - }) - }) - after((done) => { - cleanup(store, done) + before(async () => { + store = await test.setup() + if (!store) throw new Error('missing store') }) - it('close and open', (done) => { - series([ - (cb) => check(store).close(cb), - (cb) => check(store).open(cb), - (cb) => check(store).close(cb), - (cb) => check(store).open(cb) - ], done) + after(() => cleanup(store)) + + it('close and open', async () => { + await store.close() + await store.open() + await store.close() + await store.open() }) }) } diff --git a/src/utils.js b/src/utils.js index 7be47c8..daab6bf 100644 --- a/src/utils.js +++ b/src/utils.js @@ -1,72 +1,48 @@ 'use strict' -const pull = require('pull-stream') -const Source = require('pull-defer/source') const path = require('path') const os = require('os') const uuid = require('uuid/v4') -exports.asyncFilter = function (test) { - let busy = false - let abortCb - let aborted - - return function (read) { - return function next (abort, cb) { - if (aborted) return cb(aborted) - if (abort) { - aborted = abort - if (!busy) { - read(abort, cb) - } else { - read(abort, () => { - // if we are still busy, wait for the test to complete. - if (busy) abortCb = cb; else cb(abort) - }) - } - } else { - read(null, (end, data) => { - if (end) cb(end); else if (aborted) cb(aborted); else { - busy = true - test(data, (err, valid) => { - busy = false - if (aborted) { - cb(aborted) - abortCb(aborted) - } else if (err) { - next(err, cb) - } else if (valid) { - cb(null, data) - } else { - next(null, cb) - } - }) - } - }) - } +exports.filter = (iterable, filterer) => { + return (async function * () { + for await (const value of iterable) { + const keep = await filterer(value) + if (!keep) continue + yield value } - } + })() } -exports.asyncSort = function (sorter) { - const source = Source() +// Not just sort, because the sorter is given all the values and should return +// them all sorted +exports.sortAll = (iterable, sorter) => { + return (async function * () { + let values = [] + for await (const value of iterable) values.push(value) + values = await sorter(values) + for (const value of values) yield value + })() +} - const sink = pull.collect((err, ary) => { - if (err) { - return source.abort(err) +exports.take = (iterable, n) => { + return (async function * () { + if (n <= 0) return + let i = 0 + for await (const value of iterable) { + yield value + i++ + if (i >= n) return } - sorter(ary, (err, res) => { - if (err) { - return source.abort(err) - } - source.resolve(pull.values(ary)) - }) - }) + })() +} - return function (read) { - sink(read) - return source - } +exports.map = (iterable, mapper) => { + return (async function * () { + for await (const value of iterable) { + yield mapper(value) + } + })() } exports.replaceStartWith = function (s, r) { diff --git a/test/memory.spec.js b/test/memory.spec.js index b154362..de16d9b 100644 --- a/test/memory.spec.js +++ b/test/memory.spec.js @@ -7,12 +7,10 @@ const MemoryDatastore = require('../src').MemoryDatastore describe('Memory', () => { describe('interface-datastore', () => { require('../src/tests')({ - setup (callback) { - callback(null, new MemoryDatastore()) + setup () { + return new MemoryDatastore() }, - teardown (callback) { - callback() - } + teardown () {} }) }) }) diff --git a/test/utils.spec.js b/test/utils.spec.js index 807ae79..1a7c617 100644 --- a/test/utils.spec.js +++ b/test/utils.spec.js @@ -2,7 +2,6 @@ /* eslint-env mocha */ 'use strict' -const pull = require('pull-stream') const chai = require('chai') chai.use(require('dirty-chai')) const expect = chai.expect @@ -10,65 +9,79 @@ const expect = chai.expect const utils = require('../src').utils describe('utils', () => { - it('asyncFilter - sync', (done) => { - pull( - pull.values([1, 2, 3, 4]), - utils.asyncFilter((val, cb) => { - cb(null, val % 2 === 0) - }), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([2, 4]) - done() - }) - ) + it('filter - sync', async () => { + const data = [1, 2, 3, 4] + const filterer = val => val % 2 === 0 + const res = [] + for await (const val of utils.filter(data, filterer)) { + res.push(val) + } + expect(res).to.be.eql([2, 4]) }) - it('asyncFilter - async', (done) => { - pull( - pull.values([1, 2, 3, 4]), - utils.asyncFilter((val, cb) => { - setTimeout(() => { - cb(null, val % 2 === 0) - }, 10) - }), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([2, 4]) - done() - }) - ) + it('filter - async', async () => { + const data = [1, 2, 3, 4] + const filterer = async val => val % 2 === 0 + const res = [] + for await (const val of utils.filter(data, filterer)) { + res.push(val) + } + expect(res).to.be.eql([2, 4]) }) - it('asyncSort', (done) => { - pull( - pull.values([1, 2, 3, 4]), - utils.asyncSort((res, cb) => { - setTimeout(() => { - cb(null, res.reverse()) - }, 10) - }), - pull.collect((err, res) => { - expect(err).to.not.exist() - expect(res).to.be.eql([4, 3, 2, 1]) - done() - }) - ) + it('sortAll', async () => { + const data = [1, 2, 3, 4] + const sorter = async vals => vals.reverse() + const res = [] + for await (const val of utils.sortAll(data, sorter)) { + res.push(val) + } + expect(res).to.be.eql([4, 3, 2, 1]) }) - it('asyncSort - fail', (done) => { - pull( - pull.values([1, 2, 3, 4]), - utils.asyncSort((res, cb) => { - setTimeout(() => { - cb(new Error('fail')) - }, 10) - }), - pull.collect((err, res) => { - expect(err.message).to.be.eql('fail') - done() - }) - ) + it('sortAll - fail', async () => { + const data = [1, 2, 3, 4] + const sorter = async vals => { throw new Error('fail') } + const res = [] + + try { + for await (const val of utils.sortAll(data, sorter)) { + res.push(val) + } + } catch (err) { + expect(err.message).to.be.eql('fail') + return + } + + throw new Error('expected error to be thrown') + }) + + it('should take n values from iterator', async () => { + const data = [1, 2, 3, 4] + const n = 3 + const res = [] + for await (const val of utils.take(data, n)) { + res.push(val) + } + expect(res).to.be.eql([1, 2, 3]) + }) + + it('should take nothing from iterator', async () => { + const data = [1, 2, 3, 4] + const n = 0 + for await (const _ of utils.take(data, n)) { // eslint-disable-line + throw new Error('took a value') + } + }) + + it('should map iterator values', async () => { + const data = [1, 2, 3, 4] + const mapper = n => n * 2 + const res = [] + for await (const val of utils.map(data, mapper)) { + res.push(val) + } + expect(res).to.be.eql([2, 4, 6, 8]) }) it('replaceStartWith', () => {