feat: support byte range for raw block requests (#101)
Pulls in the latest gateway-lib and dagula to allow byte range requests for
raw blocks (`?format=raw`).

supersedes #100
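
With this change a client can ask for a slice of a raw block by combining
`?format=raw` (or an `Accept: application/vnd.ipld.raw` header) with a standard
`Range` header. An illustrative request (the gateway host and CID below are
placeholders, not values from this commit):

// Illustrative only: fetch the first 1 KiB of a raw block.
const url = 'https://w3s.link/ipfs/<block-cid>?format=raw'
const res = await fetch(url, { headers: { Range: 'bytes=0-1023' } })
console.log(res.status) // 206 Partial Content when the range is satisfiable
const bytes = new Uint8Array(await res.arrayBuffer()) // just the requested slice
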
Alan Shaw authored May 15, 2024
1 parent 1736f61 commit 1ff3bad
Showing 12 changed files with 468 additions and 131 deletions.
102 changes: 90 additions & 12 deletions package-lock.json

3 changes: 2 additions & 1 deletion package.json
@@ -41,7 +41,7 @@
"@ipld/dag-json": "^10.1.7",
"@ipld/dag-pb": "^4.0.8",
"@web3-storage/content-claims": "^4.0.5",
"@web3-storage/gateway-lib": "^4.1.1",
"@web3-storage/gateway-lib": "^5.0.0",
"cardex": "^3.0.0",
"dagula": "^7.3.0",
"http-range-parse": "^1.0.0",
@@ -54,6 +54,7 @@
"@cloudflare/workers-types": "^4.20231218.0",
"@ucanto/principal": "^8.1.0",
"ava": "^5.3.1",
"byteranges": "^1.1.0",
"carbites": "^1.0.6",
"carstream": "^2.1.0",
"dotenv": "^16.3.1",
57 changes: 38 additions & 19 deletions src/index.js
@@ -4,7 +4,7 @@ import {
withCorsHeaders,
withContentDispositionHeader,
withErrorHandler,
withHttpGet,
createWithHttpMethod,
withCdnCache,
withParsedIpfsUrl,
withFixedLengthStream,
@@ -19,13 +19,15 @@ import {
withContentClaimsDagula,
withHttpRangeUnsupported,
withVersionHeader,
withCarHandler
withCarBlockHandler
} from './middleware.js'

/**
* @typedef {import('./bindings.js').Environment} Environment
* @typedef {import('@web3-storage/gateway-lib').IpfsUrlContext} IpfsUrlContext
* @typedef {import('@web3-storage/gateway-lib').DagulaContext} DagulaContext
* @typedef {import('@web3-storage/gateway-lib').BlockContext} BlockContext
* @typedef {import('@web3-storage/gateway-lib').DagContext} DagContext
* @typedef {import('@web3-storage/gateway-lib').UnixfsContext} UnixfsContext
*/

export default {
@@ -37,30 +39,47 @@ export default {
withContext,
withCorsHeaders,
withVersionHeader,
withContentDispositionHeader,
withErrorHandler,
withParsedIpfsUrl,
withCarHandler,
withHttpRangeUnsupported,
withHttpGet,
createWithHttpMethod('GET', 'HEAD'),
withCarBlockHandler,
withContentClaimsDagula,
withFormatRawHandler,
withHttpRangeUnsupported,
withFormatCarHandler,
withContentDispositionHeader,
withFixedLengthStream
)
return middleware(handler)(request, env, ctx)
return middleware(handleUnixfs)(request, env, ctx)
}
}
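
The names above are composed into a single request handler, with the first
middleware listed ending up outermost. A minimal sketch of such a compose
helper (an assumption about how gateway-lib composes middleware, not code from
this commit):

// Sketch only: each middleware takes a handler and returns a wrapped handler.
// Reducing from the right means the first middleware listed runs first on an
// incoming request, so withContentClaimsDagula can attach a blockstore to the
// context before withFormatRawHandler and withFormatCarHandler consult it.
const composeMiddleware = (...middleware) => (handler) =>
  middleware.reduceRight((wrapped, m) => m(wrapped), handler)
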

/** @type {import('@web3-storage/gateway-lib').Handler<DagulaContext & IpfsUrlContext, Environment>} */
async function handler (request, env, ctx) {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')

if (searchParams.get('format') === 'raw' || headers.get('Accept')?.includes('application/vnd.ipld.raw')) {
return await handleBlock(request, env, ctx)
/**
* @type {import('@web3-storage/gateway-lib').Middleware<BlockContext & UnixfsContext & IpfsUrlContext, BlockContext & UnixfsContext & IpfsUrlContext, Environment>}
*/
export function withFormatRawHandler (handler) {
return async (request, env, ctx) => {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')
if (searchParams.get('format') === 'raw' || headers.get('Accept')?.includes('application/vnd.ipld.raw')) {
return await handleBlock(request, env, ctx)
}
return handler(request, env, ctx) // pass to other handlers
}
if (searchParams.get('format') === 'car' || headers.get('Accept')?.includes('application/vnd.ipld.car')) {
return await handleCar(request, env, ctx)
}

/**
* @type {import('@web3-storage/gateway-lib').Middleware<DagContext & IpfsUrlContext, DagContext & IpfsUrlContext, Environment>}
*/
export function withFormatCarHandler (handler) {
return async (request, env, ctx) => {
const { headers } = request
const { searchParams } = ctx
if (!searchParams) throw new Error('missing URL search params')
if (searchParams.get('format') === 'car' || headers.get('Accept')?.includes('application/vnd.ipld.car')) {
return await handleCar(request, env, ctx)
}
return handler(request, env, ctx) // pass to other handlers
}
return await handleUnixfs(request, env, ctx)
}
45 changes: 45 additions & 0 deletions src/lib/blockstore.js
@@ -2,9 +2,11 @@ import { readBlockHead, asyncIterableReader } from '@ipld/car/decoder'
import { base58btc } from 'multiformats/bases/base58'
import defer from 'p-defer'
import { OrderedCarBlockBatcher } from './block-batch.js'
import * as IndexEntry from './dag-index/entry.js'

/**
* @typedef {import('multiformats').UnknownLink} UnknownLink
* @typedef {import('dagula').Blockstore} Blockstore
* @typedef {import('dagula').Block} Block
* @typedef {import('@cloudflare/workers-types').R2Bucket} R2Bucket
*/
@@ -16,6 +18,8 @@ const MAX_ENCODED_BLOCK_LENGTH = (1024 * 1024 * 2) + 39 + 61
* A blockstore that is backed by an R2 bucket which contains CARv2
* MultihashIndexSorted indexes alongside CAR files. It can read DAGs split
* across multiple CARs.
*
* @implements {Blockstore}
*/
export class R2Blockstore {
/**
@@ -32,6 +36,15 @@ export class R2Blockstore {
// console.log(`get ${cid}`)
const entry = await this._idx.get(cid)
if (!entry) return

if (IndexEntry.isLocated(entry)) {
const keyAndOptions = IndexEntry.toBucketGet(entry)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return res ? { cid, bytes: new Uint8Array(await res.arrayBuffer()) } : undefined
}

const carPath = `${entry.origin}/${entry.origin}.car`
const range = { offset: entry.offset }
const res = await this._dataBucket.get(carPath, { range })
@@ -51,6 +64,32 @@
reader.cancel()
return { cid, bytes }
}

/** @param {UnknownLink} cid */
async stat (cid) {
const entry = await this._idx.get(cid)
if (!entry) return

// stat API exists only for blobs (i.e. location claimed)
if (IndexEntry.isLocated(entry)) {
return { size: entry.site.range.length }
}
}

/**
* @param {UnknownLink} cid
* @param {import('dagula').AbortOptions & import('dagula').RangeOptions} [options]
*/
async stream (cid, options) {
const entry = await this._idx.get(cid)
if (!entry) return

const keyAndOptions = IndexEntry.toBucketGet(entry, options)
if (!keyAndOptions) return

const res = await this._dataBucket.get(keyAndOptions[0], keyAndOptions[1])
return /** @type {ReadableStream<Uint8Array>|undefined} */ (res?.body)
}
}
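
The new stat and stream methods let a caller serve part of a located block
without buffering the whole thing. A rough usage sketch (the `[first, last]`
shape of dagula's RangeOptions and the response wiring are assumptions, not
taken from this commit):

// Hypothetical sketch: respond with a byte range of a raw block.
// Response is the Workers/Web global; 206 signals a partial response.
async function serveRawBlockRange (blockstore, cid, first, last) {
  const stat = await blockstore.stat(cid)
  if (!stat) return new Response('Not Found', { status: 404 })

  const end = Math.min(last, stat.size - 1) // clamp to the block size
  // Assumes options.range is an absolute [first, last] byte range within the block.
  const body = await blockstore.stream(cid, { range: [first, end] })
  if (!body) return new Response('Not Found', { status: 404 })

  return new Response(body, {
    status: 206,
    headers: {
      'Content-Type': 'application/vnd.ipld.raw',
      'Content-Range': `bytes ${first}-${end}/${stat.size}`
    }
  })
}
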

export class BatchingR2Blockstore extends R2Blockstore {
@@ -199,6 +238,12 @@ export class BatchingR2Blockstore extends R2Blockstore {
const entry = await this._idx.get(cid)
if (!entry) return

// TODO: batch with multipart byte range request when we switch to reading
// from any URL.
if (IndexEntry.isLocated(entry)) {
return super.get(cid)
}

this.#batcher.add({ carCid: entry.origin, blockCid: cid, offset: entry.offset })

if (!entry.multihash) throw new Error('missing entry multihash')