Skip to content

Commit

Permalink
test: add separate-thread concurrency test (#305)
Browse files Browse the repository at this point in the history
* tmp: testing concurrent writes with worker threads

* test: add separate-thread concurrency test

* chore: re-throw errors from writer-worker

* fix: replace fast-write-atomic with steno (#285)

fixes #284

fast-write-atomic hasn't been updated in 5 years, is CJS, and is slower than steno (updated 2 months ago).

Benchmarks for various content-types & libraries (though we only use Uint8Arrays) can be found at https://github.com/SgtPooki/fast-write-atomic#benchmarks

However, there may be further room for improvement by moving to [fs.createWriteStream](https://nodejs.org/api/fs.html#fscreatewritestreampath-options)


```
╰─ ✔ ❯ hyperfine --parameter-list branch 284-chore-replace-fast-write-atomic-with-steno,main --setup "git switch {branch} && npm run reset && npm i && npm run build" --runs 20  -w 1 "npm run test:node"
Benchmark 1: npm run test:node (branch = 284-chore-replace-fast-write-atomic-with-steno)
  Time (mean ± σ):     27.212 s ±  0.832 s    [User: 34.810 s, System: 6.051 s]
  Range (min … max):   25.927 s … 29.324 s    20 runs

Benchmark 2: npm run test:node (branch = main)
  Time (mean ± σ):     42.971 s ±  0.637 s    [User: 35.297 s, System: 7.534 s]
  Range (min … max):   42.178 s … 44.796 s    20 runs

Summary
  npm run test:node (branch = 284-chore-replace-fast-write-atomic-with-steno) ran
    1.58 ± 0.05 times faster than npm run test:node (branch = main)
```

---

### Updated benchmarks of `npm run test` as of 2024-04-19

```
╭─    ~/code/work/protocol.ai/ipfs/js-stores    main ?1 
╰─ ✔ ❯ hyperfine --parameter-list branch main,test/not-same-event-loop-concurrency,284-chore-replace-fast-write-atomic-with-steno --setup "git switch {branch} && npm run reset && npm i && npm run build && cd packages/datastore-fs" "npm run test"
Benchmark 1: npm run test (branch = main)
  Time (mean ± σ):     99.415 s ±  2.918 s    [User: 69.659 s, System: 23.361 s]
  Range (min … max):   96.134 s … 105.200 s    10 runs

Benchmark 2: npm run test (branch = test/not-same-event-loop-concurrency)
  Time (mean ± σ):     103.456 s ±  3.186 s    [User: 74.442 s, System: 25.261 s]
  Range (min … max):   98.813 s … 108.429 s    10 runs

Benchmark 3: npm run test (branch = 284-chore-replace-fast-write-atomic-with-steno)
  Time (mean ± σ):     80.308 s ±  2.107 s    [User: 74.331 s, System: 22.228 s]
  Range (min … max):   78.219 s … 84.277 s    10 runs

Summary
  npm run test (branch = 284-chore-replace-fast-write-atomic-with-steno) ran
    1.24 ± 0.05 times faster than npm run test (branch = main)
    1.29 ± 0.05 times faster than npm run test (branch = test/not-same-event-loop-concurrency)                                                                                        [49m1.944s]

╭─    ~/code/work/protocol.ai/ipfs/js-stores    284-chore-re…c-with-steno ?1 
╰─ ✔ ❯ hyperfine --parameter-list branch main,test/not-same-event-loop-concurrency,284-chore-replace-fast-write-atomic-with-steno --setup "git switch {branch} && npm run reset && npm i && npm run build && cd packages/blockstore-fs" "npm run test"
Benchmark 1: npm run test (branch = main)
  Time (mean ± σ):     98.840 s ±  2.612 s    [User: 68.486 s, System: 22.585 s]
  Range (min … max):   97.005 s … 104.396 s    10 runs

Benchmark 2: npm run test (branch = test/not-same-event-loop-concurrency)
  Time (mean ± σ):     105.307 s ±  2.335 s    [User: 72.442 s, System: 24.766 s]
  Range (min … max):   101.167 s … 109.007 s    10 runs

Benchmark 3: npm run test (branch = 284-chore-replace-fast-write-atomic-with-steno)
  Time (mean ± σ):     77.012 s ±  1.829 s    [User: 74.442 s, System: 21.938 s]
  Range (min … max):   75.258 s … 80.825 s    10 runs

Summary
  npm run test (branch = 284-chore-replace-fast-write-atomic-with-steno) ran
    1.28 ± 0.05 times faster than npm run test (branch = main)
    1.37 ± 0.04 times faster than npm run test (branch = test/not-same-event-loop-concurrency)
```

* chore: fix lint error

---------

Co-authored-by: Alex Potsides <alex@achingbrain.net>
  • Loading branch information
SgtPooki and achingbrain authored Sep 17, 2024
1 parent dd2f303 commit 5e3114e
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 27 deletions.
7 changes: 4 additions & 3 deletions packages/blockstore-fs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,17 @@
"release": "aegir release"
},
"dependencies": {
"fast-write-atomic": "^0.2.1",
"interface-blockstore": "^5.0.0",
"interface-store": "^6.0.0",
"it-glob": "^3.0.1",
"it-map": "^3.1.1",
"it-parallel-batch": "^3.0.6",
"multiformats": "^13.2.3"
"multiformats": "^13.2.3",
"steno": "^4.0.2"
},
"devDependencies": {
"aegir": "^44.1.1",
"interface-blockstore-tests": "^7.0.0"
"interface-blockstore-tests": "^7.0.0",
"threads": "^1.7.0"
}
}
17 changes: 6 additions & 11 deletions packages/blockstore-fs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,34 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { promisify } from 'node:util'
// @ts-expect-error no types
import fwa from 'fast-write-atomic'
import { OpenFailedError, type AwaitIterable, PutFailedError, NotFoundError, DeleteFailedError } from 'interface-store'
import glob from 'it-glob'
import map from 'it-map'
import parallelBatch from 'it-parallel-batch'
import { Writer } from 'steno'
import { NextToLast } from './sharding.js'
import type { ShardingStrategy } from './sharding.js'
import type { Blockstore, Pair } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'

const writeAtomic = promisify(fwa)

/**
* Write a file atomically
*/
async function writeFile (file: string, contents: Uint8Array): Promise<void> {
try {
await writeAtomic(file, contents)
const writer = new Writer(file)
await writer.write(contents)
} catch (err: any) {
if (err.code === 'EPERM' && err.syscall === 'rename') {
// fast-write-atomic writes a file to a temp location before renaming it.
// On Windows, if the final file already exists this error is thrown.
// No such error is thrown on Linux/Mac
if (err.syscall === 'rename' && ['ENOENT', 'EPERM'].includes(err.code)) {
// steno writes a file to a temp location before renaming it.
// If the final file already exists this error is thrown.
// Make sure we can read & write to this file
await fs.access(file, fs.constants.F_OK | fs.constants.W_OK)

// The file was created by another context - this means there were
// attempts to write the same block by two different function calls
return
}

throw err
}
}
Expand Down
28 changes: 28 additions & 0 deletions packages/blockstore-fs/test/fixtures/writer-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { CID } from 'multiformats/cid'
import { expose } from 'threads/worker'
import { FsBlockstore } from '../../src/index.js'

/**
 * Blockstore owned by this worker thread. It stays undefined until `isReady`
 * has opened a store successfully.
 */
let fs: FsBlockstore | undefined

/**
 * Functions exposed to the thread that spawned this worker.
 */
expose({
  /**
   * Create an FsBlockstore for `path` and open it.
   *
   * @param path - location handed to the FsBlockstore constructor
   * @returns `true` once the store has been opened
   * @throws whatever `open()` throws, after logging it
   */
  async isReady (path: string): Promise<boolean> {
    const store = new FsBlockstore(path)

    try {
      await store.open()
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error('Error opening blockstore', err)
      throw err
    }

    // only publish the store once it has opened, so `put` never sees a
    // half-initialised instance
    fs = store
    return true
  },

  /**
   * Parse `cidString` and store `value` under the resulting CID.
   *
   * @param cidString - string form of the CID to write
   * @param value - block bytes to store
   * @returns the result of `FsBlockstore.put`
   * @throws if `isReady` has not completed, if the CID cannot be parsed, or
   * whatever `put()` throws (logged before being re-thrown)
   */
  async put (cidString: string, value: Uint8Array): Promise<CID> {
    if (fs == null) {
      // fail with a clear message instead of a TypeError on `undefined.put`
      throw new Error('Blockstore is not open - call isReady() before put()')
    }

    const key = CID.parse(cidString)

    try {
      return await fs.put(key, value)
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error('Error putting block', err)
      throw err
    }
  }
})
32 changes: 32 additions & 0 deletions packages/blockstore-fs/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { expect } from 'aegir/chai'
import { interfaceBlockstoreTests } from 'interface-blockstore-tests'
import { base32 } from 'multiformats/bases/base32'
import { CID } from 'multiformats/cid'
import { spawn, Thread, Worker } from 'threads'
import { FsBlockstore } from '../src/index.js'
import { FlatDirectory, NextToLast } from '../src/sharding.js'

Expand Down Expand Up @@ -157,4 +158,35 @@ describe('FsBlockstore', () => {

expect(res).to.deep.equal(value)
})

/**
 * Spawns 10 workers that all write the same block into the same blockstore
 * directory. Unlike the previous test, the writes are issued from worker
 * threads instead of from a single event loop.
 */
it('can survive concurrent worker writes', async () => {
  const dir = path.join(os.tmpdir(), `test-${Math.random()}`)
  const key = CID.parse('QmeimKZyjcBnuXmAD9zMnSjM9JodTbgGT3gutofkTqz9rE')

  // Wait for every spawn attempt to settle before looking at the outcome, so
  // the workers that did start are always known and can be terminated below.
  const spawned = await Promise.allSettled(Array.from({ length: 10 }, async () => {
    const worker = await spawn(new Worker('./fixtures/writer-worker.js'))

    try {
      await worker.isReady(dir)
    } catch (err) {
      // this worker never makes it into `workers`, so stop it here
      await Thread.terminate(worker)
      throw err
    }

    return worker
  }))
  const workers = spawned.flatMap((result) => result.status === 'fulfilled' ? [result.value] : [])

  try {
    const failure = spawned.find((result): result is PromiseRejectedResult => result.status === 'rejected')

    if (failure != null) {
      // surface the first setup failure; `finally` still stops the rest
      throw failure.reason
    }

    const value = utf8Encoder.encode('Hello world')
    // 100 rounds in which every worker puts the same key/value pair
    await Promise.all(Array.from({ length: 100 }, async () => {
      return Promise.all(workers.map(async (worker) => worker.put(key.toString(), value)))
    }))

    const fs = new FsBlockstore(dir)
    await fs.open()
    const res = await fs.get(key)

    expect(res).to.deep.equal(value)
  } finally {
    await Promise.all(workers.map(async (worker) => Thread.terminate(worker)))
  }
})
})
7 changes: 4 additions & 3 deletions packages/datastore-fs/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,17 @@
},
"dependencies": {
"datastore-core": "^10.0.0",
"fast-write-atomic": "^0.2.1",
"interface-datastore": "^8.0.0",
"interface-store": "^6.0.0",
"it-glob": "^3.0.1",
"it-map": "^3.1.1",
"it-parallel-batch": "^3.0.6"
"it-parallel-batch": "^3.0.6",
"steno": "^4.0.2"
},
"devDependencies": {
"aegir": "^44.1.1",
"interface-datastore-tests": "^6.0.0",
"ipfs-utils": "^9.0.14"
"ipfs-utils": "^9.0.14",
"threads": "^1.7.0"
}
}
16 changes: 6 additions & 10 deletions packages/datastore-fs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,27 @@

import fs from 'node:fs/promises'
import path from 'node:path'
import { promisify } from 'util'
import { BaseDatastore } from 'datastore-core'
// @ts-expect-error no types
import fwa from 'fast-write-atomic'
import { Key } from 'interface-datastore'
import { OpenFailedError, NotFoundError, PutFailedError, DeleteFailedError } from 'interface-store'
import glob from 'it-glob'
import map from 'it-map'
import parallel from 'it-parallel-batch'
import { Writer } from 'steno'
import type { KeyQuery, Pair, Query } from 'interface-datastore'
import type { AwaitIterable } from 'interface-store'

const writeAtomic = promisify(fwa)

/**
* Write a file atomically
*/
async function writeFile (path: string, contents: Uint8Array): Promise<void> {
try {
await writeAtomic(path, contents)
const writer = new Writer(path)
await writer.write(contents)
} catch (err: any) {
if (err.code === 'EPERM' && err.syscall === 'rename') {
// fast-write-atomic writes a file to a temp location before renaming it.
// On Windows, if the final file already exists this error is thrown.
// No such error is thrown on Linux/Mac
if (err.syscall === 'rename' && ['ENOENT', 'EPERM'].includes(err.code)) {
// steno writes a file to a temp location before renaming it.
// If the final file already exists this error is thrown.
// Make sure we can read & write to this file
await fs.access(path, fs.constants.F_OK | fs.constants.W_OK)

Expand Down
28 changes: 28 additions & 0 deletions packages/datastore-fs/test/fixtures/writer-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { Key } from 'interface-datastore'
import { expose } from 'threads/worker'
import { FsDatastore } from '../../src/index.js'

/**
 * Datastore owned by this worker thread. It stays undefined until `isReady`
 * has opened a store successfully.
 */
let fs: FsDatastore | undefined

/**
 * Functions exposed to the thread that spawned this worker.
 */
expose({
  /**
   * Create an FsDatastore for `path` and open it.
   *
   * @param path - location handed to the FsDatastore constructor
   * @returns `true` once the store has been opened
   * @throws whatever `open()` throws, after logging it
   */
  async isReady (path: string): Promise<boolean> {
    const store = new FsDatastore(path)

    try {
      await store.open()
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error('Error opening datastore', err)
      throw err
    }

    // only publish the store once it has opened, so `put` never sees a
    // half-initialised instance
    fs = store
    return true
  },

  /**
   * Build a Key from `keyString` and store `value` under it.
   *
   * @param keyString - string form of the datastore key to write
   * @param value - bytes to store
   * @returns the result of `FsDatastore.put`
   * @throws if `isReady` has not completed, or whatever `put()` throws
   * (logged before being re-thrown)
   */
  async put (keyString: string, value: Uint8Array): Promise<Key> {
    if (fs == null) {
      // fail with a clear message instead of a TypeError on `undefined.put`
      throw new Error('Datastore is not open - call isReady() before put()')
    }

    const key = new Key(keyString)

    try {
      return await fs.put(key, value)
    } catch (err) {
      // eslint-disable-next-line no-console
      console.error('Error putting value', err)
      throw err
    }
  }
})
32 changes: 32 additions & 0 deletions packages/datastore-fs/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { ShardingDatastore, shard } from 'datastore-core'
import { Key } from 'interface-datastore'
import { interfaceDatastoreTests } from 'interface-datastore-tests'
import tempdir from 'ipfs-utils/src/temp-dir.js'
import { spawn, Thread, Worker } from 'threads'
import { FsDatastore } from '../src/index.js'

const utf8Encoder = new TextEncoder()
Expand Down Expand Up @@ -170,4 +171,35 @@ describe('FsDatastore', () => {

expect(res).to.deep.equal(value)
})

/**
 * Spawns 10 workers that all write the same key into the same datastore
 * directory. Unlike the previous test, the writes are issued from worker
 * threads instead of from a single event loop.
 */
it('can survive concurrent worker writes', async () => {
  const dir = tempdir()
  const key = new Key('CIQGFTQ7FSI2COUXWWLOQ45VUM2GUZCGAXLWCTOKKPGTUWPXHBNIVOY')

  // Wait for every spawn attempt to settle before looking at the outcome, so
  // the workers that did start are always known and can be terminated below.
  const spawned = await Promise.allSettled(Array.from({ length: 10 }, async () => {
    const worker = await spawn(new Worker('./fixtures/writer-worker.js'))

    try {
      await worker.isReady(dir)
    } catch (err) {
      // this worker never makes it into `workers`, so stop it here
      await Thread.terminate(worker)
      throw err
    }

    return worker
  }))
  const workers = spawned.flatMap((result) => result.status === 'fulfilled' ? [result.value] : [])

  try {
    const failure = spawned.find((result): result is PromiseRejectedResult => result.status === 'rejected')

    if (failure != null) {
      // surface the first setup failure; `finally` still stops the rest
      throw failure.reason
    }

    const value = utf8Encoder.encode('Hello world')
    // 100 rounds in which every worker puts the same key/value pair
    await Promise.all(Array.from({ length: 100 }, async () => {
      return Promise.all(workers.map(async (worker) => worker.put(key.toString(), value)))
    }))

    const fs = new FsDatastore(dir)
    await fs.open()
    const res = await fs.get(key)

    expect(res).to.deep.equal(value)
  } finally {
    await Promise.all(workers.map(async (worker) => Thread.terminate(worker)))
  }
})
})

0 comments on commit 5e3114e

Please sign in to comment.