Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add customer origin url fallback #37

Merged
merged 14 commits into from
Nov 3, 2023
167 changes: 87 additions & 80 deletions src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,28 @@ import { isErrorUnavoidable } from './utils/errors.js'
const MAX_NODE_WEIGHT = 100
/**
* @typedef {import('./types.js').Node} Node
* @typedef {import('./types.js').FetchOptions} FetchOptions
*/

export class Saturn {
static nodesListKey = 'saturn-nodes'
static defaultRaceCount = 3
/**
*
* @param {object} [opts={}]
* @param {string} [opts.clientKey]
* @param {string} [opts.clientId=randomUUID()]
* @param {string} [opts.cdnURL=saturn.ms]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {string} [opts.orchURL]
* @param {number} [opts.fallbackLimit]
* @param {boolean} [opts.experimental]
* @param {import('./storage/index.js').Storage} [opts.storage]
* @param {object} [config={}]
* @param {string} [config.clientKey]
* @param {string} [config.clientId=randomUUID()]
* @param {string} [config.cdnURL=saturn.ms]
* @param {number} [config.connectTimeout=5000]
* @param {number} [config.downloadTimeout=0]
* @param {string} [config.orchURL]
* @param {string} [config.customerFallbackURL]
* @param {number} [config.fallbackLimit]
* @param {boolean} [config.experimental]
* @param {import('./storage/index.js').Storage} [config.storage]
*/
constructor (opts = {}) {
this.opts = Object.assign({}, {
constructor (config = {}) {
this.config = Object.assign({}, {
clientId: randomUUID(),
cdnURL: 'l1s.saturn.ms',
logURL: 'https://twb3qukm2i654i3tnvx36char40aymqq.lambda-url.us-west-2.on.aws/',
Expand All @@ -42,9 +44,9 @@ export class Saturn {
fallbackLimit: 5,
connectTimeout: 5_000,
downloadTimeout: 0
}, opts)
}, config)

if (!this.opts.clientKey) {
if (!this.config.clientKey) {
throw new Error('clientKey is required')
}

Expand All @@ -55,28 +57,24 @@ export class Saturn {
if (this.reportingLogs && this.hasPerformanceAPI) {
this._monitorPerformanceBuffer()
}
this.storage = this.opts.storage || memoryStorage()
this.loadNodesPromise = this.opts.experimental ? this._loadNodes(this.opts) : null
this.storage = this.config.storage || memoryStorage()
this.loadNodesPromise = this.config.experimental ? this._loadNodes(this.config) : null
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {Node[]} [opts.nodes]
* @param {Node} [opts.node]
* @param {('car'|'raw')} [opts.format]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<object>}
*/
async fetchCIDWithRace (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const options = Object.assign({}, this.config, { format: 'car' }, opts)
if (!opts.originFallback) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)
const jwt = await getJWT(options, this.storage)
options.jwt = jwt
}

if (!isBrowserContext) {
options.headers = {
Expand All @@ -87,7 +85,7 @@ export class Saturn {

let nodes = options.nodes
if (!nodes || nodes.length === 0) {
const replacementNode = options.node ?? { url: this.opts.cdnURL }
const replacementNode = { url: options.cdnURL }
nodes = [replacementNode]
}
const controllers = []
Expand Down Expand Up @@ -157,22 +155,20 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {Node} [opts.node]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<object>}
*/
async fetchCID (cidPath, opts = {}) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)

const jwt = await getJWT(this.opts, this.storage)
const options = Object.assign({}, this.config, { format: 'car' }, opts)
if (!opts.originFallback) {
const [cid] = (cidPath ?? '').split('/')
CID.parse(cid)
const jwt = await getJWT(this.config, this.storage)
options.jwt = jwt
}

const options = Object.assign({}, this.opts, { format: 'car', jwt }, opts)
const node = options.node
const origin = node?.url ?? this.opts.cdnURL
const node = options.nodes && options.nodes[0]
const origin = node?.url ?? this.config.cdnURL
const url = this.createRequestURL(cidPath, { ...options, url: origin })

let log = {
Expand Down Expand Up @@ -242,20 +238,15 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {string} [opts.url]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {AbortController} [opts.controller]
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContentWithFallback (cidPath, opts = {}) {
const upstreamController = opts.controller;
delete opts.controller;
const upstreamController = opts.controller
delete opts.controller

let lastError = null
let skipNodes = false
// we use this to checkpoint at which chunk a request failed.
// this is temporary until range requests are supported.
let byteCountCheckpoint = 0
Expand All @@ -264,16 +255,17 @@ export class Saturn {
throw new Error(`All attempts to fetch content have failed. Last error: ${lastError.message}`)
}

const fetchContent = async function * () {
const controller = new AbortController();
opts.controller = controller;
const fetchContent = async function * (options) {
const controller = new AbortController()
opts.controller = controller
if (upstreamController) {
upstreamController.signal.addEventListener('abort', () => {
controller.abort();
});
controller.abort()
})
}
let byteCount = 0
const byteChunks = await this.fetchContent(cidPath, opts)
const fetchOptions = Object.assign(opts, { format: 'car' }, options)
const byteChunks = await this.fetchContent(cidPath, fetchOptions)
for await (const chunk of byteChunks) {
// avoid sending duplicate chunks
if (byteCount < byteCountCheckpoint) {
Expand All @@ -291,33 +283,34 @@ export class Saturn {
}
}.bind(this)

// Use CDN origin if node list is not loaded
if (this.nodes.length === 0) {
// fetch from origin in the case that no nodes are loaded
opts.url = this.opts.cdnURL
opts.nodes = Array({ url: this.config.cdnURL })
try {
yield * fetchContent()
return
} catch (err) {
lastError = err
if (err.res?.status === 410 || isErrorUnavoidable(err)) {
throwError()
skipNodes = true
} else {
await this.loadNodesPromise
}
await this.loadNodesPromise
}
}

let fallbackCount = 0
const nodes = this.nodes
for (let i = 0; i < nodes.length; i++) {
if (fallbackCount > this.opts.fallbackLimit || upstreamController?.signal.aborted) {
return
if (fallbackCount > this.config.fallbackLimit || skipNodes || upstreamController?.signal.aborted) {
break
}
if (opts.raceNodes) {
opts.nodes = nodes.slice(i, i + Saturn.defaultRaceCount)
} else {
opts.node = nodes[i]
opts.nodes = Array(nodes[i])
}

try {
yield * fetchContent()
return
Expand All @@ -331,18 +324,25 @@ export class Saturn {
}

if (lastError) {
const originUrl = opts.customerFallbackURL ?? this.config.customerFallbackURL
// Use customer origin if cid is not retrievable by lassie.
if (originUrl) {
opts.nodes = Array({ url: originUrl })
try {
yield * fetchContent({ format: null, originFallback: true })
return
} catch (err) {
lastError = err
}
}
throwError()
}
}

/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<AsyncIterable<Uint8Array>>}
*/
async * fetchContent (cidPath, opts = {}) {
Expand All @@ -365,7 +365,11 @@ export class Saturn {

try {
const itr = metricsIterable(asAsyncIterable(res.body))
yield * extractVerifiedContent(cidPath, itr)
if (opts.format === 'car') {
yield * extractVerifiedContent(cidPath, itr)
} else {
yield * itr
}
} catch (err) {
log.error = err.message
controller.abort()
Expand All @@ -379,11 +383,7 @@ export class Saturn {
/**
*
* @param {string} cidPath
* @param {object} [opts={}]
* @param {('car'|'raw')} [opts.format]
* @param {boolean} [opts.raceNodes]
* @param {number} [opts.connectTimeout=5000]
* @param {number} [opts.downloadTimeout=0]
* @param {FetchOptions} [opts={}]
* @returns {Promise<Uint8Array>}
*/
async fetchContentBuffer (cidPath, opts = {}) {
Expand All @@ -395,14 +395,21 @@ export class Saturn {
* @param {string} cidPath
* @param {object} [opts={}]
* @param {string} [opts.url]
* @param {string} [opts.format]
* @param {string} [opts.originFallback]
* @param {object} [opts.jwt]
* @returns {URL}
*/
createRequestURL (cidPath, opts) {
let origin = opts.url ?? this.opts.cdnURL
createRequestURL (cidPath, opts = {}) {
let origin = opts.url ?? this.config.cdnURL
origin = addHttpPrefix(origin)
if (opts.originFallback) {
return new URL(origin)
}
const url = new URL(`${origin}/ipfs/${cidPath}`)

url.searchParams.set('format', opts.format)
if (opts.format) url.searchParams.set('format', opts.format)

if (opts.format === 'car') {
url.searchParams.set('dag-scope', 'entity')
}
Expand Down Expand Up @@ -444,10 +451,10 @@ export class Saturn {
: this.logs

await fetch(
this.opts.logURL,
this.config.logURL,
{
method: 'POST',
body: JSON.stringify({ bandwidthLogs, logSender: this.opts.logSender })
body: JSON.stringify({ bandwidthLogs, logSender: this.config.logSender })
}
)

Expand Down Expand Up @@ -569,7 +576,7 @@ export class Saturn {

const url = new URL(origin)
const controller = new AbortController()
const options = Object.assign({}, { method: 'GET' }, this.opts)
const options = Object.assign({}, { method: 'GET' }, this.config)

const connectTimeout = setTimeout(() => {
controller.abort()
Expand Down
14 changes: 14 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,18 @@
* @property {string} url
*/

/**
* Common options for fetch functions.
*
* @typedef {object} FetchOptions
* @property {Node[]} [nodes] - An array of nodes.
* @property {('car'|'raw')} [format] - The format of the fetched content.
* @property {boolean} [originFallback] - Is this a fallback to the customer origin
* @property {boolean} [raceNodes] - Does the fetch race multiple nodes on requests.
* @property {string} [customerFallbackURL] - Customer Origin that is a fallback.
* @property {number} [connectTimeout=5000] - Connection timeout in milliseconds.
* @property {number} [downloadTimeout=0] - Download timeout in milliseconds.
* @property {AbortController} [controller]
*/

export {}
3 changes: 2 additions & 1 deletion src/utils/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ export function isErrorUnavoidable (error) {
/file does not exist/,
/Cannot read properties of undefined \(reading '([^']+)'\)/,
/([a-zA-Z_.]+) is undefined/,
/undefined is not an object \(evaluating '([^']+)'\)/
/undefined is not an object \(evaluating '([^']+)'\)/,
/all retrievals failed/
]

for (const pattern of errorPatterns) {
Expand Down
Loading
Loading