Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add sharding to s3 blockstore #202

Merged
merged 1 commit into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 12 additions & 37 deletions packages/blockstore-s3/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,29 @@ import {
DeleteObjectCommand,
ListObjectsV2Command
} from '@aws-sdk/client-s3'
import { CID } from 'multiformats/cid'
import { base32upper } from 'multiformats/bases/base32'
import type { MultibaseCodec } from 'multiformats/bases/interface'
import type { CID } from 'multiformats/cid'
import { NextToLast, ShardingStrategy } from './sharding.js'

export interface S3DatastoreInit {
/**
* An optional path to use within the bucket for all files - this setting can
* affect S3 performance as it does internal sharding based on 'prefixes' -
* these can be delimited by '/' so it's often better to wrap this datastore in
* a sharding datastore which will generate prefixed datastore keys for you.
*
* See - https://docs.aws.amazon.com/AmazonS3/latest/userguide/optimizing-performance.html
* and https://docs.aws.amazon.com/AmazonS3/latest/userguide/using-prefixes.html
*/
path?: string

/**
* Whether to try to create the bucket if it is missing when `.open` is called
*/
createIfMissing?: boolean

/**
* The multibase codec to use - nb. should be case insensitive.
* default: base32upper
* Control how CIDs map to paths and back
*/
base?: MultibaseCodec<string>
shardingStrategy?: ShardingStrategy
}

/**
* A blockstore backed by AWS S3
*/
export class S3Blockstore extends BaseBlockstore {
public path?: string
public createIfMissing: boolean
private readonly s3: S3
private readonly bucket: string
private readonly base: MultibaseCodec<string>
private readonly shardingStrategy: ShardingStrategy

constructor (s3: S3, bucket: string, init?: S3DatastoreInit) {
super()
Expand All @@ -62,21 +48,10 @@ export class S3Blockstore extends BaseBlockstore {
throw new Error('An bucket must be supplied. See the datastore-s3 README for examples.')
}

this.path = init?.path
this.s3 = s3
this.bucket = bucket
this.createIfMissing = init?.createIfMissing ?? false
this.base = init?.base ?? base32upper
}

/**
* Returns the full key which includes the path to the ipfs store
*/
_getFullKey (cid: CID): string {
// Avoid absolute paths with s3
const str = this.base.encoder.encode(cid.multihash.bytes)

return [this.path, str].filter(Boolean).join('/').replace(/\/\/+/g, '/')
this.shardingStrategy = init?.shardingStrategy ?? new NextToLast()
}

/**
Expand All @@ -88,7 +63,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new PutObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key),
Key: this.shardingStrategy.encode(key),
Body: val
}), {
abortSignal: options?.signal
Expand All @@ -110,7 +85,7 @@ export class S3Blockstore extends BaseBlockstore {
const data = await this.s3.send(
new GetObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -154,7 +129,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -185,7 +160,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new DeleteObjectCommand({
Bucket: this.bucket,
Key: this._getFullKey(key)
Key: this.shardingStrategy.encode(key)
}), {
abortSignal: options?.signal
}
Expand Down Expand Up @@ -224,7 +199,7 @@ export class S3Blockstore extends BaseBlockstore {
}

// Remove the path from the key
const cid = CID.decode(this.base.decoder.decode(d.Key.slice((this.path ?? '').length)))
const cid = this.shardingStrategy.decode(d.Key)

yield {
cid,
Expand Down Expand Up @@ -257,7 +232,7 @@ export class S3Blockstore extends BaseBlockstore {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: this.path ?? ''
Key: ''
}), {
abortSignal: options?.signal
}
Expand Down
118 changes: 118 additions & 0 deletions packages/blockstore-s3/src/sharding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import { CID } from 'multiformats/cid'
import { base32upper } from 'multiformats/bases/base32'
import type { MultibaseCodec } from 'multiformats/bases/interface'

/**
 * Controls how CIDs are mapped to S3 object keys and recovered from them.
 * Implementations must be symmetric: `decode(encode(cid))` returns an
 * equivalent CID.
 */
export interface ShardingStrategy {
  /**
   * File extension appended to every generated key, e.g. '.data'
   */
  extension: string

  /**
   * Convert a CID into the object key the block will be stored under
   */
  encode: (cid: CID) => string

  /**
   * Recover the CID from a key previously produced by `encode`
   */
  decode: (path: string) => CID
}

/**
 * Options for the `NextToLast` sharding strategy.
 */
export interface NextToLastInit {
  /**
   * The file extension to use. default: '.data'
   */
  extension?: string

  /**
   * How many characters to take from the end of the CID. default: 2
   */
  prefixLength?: number

  /**
   * The multibase codec to use - nb. should be case insensitive.
   * default: base32upper
   */
  base?: MultibaseCodec<string>
}

/**
 * Shards blocks by taking the trailing characters of the multibase-encoded
 * multihash and using them as a directory prefix. This spreads keys across
 * many prefixes instead of piling every block into a single directory,
 * which would overwhelm most filesystems.
 */
export class NextToLast implements ShardingStrategy {
  public extension: string
  private readonly prefixLength: number
  private readonly base: MultibaseCodec<string>

  constructor (init: NextToLastInit = {}) {
    const { extension = '.data', prefixLength = 2, base = base32upper } = init

    this.extension = extension
    this.prefixLength = prefixLength
    this.base = base
  }

  /**
   * Map a CID to `<prefix>/<encoded-multihash><extension>` where the prefix
   * is the last `prefixLength` characters of the encoded multihash.
   */
  encode (cid: CID): string {
    // only the multihash bytes are encoded - the CID codec is not part of the key
    const encoded = this.base.encoder.encode(cid.multihash.bytes)
    const shard = encoded.substring(encoded.length - this.prefixLength)

    return `${shard}/${encoded}${this.extension}`
  }

  /**
   * Recover a CID from a key produced by `encode`. Any directory prefix is
   * discarded and a trailing extension, if present, is stripped.
   */
  decode (str: string): CID {
    const segments = str.split('/')
    let name: string | undefined = segments[segments.length - 1]

    if (name == null) {
      throw new Error('Invalid path')
    }

    if (name.endsWith(this.extension)) {
      name = name.substring(0, name.length - this.extension.length)
    }

    return CID.decode(this.base.decoder.decode(name))
  }
}

/**
 * Options for the `FlatDirectory` (non-)sharding strategy.
 */
export interface FlatDirectoryInit {
  /**
   * The file extension to use. default: '.data'
   */
  extension?: string

  /**
   * How many characters to take from the end of the CID. default: 2
   * NOTE(review): this option is never read by `FlatDirectory`, which does
   * no prefixing - it appears to be copied from `NextToLastInit`
   */
  prefixLength?: number

  /**
   * The multibase codec to use - nb. should be case insensitive.
   * default: base32upper
   */
  base?: MultibaseCodec<string>
}

/**
* A sharding strategy that does not do any sharding and stores all files
* in one directory. Only for testing, do not use in production.
*/
export class FlatDirectory implements ShardingStrategy {
public extension: string
private readonly base: MultibaseCodec<string>

constructor (init: NextToLastInit = {}) {
this.extension = init.extension ?? '.data'
this.base = init.base ?? base32upper
}

encode (cid: CID): string {
const str = this.base.encoder.encode(cid.multihash.bytes)

return `${str}${this.extension}`
}

decode (str: string): CID {
let fileName = str.split('/').pop()

if (fileName == null) {
throw new Error('Invalid path')
}

if (fileName.endsWith(this.extension)) {
fileName = fileName.substring(0, fileName.length - this.extension.length)
}

return CID.decode(this.base.decoder.decode(fileName))
}
}
47 changes: 1 addition & 46 deletions packages/blockstore-s3/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { CreateBucketCommand, PutObjectCommand, HeadObjectCommand, S3, GetObjectCommand } from '@aws-sdk/client-s3'
import { CreateBucketCommand, HeadObjectCommand, S3 } from '@aws-sdk/client-s3'
import defer from 'p-defer'
import { interfaceBlockstoreTests } from 'interface-blockstore-tests'
import { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -43,25 +43,6 @@ describe('S3Blockstore', () => {
})

describe('put', () => {
it('should include the path in the key', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test', {
path: '.ipfs/datastore'
})

const deferred = defer<PutObjectCommand>()

sinon.replace(s3, 'send', (command: PutObjectCommand) => {
deferred.resolve(command)
return s3Resolve(null)
})

await store.put(cid, new TextEncoder().encode('test data'))

const command = await deferred.promise
expect(command).to.have.nested.property('input.Key', '.ipfs/datastore/BCIQPGZJ6QLZOFG3OP45NLMSJUWGJCO72QQKHLDTB6FXIB6BDSLRQYLY')
})

it('should return a standard error when the put fails', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test')
Expand All @@ -80,32 +61,6 @@ describe('S3Blockstore', () => {
})

describe('get', () => {
it('should include the path in the fetch key', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test', {
path: '.ipfs/datastore'
})
const buf = new TextEncoder().encode('test')

const deferred = defer<GetObjectCommand>()

sinon.replace(s3, 'send', (command: any) => {
if (command.constructor.name === 'GetObjectCommand') {
deferred.resolve(command)
return s3Resolve({ Body: buf })
}

return s3Reject(new S3Error('UnknownCommand'))
})

const value = await store.get(cid)

expect(value).to.equalBytes(buf)

const getObjectCommand = await deferred.promise
expect(getObjectCommand).to.have.nested.property('input.Key', '.ipfs/datastore/BCIQPGZJ6QLZOFG3OP45NLMSJUWGJCO72QQKHLDTB6FXIB6BDSLRQYLY')
})

it('should return a standard not found error code if the key isn\'t found', async () => {
const s3 = new S3({ region: 'REGION' })
const store = new S3Blockstore(s3, 'test')
Expand Down