feat: add config option to control fanout size #356

Merged (2 commits) on Aug 25, 2023
```diff
@@ -33,7 +33,7 @@ async function * listDirectory (node: PBNode, path: string, resolve: Resolve, de
     throw errCode(err, 'ERR_NOT_UNIXFS')
   }
 
-  if (!dir.fanout) {
+  if (dir.fanout == null) {
     throw errCode(new Error('missing fanout'), 'ERR_NOT_UNIXFS')
   }
```
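The loosened guard matters because `fanout` is a bigint: `!dir.fanout` would also reject a falsy `0n`, while `== null` only catches absent values. A minimal illustration (the local `fanout` variable is hypothetical):

```ts
const fanout: bigint | undefined = 0n

// the old check: rejects 0n because bigint zero is falsy
console.info(!fanout) // true

// the new check: only matches null or undefined
console.info(fanout == null) // false
```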
38 changes: 37 additions & 1 deletion packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.ts
```diff
@@ -4,7 +4,7 @@ import * as dagPb from '@ipld/dag-pb'
 import { expect } from 'aegir/chai'
 import { MemoryBlockstore } from 'blockstore-core'
 import { UnixFS } from 'ipfs-unixfs'
-import { importer } from 'ipfs-unixfs-importer'
+import { importer, type ImportCandidate } from 'ipfs-unixfs-importer'
 import all from 'it-all'
 import randomBytes from 'it-buffer-stream'
 import last from 'it-last'
@@ -255,4 +255,40 @@ describe('exporter sharded', function () {
 
     expect(exported.name).to.deep.equal('file-1')
   })
+
+  it('exports a shard with a different fanout size', async () => {
+    const files: ImportCandidate[] = [{
+      path: '/baz.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }, {
+      path: '/foo.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }, {
+      path: '/bar.txt',
+      content: Uint8Array.from([0, 1, 2, 3, 4])
+    }]
+
+    const result = await last(importer(files, block, {
+      shardSplitThresholdBytes: 0,
+      shardFanoutBits: 4, // 2**4 = 16 children max
+      wrapWithDirectory: true
+    }))
+
+    if (result == null) {
+      throw new Error('Import failed')
+    }
+
+    const { cid } = result
+    const dir = await exporter(cid, block)
+
+    expect(dir).to.have.nested.property('unixfs.fanout', 16n)
+
+    const contents = await all(dir.content())
+
+    expect(contents.map(entry => ({
+      path: `/${entry.name}`,
+      content: entry.node
+    })))
+      .to.deep.equal(files)
+  })
 })
```
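The `unixfs.fanout` assertion follows directly from the option: with `shardFanoutBits: 4`, each shard node has at most 2**4 slots, and that width is what the exporter reads back as the fanout. A sketch of the assumed relationship:

```ts
// assumed: the fanout stored in UnixFS shard metadata is 2 ** shardFanoutBits
const shardFanoutBits = 4
const fanout = BigInt(2 ** shardFanoutBits)

console.info(fanout === 16n) // true, matching the assertion in the test above
```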
15 changes: 11 additions & 4 deletions packages/ipfs-unixfs-importer/src/dir-sharded.ts
```diff
@@ -18,16 +18,21 @@ async function hamtHashFn (buf: Uint8Array): Promise<Uint8Array> {
 }
 
 const HAMT_HASH_CODE = BigInt(0x22)
+const DEFAULT_FANOUT_BITS = 8
+
+export interface DirShardedOptions extends PersistOptions {
+  shardFanoutBits: number
+}
 
 class DirSharded extends Dir {
   private readonly _bucket: Bucket<InProgressImportResult | Dir>
 
-  constructor (props: DirProps, options: PersistOptions) {
+  constructor (props: DirProps, options: DirShardedOptions) {
     super(props, options)
 
     this._bucket = createHAMT({
       hashFn: hamtHashFn,
-      bits: 8
+      bits: options.shardFanoutBits ?? DEFAULT_FANOUT_BITS
     })
   }
@@ -88,6 +93,7 @@ export default DirSharded
 
 async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore: Blockstore, shardRoot: DirSharded | null, options: PersistOptions): AsyncIterable<ImportResult> {
   const children = bucket._children
+  const padLength = (bucket.tableSize() - 1).toString(16).length
   const links: PBLink[] = []
   let childrenSize = 0n
@@ -98,7 +104,7 @@ async function * flush (bucket: Bucket<Dir | InProgressImportResult>, blockstore
       continue
     }
 
-    const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
+    const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')
 
     if (child instanceof Bucket) {
       let shard
@@ -191,6 +197,7 @@ function isDir (obj: any): obj is Dir {
 
 function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, options: PersistOptions): number {
   const children = bucket._children
+  const padLength = (bucket.tableSize() - 1).toString(16).length
   const links: PBLink[] = []
 
   for (let i = 0; i < children.length; i++) {
@@ -200,7 +207,7 @@ function calculateSize (bucket: Bucket<any>, shardRoot: DirSharded | null, optio
       continue
     }
 
-    const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
+    const labelPrefix = i.toString(16).toUpperCase().padStart(padLength, '0')
 
     if (child instanceof Bucket) {
       const size = calculateSize(child, null, options)
```
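The `padLength` computation replaces the hard-coded two-character prefix, which was only correct for the default 256-slot table. The highest possible slot index, written in hex, determines how many characters every child label needs. A worked example (the helper name is illustrative):

```ts
// width of the hex label prefix for a HAMT with the given table size;
// tableSize is assumed to be 2 ** shardFanoutBits
function padLengthFor (tableSize: number): number {
  return (tableSize - 1).toString(16).length
}

console.info(padLengthFor(256)) // 2 - default 8 bits, labels '00'..'FF'
console.info(padLengthFor(16))  // 1 - 4 bits, labels '0'..'F'
```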
7 changes: 3 additions & 4 deletions packages/ipfs-unixfs-importer/src/flat-to-shard.ts
```diff
@@ -1,9 +1,8 @@
 import { DirFlat } from './dir-flat.js'
-import DirSharded from './dir-sharded.js'
+import DirSharded, { type DirShardedOptions } from './dir-sharded.js'
 import type { Dir } from './dir.js'
-import type { PersistOptions } from './utils/persist.js'
 
-export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: PersistOptions): Promise<DirSharded> {
+export async function flatToShard (child: Dir | null, dir: Dir, threshold: number, options: DirShardedOptions): Promise<DirSharded> {
   let newDir = dir as DirSharded
 
   if (dir instanceof DirFlat && dir.estimateNodeSize() > threshold) {
@@ -31,7 +30,7 @@ export async function flatToShard (child: Dir | null, dir: Dir, threshold: numbe
   return newDir
 }
 
-async function convertToShard (oldDir: DirFlat, options: PersistOptions): Promise<DirSharded> {
+async function convertToShard (oldDir: DirFlat, options: DirShardedOptions): Promise<DirSharded> {
   const newDir = new DirSharded({
     root: oldDir.root,
     dir: true,
```
9 changes: 9 additions & 0 deletions packages/ipfs-unixfs-importer/src/index.ts
```diff
@@ -123,6 +123,13 @@ export interface ImporterOptions extends ProgressOptions<ImporterProgressEvents>
    */
   shardSplitThresholdBytes?: number
 
+  /**
+   * The number of bits of a hash digest used at each level of sharding to
+   * compute the child index. 2**shardFanoutBits dictates the maximum number
+   * of children for any shard in the HAMT. Default: 8
+   */
+  shardFanoutBits?: number
+
   /**
    * How many files to import concurrently. For large numbers of small files this
    * should be high (e.g. 50). Default: 10
@@ -241,6 +248,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
 
   const wrapWithDirectory = options.wrapWithDirectory ?? false
   const shardSplitThresholdBytes = options.shardSplitThresholdBytes ?? 262144
+  const shardFanoutBits = options.shardFanoutBits ?? 8
   const cidVersion = options.cidVersion ?? 1
   const rawLeaves = options.rawLeaves ?? true
   const leafType = options.leafType ?? 'file'
@@ -269,6 +277,7 @@ export async function * importer (source: ImportCandidateStream, blockstore: Wri
   const buildTree: TreeBuilder = options.treeBuilder ?? defaultTreeBuilder({
     wrapWithDirectory,
     shardSplitThresholdBytes,
+    shardFanoutBits,
     cidVersion,
     onProgress: options.onProgress
   })
```
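End to end, the new option flows from `ImporterOptions` through `defaultTreeBuilder` into `DirSharded`. A minimal usage sketch, mirroring the test above:

```ts
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import last from 'it-last'

const blockstore = new MemoryBlockstore()

const result = await last(importer([{
  path: '/example.txt',
  content: Uint8Array.from([0, 1, 2, 3])
}], blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: 0, // shard even a tiny directory, as in the test
  shardFanoutBits: 4 // 2**4 = 16 children max per shard
}))

console.info(result?.cid.toString())
```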
1 change: 1 addition & 0 deletions packages/ipfs-unixfs-importer/src/tree-builder.ts
```diff
@@ -7,6 +7,7 @@ import type { PersistOptions } from './utils/persist.js'
 
 export interface AddToTreeOptions extends PersistOptions {
   shardSplitThresholdBytes: number
+  shardFanoutBits: number
 }
 
 async function addToTree (elem: InProgressImportResult, tree: Dir, options: AddToTreeOptions): Promise<Dir> {
```