generated from ipfs/ipfs-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: create @helia/remote-pinner implementation
- Loading branch information
Showing
3 changed files
with
330 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
/** | ||
* when remote pinning service returns delegates, if we can't connect to any, we won't be able to provide our CID's | ||
* content to the service, and must abort. | ||
*/ | ||
export class FailedToConnectToDelegates extends Error {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import { type RemotePinningServiceClient, type Pin, type PinStatus, type PinsRequestidPostRequest, Status } from '@ipfs-shipyard/pinning-service-client' | ||
import { multiaddr } from '@multiformats/multiaddr' | ||
import debug from 'debug' | ||
import pRetry from 'p-retry' | ||
import { FailedToConnectToDelegates } from './errors.js' | ||
import type { Helia } from '@helia/interface' | ||
import type { CID } from 'multiformats/cid' | ||
|
||
const log = debug('helia-remote-pinning') | ||
const logError = log.extend('error') | ||
const logTrace = log.extend('trace') | ||
|
||
export interface HeliaRemotePinningOptions { | ||
/** | ||
* Control whether requests are aborted or not by manually aborting a signal or using AbortSignal.timeout() | ||
*/ | ||
signal?: AbortSignal | ||
|
||
/** | ||
* The CID instance to pin. When using Helia, passing around the CID object is preferred over the string. | ||
*/ | ||
cid: CID | ||
} | ||
|
||
export interface AddPinArgs extends Omit<Pin, 'cid'>, HeliaRemotePinningOptions {} | ||
|
||
export interface ReplacePinArgs extends Omit<PinsRequestidPostRequest, 'pin'>, Omit<Pin, 'cid'>, HeliaRemotePinningOptions {} | ||
|
||
export class HeliaRemotePinner { | ||
constructor (private readonly heliaInstance: Helia, private readonly remotePinningClient: RemotePinningServiceClient) { | ||
} | ||
|
||
private async getOrigins (otherOrigins: Pin['origins']): Promise<Set<string>> { | ||
const origins = new Set(this.heliaInstance.libp2p.getMultiaddrs().map(multiaddr => multiaddr.toString())) | ||
if (otherOrigins != null) { | ||
for (const origin of otherOrigins) { | ||
origins.add(origin) | ||
} | ||
} | ||
return origins | ||
} | ||
|
||
private async connectToDelegates (delegates: Set<string>, signal?: AbortSignal): Promise<void> { | ||
let successfulDials = 0 | ||
try { | ||
for (const delegate of delegates) { | ||
await this.heliaInstance.libp2p.dial(multiaddr(delegate), { signal }) | ||
successfulDials++ | ||
} | ||
} catch (e) { | ||
logError(e) | ||
} | ||
if (successfulDials === 0) { | ||
throw new FailedToConnectToDelegates('Failed to connect to any delegates') | ||
} | ||
} | ||
|
||
/** | ||
* The code that runs after we get a pinStatus from the remote pinning service. | ||
* This method is the orchestrator for waiting for the pin to complete/fail as well as connecting to the delegates. | ||
*/ | ||
private async handlePinStatus (pinStatus: PinStatus, signal?: AbortSignal): Promise<PinStatus> { | ||
await this.connectToDelegates(pinStatus.delegates, signal) | ||
let updatedPinStatus = pinStatus | ||
|
||
/** | ||
* We need to ensure that pinStatus is either pinned or failed. | ||
* To do so, we will need to poll the remote pinning service for the status of the pin. | ||
*/ | ||
try { | ||
await pRetry(async (attemptNum) => { | ||
logTrace('attempt #%d waiting for pinStatus of "pinned" or "failed"', attemptNum) | ||
updatedPinStatus = await this.remotePinningClient.pinsRequestidGet({ requestid: pinStatus.requestid }) | ||
if ([Status.Pinned, Status.Failed].includes(pinStatus.status)) { | ||
return updatedPinStatus | ||
} | ||
throw new Error(`Pin status is ${pinStatus.status}`) | ||
}, { | ||
retries: 10, | ||
signal | ||
}) | ||
} catch (e) { | ||
logError(e) | ||
} | ||
|
||
return updatedPinStatus | ||
} | ||
|
||
async addPin ({ cid, signal, ...otherArgs }: AddPinArgs): Promise<PinStatus> { | ||
if (signal?.aborted === true) { | ||
throw new Error('Signal was aborted prior to pinning') | ||
} | ||
const pinStatus = await this.remotePinningClient.pinsPost({ | ||
pin: { | ||
...otherArgs, | ||
cid: cid.toString(), | ||
origins: await this.getOrigins(otherArgs.origins) | ||
} | ||
}) | ||
return this.handlePinStatus(pinStatus, signal) | ||
} | ||
|
||
async replacePin ({ cid, requestid, signal, ...otherArgs }: ReplacePinArgs): Promise<PinStatus> { | ||
if (signal?.aborted === true) { | ||
throw new Error('Signal was aborted prior to pinning') | ||
} | ||
const pinStatus = await this.remotePinningClient.pinsRequestidPost({ | ||
requestid, | ||
pin: { | ||
...otherArgs, | ||
cid: cid.toString(), | ||
origins: await this.getOrigins(otherArgs.origins) | ||
} | ||
}) | ||
return this.handlePinStatus(pinStatus, signal) | ||
} | ||
} | ||
|
||
export function createRemotePinner (heliaInstance: Helia, remotePinningClient: RemotePinningServiceClient, options?: HeliaRemotePinningOptions): HeliaRemotePinner { | ||
return new HeliaRemotePinner(heliaInstance, remotePinningClient) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,204 @@ | ||
import { unixfs, type UnixFS } from '@helia/unixfs' | ||
import { Configuration, RemotePinningServiceClient, Status } from '@ipfs-shipyard/pinning-service-client' | ||
import { expect } from 'aegir/chai' | ||
import { createHelia } from 'helia' | ||
import sinon, { type SinonSandbox, type SinonStub } from 'sinon' | ||
import { FailedToConnectToDelegates } from '../src/errors.js' | ||
import { type HeliaRemotePinner, createRemotePinner } from '../src/index.js' | ||
import type { Helia } from '@helia/interface' | ||
|
||
const encoder = new TextEncoder() | ||
|
||
describe('@helia/remote-pinning', function () { | ||
let sinonSandbox: SinonSandbox | ||
let remotePinner: HeliaRemotePinner | ||
let helia: Helia | ||
let remotePinningClient: RemotePinningServiceClient | ||
let heliaFs: UnixFS | ||
let dialStub: SinonStub | ||
|
||
const validatePinResults = async (name: string, count: number): Promise<void> => { | ||
const pinResults = await remotePinningClient.pinsGet({ name }) | ||
expect(pinResults.results).to.have.lengthOf(count) | ||
expect(pinResults.count).to.equal(count) | ||
} | ||
|
||
beforeEach(async function () { | ||
sinonSandbox = sinon.createSandbox() | ||
helia = await createHelia() | ||
heliaFs = unixfs(helia) | ||
dialStub = sinonSandbox.stub(helia.libp2p, 'dial') | ||
const pinServiceConfig = new Configuration({ | ||
endpointUrl: `http://localhost:${process.env.PINNING_SERVER_PORT}`, // the URI for your pinning provider, e.g. `http://localhost:3000` | ||
accessToken: process.env.PINNING_SERVICE_TOKEN // the secret token/key given to you by your pinning provider | ||
}) | ||
|
||
remotePinningClient = new RemotePinningServiceClient(pinServiceConfig) | ||
remotePinner = createRemotePinner(helia, remotePinningClient) | ||
}) | ||
afterEach(async function () { | ||
sinonSandbox.restore() | ||
await helia.stop() | ||
const pins = await remotePinningClient.pinsGet() | ||
await Promise.all([...pins.results].map(pin => remotePinningClient.pinsRequestidDelete({ requestid: pin.requestid }))) | ||
}) | ||
describe('addPin', function () { | ||
it('Returns pinned status when pinning succeeds', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test1' | ||
}) | ||
expect(addPinResult.status).to.equal(Status.Pinned) | ||
}) | ||
|
||
it('Returns failed status when pinning fails', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
|
||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'failed-test1' | ||
}) | ||
expect(addPinResult.status).to.equal(Status.Failed) | ||
}) | ||
|
||
it('will await a queued pin until a signal times out', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
const abortController = new AbortController() | ||
const thenSpy = sinonSandbox.spy() | ||
const catchSpy = sinonSandbox.spy() | ||
const finallySpy = sinonSandbox.spy() | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = remotePinner.addPin({ | ||
cid, | ||
name: 'queued-test1', | ||
signal: abortController.signal | ||
}) | ||
addPinResult.then(thenSpy).catch(catchSpy).finally(finallySpy) | ||
expect(thenSpy.called).to.equal(false) | ||
expect(catchSpy.called).to.equal(false) | ||
expect(finallySpy.called).to.equal(false) | ||
|
||
// we need to wait for X time and then confirm the promise hasn't settled | ||
await new Promise(resolve => setTimeout(resolve, 100)) | ||
expect(thenSpy.called).to.equal(false) | ||
expect(catchSpy.called).to.equal(false) | ||
expect(finallySpy.called).to.equal(false) | ||
|
||
// note that mock-pinning-service will indefinitely hang on pins with names that start with "queued-" | ||
abortController.abort() | ||
await expect(addPinResult).to.eventually.have.property('status', Status.Queued) | ||
|
||
expect(thenSpy.called).to.equal(true) | ||
expect(catchSpy.called).to.equal(false) | ||
expect(finallySpy.called).to.equal(true) | ||
expect(abortController.signal.aborted).to.equal(true) | ||
}) | ||
|
||
it('Stops listening when provided signal times out', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'queued-test2', | ||
signal: AbortSignal.timeout(100) | ||
}) | ||
expect(addPinResult.status).to.equal(Status.Queued) | ||
}) | ||
|
||
it('Will not pin if provided an already aborted signal', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
const abortController = new AbortController() | ||
abortController.abort() | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const preAbortedRequest = remotePinner.addPin({ | ||
cid, | ||
name: 'queued-test3', | ||
signal: abortController.signal | ||
}) | ||
await expect(preAbortedRequest).to.eventually.be.rejectedWith('Signal was aborted prior to pinning') | ||
}) | ||
|
||
it('Returns FailedToConnectToDelegates when unable to connect to delegates', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.throws(new Error('Stubbed dial failure')) | ||
const addPinResult = remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test2' | ||
}) | ||
// stub heliaInstance.libp2p.dial to throw an error | ||
await expect(addPinResult).to.eventually.be.rejectedWith(FailedToConnectToDelegates) | ||
}) | ||
|
||
it('Does not return FailedToConnectToDelegates when unable to connect to a single delegate', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.onCall(0).returns(Promise.resolve({} as any)) | ||
dialStub.onCall(1).throws(new Error('Stubbed dial failure')) | ||
const addPinResult = remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test2' | ||
}) | ||
|
||
await expect(addPinResult).to.eventually.have.property('status', Status.Pinned) | ||
}) | ||
|
||
it('can receive additional remote origins', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test4', | ||
origins: new Set(['http://localhost:4444']) | ||
}) | ||
expect(addPinResult.status).to.equal(Status.Pinned) | ||
}) | ||
}) | ||
|
||
describe('replacePin', function () { | ||
it('will replace a previously added pin', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test3' | ||
}) | ||
expect(addPinResult.status).to.equal(Status.Pinned) | ||
expect(addPinResult.requestid).to.be.a('string') | ||
await validatePinResults('pinned-test3', 1) | ||
await validatePinResults('pinned-test3-replaced', 0) | ||
|
||
const replacePinResult = await remotePinner.replacePin({ | ||
cid, | ||
name: 'pinned-test3-replaced', | ||
requestid: addPinResult.requestid | ||
}) | ||
// console.log('replacePinResult: ', replacePinResult) | ||
expect(replacePinResult.status).to.equal(Status.Pinned) | ||
expect(replacePinResult.requestid).not.to.equal(addPinResult.requestid) | ||
|
||
await validatePinResults('pinned-test3', 0) | ||
await validatePinResults('pinned-test3-replaced', 1) | ||
}) | ||
|
||
it('Will not replace the pin if provided an already aborted signal', async function () { | ||
const cid = await heliaFs.addBytes(encoder.encode('hello world')) | ||
|
||
const addPinResult = await remotePinner.addPin({ | ||
cid, | ||
name: 'pinned-test5' | ||
}) | ||
const abortController = new AbortController() | ||
abortController.abort() | ||
dialStub.returns(Promise.resolve({} as any)) | ||
const preAbortedRequest = remotePinner.replacePin({ | ||
cid, | ||
requestid: addPinResult.requestid, | ||
name: 'queued-test5-replaced', | ||
signal: abortController.signal | ||
}) | ||
await expect(preAbortedRequest).to.eventually.be.rejectedWith('Signal was aborted prior to pinning') | ||
}) | ||
}) | ||
}) |