From 1b8e29e4ce397fcb40b958a2dcfb156d4fe29045 Mon Sep 17 00:00:00 2001 From: Russell Dempsey <1173416+SgtPooki@users.noreply.github.com> Date: Mon, 14 Aug 2023 12:43:23 -0700 Subject: [PATCH] feat: create @helia/remote-pinner implementation --- src/errors.ts | 5 ++ src/index.ts | 121 +++++++++++++++++++++++++++ test/index.spec.ts | 204 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 330 insertions(+) create mode 100644 src/errors.ts create mode 100644 src/index.ts create mode 100644 test/index.spec.ts diff --git a/src/errors.ts b/src/errors.ts new file mode 100644 index 0000000..0796ba9 --- /dev/null +++ b/src/errors.ts @@ -0,0 +1,5 @@ +/** + * when remote pinning service returns delegates, if we can't connect to any, we won't be able to provide our CID's + * content to the service, and must abort. + */ +export class FailedToConnectToDelegates extends Error {} diff --git a/src/index.ts b/src/index.ts new file mode 100644 index 0000000..3e249d8 --- /dev/null +++ b/src/index.ts @@ -0,0 +1,121 @@ +import { type RemotePinningServiceClient, type Pin, type PinStatus, type PinsRequestidPostRequest, Status } from '@ipfs-shipyard/pinning-service-client' +import { multiaddr } from '@multiformats/multiaddr' +import debug from 'debug' +import pRetry from 'p-retry' +import { FailedToConnectToDelegates } from './errors.js' +import type { Helia } from '@helia/interface' +import type { CID } from 'multiformats/cid' + +const log = debug('helia-remote-pinning') +const logError = log.extend('error') +const logTrace = log.extend('trace') + +export interface HeliaRemotePinningOptions { + /** + * Control whether requests are aborted or not by manually aborting a signal or using AbortSignal.timeout() + */ + signal?: AbortSignal + + /** + * The CID instance to pin. When using Helia, passing around the CID object is preferred over the string. + */ + cid: CID +} + +export interface AddPinArgs extends Omit, HeliaRemotePinningOptions {} + +export interface ReplacePinArgs extends Omit, Omit, HeliaRemotePinningOptions {} + +export class HeliaRemotePinner { + constructor (private readonly heliaInstance: Helia, private readonly remotePinningClient: RemotePinningServiceClient) { + } + + private async getOrigins (otherOrigins: Pin['origins']): Promise> { + const origins = new Set(this.heliaInstance.libp2p.getMultiaddrs().map(multiaddr => multiaddr.toString())) + if (otherOrigins != null) { + for (const origin of otherOrigins) { + origins.add(origin) + } + } + return origins + } + + private async connectToDelegates (delegates: Set, signal?: AbortSignal): Promise { + let successfulDials = 0 + try { + for (const delegate of delegates) { + await this.heliaInstance.libp2p.dial(multiaddr(delegate), { signal }) + successfulDials++ + } + } catch (e) { + logError(e) + } + if (successfulDials === 0) { + throw new FailedToConnectToDelegates('Failed to connect to any delegates') + } + } + + /** + * The code that runs after we get a pinStatus from the remote pinning service. + * This method is the orchestrator for waiting for the pin to complete/fail as well as connecting to the delegates. + */ + private async handlePinStatus (pinStatus: PinStatus, signal?: AbortSignal): Promise { + await this.connectToDelegates(pinStatus.delegates, signal) + let updatedPinStatus = pinStatus + + /** + * We need to ensure that pinStatus is either pinned or failed. + * To do so, we will need to poll the remote pinning service for the status of the pin. + */ + try { + await pRetry(async (attemptNum) => { + logTrace('attempt #%d waiting for pinStatus of "pinned" or "failed"', attemptNum) + updatedPinStatus = await this.remotePinningClient.pinsRequestidGet({ requestid: pinStatus.requestid }) + if ([Status.Pinned, Status.Failed].includes(pinStatus.status)) { + return updatedPinStatus + } + throw new Error(`Pin status is ${pinStatus.status}`) + }, { + retries: 10, + signal + }) + } catch (e) { + logError(e) + } + + return updatedPinStatus + } + + async addPin ({ cid, signal, ...otherArgs }: AddPinArgs): Promise { + if (signal?.aborted === true) { + throw new Error('Signal was aborted prior to pinning') + } + const pinStatus = await this.remotePinningClient.pinsPost({ + pin: { + ...otherArgs, + cid: cid.toString(), + origins: await this.getOrigins(otherArgs.origins) + } + }) + return this.handlePinStatus(pinStatus, signal) + } + + async replacePin ({ cid, requestid, signal, ...otherArgs }: ReplacePinArgs): Promise { + if (signal?.aborted === true) { + throw new Error('Signal was aborted prior to pinning') + } + const pinStatus = await this.remotePinningClient.pinsRequestidPost({ + requestid, + pin: { + ...otherArgs, + cid: cid.toString(), + origins: await this.getOrigins(otherArgs.origins) + } + }) + return this.handlePinStatus(pinStatus, signal) + } +} + +export function createRemotePinner (heliaInstance: Helia, remotePinningClient: RemotePinningServiceClient, options?: HeliaRemotePinningOptions): HeliaRemotePinner { + return new HeliaRemotePinner(heliaInstance, remotePinningClient) +} diff --git a/test/index.spec.ts b/test/index.spec.ts new file mode 100644 index 0000000..469365a --- /dev/null +++ b/test/index.spec.ts @@ -0,0 +1,204 @@ +import { unixfs, type UnixFS } from '@helia/unixfs' +import { Configuration, RemotePinningServiceClient, Status } from '@ipfs-shipyard/pinning-service-client' +import { expect } from 'aegir/chai' +import { createHelia } from 'helia' +import sinon, { type SinonSandbox, type SinonStub } from 'sinon' +import { FailedToConnectToDelegates } from '../src/errors.js' +import { type HeliaRemotePinner, createRemotePinner } from '../src/index.js' +import type { Helia } from '@helia/interface' + +const encoder = new TextEncoder() + +describe('@helia/remote-pinning', function () { + let sinonSandbox: SinonSandbox + let remotePinner: HeliaRemotePinner + let helia: Helia + let remotePinningClient: RemotePinningServiceClient + let heliaFs: UnixFS + let dialStub: SinonStub + + const validatePinResults = async (name: string, count: number): Promise => { + const pinResults = await remotePinningClient.pinsGet({ name }) + expect(pinResults.results).to.have.lengthOf(count) + expect(pinResults.count).to.equal(count) + } + + beforeEach(async function () { + sinonSandbox = sinon.createSandbox() + helia = await createHelia() + heliaFs = unixfs(helia) + dialStub = sinonSandbox.stub(helia.libp2p, 'dial') + const pinServiceConfig = new Configuration({ + endpointUrl: `http://localhost:${process.env.PINNING_SERVER_PORT}`, // the URI for your pinning provider, e.g. `http://localhost:3000` + accessToken: process.env.PINNING_SERVICE_TOKEN // the secret token/key given to you by your pinning provider + }) + + remotePinningClient = new RemotePinningServiceClient(pinServiceConfig) + remotePinner = createRemotePinner(helia, remotePinningClient) + }) + afterEach(async function () { + sinonSandbox.restore() + await helia.stop() + const pins = await remotePinningClient.pinsGet() + await Promise.all([...pins.results].map(pin => remotePinningClient.pinsRequestidDelete({ requestid: pin.requestid }))) + }) + describe('addPin', function () { + it('Returns pinned status when pinning succeeds', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = await remotePinner.addPin({ + cid, + name: 'pinned-test1' + }) + expect(addPinResult.status).to.equal(Status.Pinned) + }) + + it('Returns failed status when pinning fails', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = await remotePinner.addPin({ + cid, + name: 'failed-test1' + }) + expect(addPinResult.status).to.equal(Status.Failed) + }) + + it('will await a queued pin until a signal times out', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + const abortController = new AbortController() + const thenSpy = sinonSandbox.spy() + const catchSpy = sinonSandbox.spy() + const finallySpy = sinonSandbox.spy() + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = remotePinner.addPin({ + cid, + name: 'queued-test1', + signal: abortController.signal + }) + addPinResult.then(thenSpy).catch(catchSpy).finally(finallySpy) + expect(thenSpy.called).to.equal(false) + expect(catchSpy.called).to.equal(false) + expect(finallySpy.called).to.equal(false) + + // we need to wait for X time and then confirm the promise hasn't settled + await new Promise(resolve => setTimeout(resolve, 100)) + expect(thenSpy.called).to.equal(false) + expect(catchSpy.called).to.equal(false) + expect(finallySpy.called).to.equal(false) + + // note that mock-pinning-service will indefinitely hang on pins with names that start with "queued-" + abortController.abort() + await expect(addPinResult).to.eventually.have.property('status', Status.Queued) + + expect(thenSpy.called).to.equal(true) + expect(catchSpy.called).to.equal(false) + expect(finallySpy.called).to.equal(true) + expect(abortController.signal.aborted).to.equal(true) + }) + + it('Stops listening when provided signal times out', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = await remotePinner.addPin({ + cid, + name: 'queued-test2', + signal: AbortSignal.timeout(100) + }) + expect(addPinResult.status).to.equal(Status.Queued) + }) + + it('Will not pin if provided an already aborted signal', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + const abortController = new AbortController() + abortController.abort() + dialStub.returns(Promise.resolve({} as any)) + const preAbortedRequest = remotePinner.addPin({ + cid, + name: 'queued-test3', + signal: abortController.signal + }) + await expect(preAbortedRequest).to.eventually.be.rejectedWith('Signal was aborted prior to pinning') + }) + + it('Returns FailedToConnectToDelegates when unable to connect to delegates', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.throws(new Error('Stubbed dial failure')) + const addPinResult = remotePinner.addPin({ + cid, + name: 'pinned-test2' + }) + // stub heliaInstance.libp2p.dial to throw an error + await expect(addPinResult).to.eventually.be.rejectedWith(FailedToConnectToDelegates) + }) + + it('Does not return FailedToConnectToDelegates when unable to connect to a single delegate', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.onCall(0).returns(Promise.resolve({} as any)) + dialStub.onCall(1).throws(new Error('Stubbed dial failure')) + const addPinResult = remotePinner.addPin({ + cid, + name: 'pinned-test2' + }) + + await expect(addPinResult).to.eventually.have.property('status', Status.Pinned) + }) + + it('can receive additional remote origins', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = await remotePinner.addPin({ + cid, + name: 'pinned-test4', + origins: new Set(['http://localhost:4444']) + }) + expect(addPinResult.status).to.equal(Status.Pinned) + }) + }) + + describe('replacePin', function () { + it('will replace a previously added pin', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + dialStub.returns(Promise.resolve({} as any)) + const addPinResult = await remotePinner.addPin({ + cid, + name: 'pinned-test3' + }) + expect(addPinResult.status).to.equal(Status.Pinned) + expect(addPinResult.requestid).to.be.a('string') + await validatePinResults('pinned-test3', 1) + await validatePinResults('pinned-test3-replaced', 0) + + const replacePinResult = await remotePinner.replacePin({ + cid, + name: 'pinned-test3-replaced', + requestid: addPinResult.requestid + }) + // console.log('replacePinResult: ', replacePinResult) + expect(replacePinResult.status).to.equal(Status.Pinned) + expect(replacePinResult.requestid).not.to.equal(addPinResult.requestid) + + await validatePinResults('pinned-test3', 0) + await validatePinResults('pinned-test3-replaced', 1) + }) + + it('Will not replace the pin if provided an already aborted signal', async function () { + const cid = await heliaFs.addBytes(encoder.encode('hello world')) + + const addPinResult = await remotePinner.addPin({ + cid, + name: 'pinned-test5' + }) + const abortController = new AbortController() + abortController.abort() + dialStub.returns(Promise.resolve({} as any)) + const preAbortedRequest = remotePinner.replacePin({ + cid, + requestid: addPinResult.requestid, + name: 'queued-test5-replaced', + signal: abortController.signal + }) + await expect(preAbortedRequest).to.eventually.be.rejectedWith('Signal was aborted prior to pinning') + }) + }) +})