From 35ba0242ded0e9e2fb15d71d75e9ace0e3fa4f9d Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 9 Nov 2023 12:08:11 +0000 Subject: [PATCH] refactor!: extract fetch to separate module (#2223) BREAKING CHANGE: imports from `libp2p/fetch` should be updated to `@libp2p/fetch` --- .release-please.json | 1 + doc/migrations/v0.46-v1.0.0.md | 33 +- packages/libp2p/.aegir.js | 2 - packages/libp2p/README.md | 14 + packages/libp2p/package.json | 30 -- packages/libp2p/src/fetch/README.md | 41 --- packages/libp2p/src/fetch/index.ts | 315 ------------------ packages/libp2p/test/fetch/fetch.node.ts | 193 ----------- packages/libp2p/test/fetch/index.spec.ts | 146 -------- packages/libp2p/typedoc.json | 3 +- packages/protocol-fetch/LICENSE | 4 + packages/protocol-fetch/LICENSE-APACHE | 5 + packages/protocol-fetch/LICENSE-MIT | 19 ++ packages/protocol-fetch/README.md | 68 ++++ packages/protocol-fetch/package.json | 66 ++++ .../fetch => protocol-fetch/src}/constants.ts | 0 packages/protocol-fetch/src/fetch.ts | 236 +++++++++++++ packages/protocol-fetch/src/index.ts | 98 ++++++ .../src}/pb/proto.proto | 0 .../fetch => protocol-fetch/src}/pb/proto.ts | 0 packages/protocol-fetch/test/index.spec.ts | 274 +++++++++++++++ packages/protocol-fetch/tsconfig.json | 24 ++ packages/protocol-fetch/typedoc.json | 5 + 23 files changed, 847 insertions(+), 730 deletions(-) delete mode 100644 packages/libp2p/src/fetch/README.md delete mode 100644 packages/libp2p/src/fetch/index.ts delete mode 100644 packages/libp2p/test/fetch/fetch.node.ts delete mode 100644 packages/libp2p/test/fetch/index.spec.ts create mode 100644 packages/protocol-fetch/LICENSE create mode 100644 packages/protocol-fetch/LICENSE-APACHE create mode 100644 packages/protocol-fetch/LICENSE-MIT create mode 100644 packages/protocol-fetch/README.md create mode 100644 packages/protocol-fetch/package.json rename packages/{libp2p/src/fetch => protocol-fetch/src}/constants.ts (100%) create mode 100644 packages/protocol-fetch/src/fetch.ts create mode 100644 packages/protocol-fetch/src/index.ts rename packages/{libp2p/src/fetch => protocol-fetch/src}/pb/proto.proto (100%) rename packages/{libp2p/src/fetch => protocol-fetch/src}/pb/proto.ts (100%) create mode 100644 packages/protocol-fetch/test/index.spec.ts create mode 100644 packages/protocol-fetch/tsconfig.json create mode 100644 packages/protocol-fetch/typedoc.json diff --git a/.release-please.json b/.release-please.json index 2c1e29f7b2..6a23611c5e 100644 --- a/.release-please.json +++ b/.release-please.json @@ -23,6 +23,7 @@ "packages/peer-store": {}, "packages/protocol-autonat": {}, "packages/protocol-dcutr": {}, + "packages/protocol-fetch": {}, "packages/protocol-identify": {}, "packages/protocol-perf": {}, "packages/protocol-ping": {}, diff --git a/doc/migrations/v0.46-v1.0.0.md b/doc/migrations/v0.46-v1.0.0.md index 5c726a6296..5a1eb4ac32 100644 --- a/doc/migrations/v0.46-v1.0.0.md +++ b/doc/migrations/v0.46-v1.0.0.md @@ -9,6 +9,7 @@ A migration guide for refactoring your application code from libp2p `v0.46` to ` - [Ping](#ping) - [Identify](#identify) - [DCUtR](#dcutr) +- [Fetch](#fetch) - [KeyChain](#keychain) - [UPnPNat](#upnpnat) - [Plaintext](#plaintext) @@ -117,7 +118,7 @@ import { dcutrService } from 'libp2p/dcutr' const node = await createLibp2p({ services: { - identify: dcutrService() + dcutr: dcutrService() } }) ``` @@ -135,6 +136,36 @@ const node = await createLibp2p({ }) ``` +## Fetch + +The Fetch service is now published in its own package. + +**Before** + +```ts +import { createLibp2p } from 'libp2p' +import { fetchService } from 'libp2p/fetch' + +const node = await createLibp2p({ + services: { + fetch: fetchService() + } +}) +``` + +**After** + +```ts +import { createLibp2p } from 'libp2p' +import { fetch } from '@libp2p/fetch' + +const node = await createLibp2p({ + services: { + fetch: fetch() + } +}) +``` + ## KeyChain The KeyChain object is no longer included on Libp2p and must be instantiated explicitly if desired. diff --git a/packages/libp2p/.aegir.js b/packages/libp2p/.aegir.js index 62d9b2c045..b0f7674cd0 100644 --- a/packages/libp2p/.aegir.js +++ b/packages/libp2p/.aegir.js @@ -18,7 +18,6 @@ export default { const { plaintext } = await import('@libp2p/plaintext') const { circuitRelayServer, circuitRelayTransport } = await import('@libp2p/circuit-relay-v2') const { identify } = await import('@libp2p/identify') - const { fetchService } = await import('./dist/src/fetch/index.js') const peerId = await createEd25519PeerId() const libp2p = await createLibp2p({ @@ -46,7 +45,6 @@ export default { ], services: { identify: identify(), - fetch: fetchService(), relay: circuitRelayServer({ reservations: { maxReservations: Infinity diff --git a/packages/libp2p/README.md b/packages/libp2p/README.md index 8e2c9bb35d..939640c7b2 100644 --- a/packages/libp2p/README.md +++ b/packages/libp2p/README.md @@ -28,6 +28,20 @@ > JavaScript implementation of libp2p, a modular peer to peer network stack +# About + +Use the `createLibp2p` function to create a libp2p node. + +## Example + +```typescript +import { createLibp2p } from 'libp2p' + +const node = await createLibp2p({ + // ...other options +}) +``` + # Project status We've come a long way, but this project is still in Alpha, lots of development is happening, API might change, beware of the Dragons 🐉.. diff --git a/packages/libp2p/package.json b/packages/libp2p/package.json index f533fd2055..ee6a5fa2ad 100644 --- a/packages/libp2p/package.json +++ b/packages/libp2p/package.json @@ -21,22 +21,6 @@ ], "type": "module", "types": "./dist/src/index.d.ts", - "typesVersions": { - "*": { - "*": [ - "*", - "dist/*", - "dist/src/*", - "dist/src/*/index" - ], - "src/*": [ - "*", - "dist/*", - "dist/src/*", - "dist/src/*/index" - ] - } - }, "files": [ "src", "dist", @@ -47,10 +31,6 @@ ".": { "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" - }, - "./fetch": { - "types": "./dist/src/fetch/index.d.ts", - "import": "./dist/src/fetch/index.js" } }, "eslintConfig": { @@ -71,12 +51,6 @@ "dep-check": "aegir dep-check", "prepublishOnly": "node scripts/update-version.js && npm run build", "build": "aegir build", - "generate": "run-s generate:proto:*", - "generate:proto:circuit-relay": "protons ./src/circuit-relay/pb/index.proto", - "generate:proto:dcutr": "protons ./src/dcutr/pb/message.proto", - "generate:proto:fetch": "protons ./src/fetch/pb/proto.proto", - "generate:proto:identify": "protons ./src/identify/pb/message.proto", - "generate:proto:plaintext": "protons ./src/insecure/pb/proto.proto", "test": "aegir test", "test:node": "aegir test -t node -f \"./dist/test/**/*.{node,spec}.js\" --cov", "test:chrome": "aegir test -t browser -f \"./dist/test/**/*.spec.js\" --cov", @@ -108,7 +82,6 @@ "it-drain": "^3.0.2", "it-filter": "^3.0.1", "it-first": "^3.0.1", - "it-length-prefixed": "^9.0.1", "it-map": "^3.0.3", "it-merge": "^3.0.0", "it-pipe": "^3.0.1", @@ -118,7 +91,6 @@ "p-defer": "^4.0.0", "p-queue": "^7.3.4", "private-ip": "^3.0.0", - "protons-runtime": "^5.0.0", "rate-limiter-flexible": "^3.0.0", "uint8arraylist": "^2.4.3", "uint8arrays": "^4.0.6" @@ -145,11 +117,9 @@ "execa": "^8.0.1", "go-libp2p": "^1.1.1", "it-pushable": "^3.2.0", - "npm-run-all": "^4.1.5", "p-event": "^6.0.0", "p-times": "^4.0.0", "p-wait-for": "^5.0.2", - "protons": "^7.0.2", "sinon": "^17.0.0", "sinon-ts": "^2.0.0" }, diff --git a/packages/libp2p/src/fetch/README.md b/packages/libp2p/src/fetch/README.md deleted file mode 100644 index 8d231d2e83..0000000000 --- a/packages/libp2p/src/fetch/README.md +++ /dev/null @@ -1,41 +0,0 @@ -libp2p-fetch JavaScript Implementation -===================================== - -> Libp2p fetch protocol JavaScript implementation - -## Table of contents - -- [Overview](#overview) -- [Usage](#usage) - -## Overview - -An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch - -The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. - -## Usage - -```javascript -import { createLibp2p } from 'libp2p' - -/** - * Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. - * All keys must be prefixed my the same prefix, which will be used to find the appropriate key - * lookup function. - * @param key - a string - * @returns value - a Uint8Array value that corresponds to the given key, or null if the key doesn't - * have a corresponding value. - */ -async function my_subsystem_key_lookup(key) { - // app specific callback to lookup key-value pairs. -} - -// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' -const libp2p = createLibp2p() -libp2p.fetchService.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) - -const key = '/my_subsystem_key_prefix/{...}' -const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance -const value = await libp2p.fetch(peerDst, key) -``` diff --git a/packages/libp2p/src/fetch/index.ts b/packages/libp2p/src/fetch/index.ts deleted file mode 100644 index b9fad11e92..0000000000 --- a/packages/libp2p/src/fetch/index.ts +++ /dev/null @@ -1,315 +0,0 @@ -import { CodeError, ERR_TIMEOUT } from '@libp2p/interface/errors' -import { setMaxListeners } from '@libp2p/interface/events' -import { logger } from '@libp2p/logger' -import first from 'it-first' -import * as lp from 'it-length-prefixed' -import { pipe } from 'it-pipe' -import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' -import { toString as uint8arrayToString } from 'uint8arrays/to-string' -import { codes } from '../errors.js' -import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' -import { FetchRequest, FetchResponse } from './pb/proto.js' -import type { AbortOptions } from '@libp2p/interface' -import type { Stream } from '@libp2p/interface/connection' -import type { PeerId } from '@libp2p/interface/peer-id' -import type { Startable } from '@libp2p/interface/startable' -import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' -import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar' - -const log = logger('libp2p:fetch') - -const DEFAULT_TIMEOUT = 10000 - -export interface FetchServiceInit { - protocolPrefix?: string - maxInboundStreams?: number - maxOutboundStreams?: number - - /** - * How long we should wait for a remote peer to send any data - */ - timeout?: number -} - -export interface HandleMessageOptions { - stream: Stream - protocol: string -} - -export interface LookupFunction { - (key: string): Promise -} - -export interface FetchServiceComponents { - registrar: Registrar - connectionManager: ConnectionManager -} - -export interface FetchService { - /** - * The protocol name used by this fetch service - */ - protocol: string - - /** - * Sends a request to fetch the value associated with the given key from the given peer - */ - fetch(peer: PeerId, key: string, options?: AbortOptions): Promise - - /** - * Registers a new lookup callback that can map keys to values, for a given set of keys that - * share the same prefix - * - * @example - * - * ```js - * // ... - * libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) - * ``` - */ - registerLookupFunction(prefix: string, lookup: LookupFunction): void - - /** - * Registers a new lookup callback that can map keys to values, for a given set of keys that - * share the same prefix. - * - * @example - * - * ```js - * // ... - * libp2p.fetchService.unregisterLookupFunction('/prefix') - * ``` - */ - unregisterLookupFunction(prefix: string, lookup?: LookupFunction): void -} - -/** - * A simple libp2p protocol for requesting a value corresponding to a key from a peer. - * Developers can register one or more lookup function for retrieving the value corresponding to - * a given key. Each lookup function must act on a distinct part of the overall key space, defined - * by a fixed prefix that all keys that should be routed to that lookup function will start with. - */ -class DefaultFetchService implements Startable, FetchService { - public readonly protocol: string - private readonly components: FetchServiceComponents - private readonly lookupFunctions: Map - private started: boolean - private readonly init: FetchServiceInit - - constructor (components: FetchServiceComponents, init: FetchServiceInit) { - this.started = false - this.components = components - this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` - this.lookupFunctions = new Map() // Maps key prefix to value lookup function - this.handleMessage = this.handleMessage.bind(this) - this.init = init - } - - async start (): Promise { - await this.components.registrar.handle(this.protocol, (data) => { - void this.handleMessage(data) - .then(async () => { - await data.stream.close() - }) - .catch(err => { - log.error(err) - }) - }, { - maxInboundStreams: this.init.maxInboundStreams, - maxOutboundStreams: this.init.maxOutboundStreams - }) - this.started = true - } - - async stop (): Promise { - await this.components.registrar.unhandle(this.protocol) - this.started = false - } - - isStarted (): boolean { - return this.started - } - - /** - * Sends a request to fetch the value associated with the given key from the given peer - */ - async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise { - log('dialing %s to %p', this.protocol, peer) - - const connection = await this.components.connectionManager.openConnection(peer, options) - let signal = options.signal - let stream: Stream | undefined - let onAbort = (): void => {} - - // create a timeout if no abort signal passed - if (signal == null) { - log('using default timeout of %d ms', this.init.timeout) - signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT) - - setMaxListeners(Infinity, signal) - } - - try { - stream = await connection.newStream(this.protocol, { - signal - }) - - onAbort = () => { - stream?.abort(new CodeError('fetch timeout', ERR_TIMEOUT)) - } - - // make stream abortable - signal.addEventListener('abort', onAbort, { once: true }) - - log('fetch %s', key) - - const result = await pipe( - [FetchRequest.encode({ identifier: key })], - (source) => lp.encode(source), - stream, - (source) => lp.decode(source), - async function (source) { - const buf = await first(source) - - if (buf == null) { - throw new CodeError('No data received', codes.ERR_INVALID_MESSAGE) - } - - const response = FetchResponse.decode(buf) - - switch (response.status) { - case (FetchResponse.StatusCode.OK): { - log('received status for %s ok', key) - return response.data - } - case (FetchResponse.StatusCode.NOT_FOUND): { - log('received status for %s not found', key) - return null - } - case (FetchResponse.StatusCode.ERROR): { - log('received status for %s error', key) - const errmsg = uint8arrayToString(response.data) - throw new CodeError('Error in fetch protocol response: ' + errmsg, codes.ERR_INVALID_PARAMETERS) - } - default: { - log('received status for %s unknown', key) - throw new CodeError('Unknown response status', codes.ERR_INVALID_MESSAGE) - } - } - } - ) - - return result ?? null - } finally { - signal.removeEventListener('abort', onAbort) - if (stream != null) { - await stream.close() - } - } - } - - /** - * Invoked when a fetch request is received. Reads the request message off the given stream and - * responds based on looking up the key in the request via the lookup callback that corresponds - * to the key's prefix. - */ - async handleMessage (data: IncomingStreamData): Promise { - const { stream } = data - const self = this - - await pipe( - stream, - (source) => lp.decode(source), - async function * (source) { - const buf = await first(source) - - if (buf == null) { - throw new CodeError('No data received', codes.ERR_INVALID_MESSAGE) - } - - // for await (const buf of source) { - const request = FetchRequest.decode(buf) - - let response: FetchResponse - const lookup = self._getLookupFunction(request.identifier) - if (lookup != null) { - log('look up data with identifier %s', request.identifier) - const data = await lookup(request.identifier) - if (data != null) { - log('sending status for %s ok', request.identifier) - response = { status: FetchResponse.StatusCode.OK, data } - } else { - log('sending status for %s not found', request.identifier) - response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } - } - } else { - log('sending status for %s error', request.identifier) - const errmsg = uint8arrayFromString(`No lookup function registered for key: ${request.identifier}`) - response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } - } - - yield FetchResponse.encode(response) - }, - (source) => lp.encode(source), - stream - ) - } - - /** - * Given a key, finds the appropriate function for looking up its corresponding value, based on - * the key's prefix. - */ - _getLookupFunction (key: string): LookupFunction | undefined { - for (const prefix of this.lookupFunctions.keys()) { - if (key.startsWith(prefix)) { - return this.lookupFunctions.get(prefix) - } - } - } - - /** - * Registers a new lookup callback that can map keys to values, for a given set of keys that - * share the same prefix - * - * @example - * - * ```js - * // ... - * libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) - * ``` - */ - registerLookupFunction (prefix: string, lookup: LookupFunction): void { - if (this.lookupFunctions.has(prefix)) { - throw new CodeError(`Fetch protocol handler for key prefix '${prefix}' already registered`, codes.ERR_KEY_ALREADY_EXISTS) - } - - this.lookupFunctions.set(prefix, lookup) - } - - /** - * Registers a new lookup callback that can map keys to values, for a given set of keys that - * share the same prefix. - * - * @example - * - * ```js - * // ... - * libp2p.fetchService.unregisterLookupFunction('/prefix') - * ``` - */ - unregisterLookupFunction (prefix: string, lookup?: LookupFunction): void { - if (lookup != null) { - const existingLookup = this.lookupFunctions.get(prefix) - - if (existingLookup !== lookup) { - return - } - } - - this.lookupFunctions.delete(prefix) - } -} - -export function fetchService (init: FetchServiceInit = {}): (components: FetchServiceComponents) => FetchService { - return (components) => new DefaultFetchService(components, init) -} diff --git a/packages/libp2p/test/fetch/fetch.node.ts b/packages/libp2p/test/fetch/fetch.node.ts deleted file mode 100644 index ad23ce1f86..0000000000 --- a/packages/libp2p/test/fetch/fetch.node.ts +++ /dev/null @@ -1,193 +0,0 @@ -/* eslint-env mocha */ - -import { yamux } from '@chainsafe/libp2p-yamux' -import { mplex } from '@libp2p/mplex' -import { plaintext } from '@libp2p/plaintext' -import { tcp } from '@libp2p/tcp' -import { expect } from 'aegir/chai' -import { codes } from '../../src/errors.js' -import { type FetchService, fetchService } from '../../src/fetch/index.js' -import { createLibp2p } from '../../src/index.js' -import { createPeerId } from '../fixtures/creators/peer.js' -import type { Libp2p } from '@libp2p/interface' -import type { PeerId } from '@libp2p/interface/peer-id' - -async function createNode (peerId: PeerId): Promise> { - return createLibp2p({ - peerId, - addresses: { - listen: [ - '/ip4/0.0.0.0/tcp/0' - ] - }, - transports: [ - tcp() - ], - streamMuxers: [ - yamux(), - mplex() - ], - connectionEncryption: [ - plaintext() - ], - services: { - fetch: fetchService() - } - }) -} - -describe('Fetch', () => { - let sender: Libp2p<{ fetch: FetchService }> - let receiver: Libp2p<{ fetch: FetchService }> - const PREFIX_A = '/moduleA/' - const PREFIX_B = '/moduleB/' - const DATA_A = { foobar: 'hello world' } - const DATA_B = { foobar: 'goodnight moon' } - - const generateLookupFunction = function (prefix: string, data: Record) { - return async function (key: string): Promise { - key = key.slice(prefix.length) // strip prefix from key - const val = data[key] - if (val != null) { - return (new TextEncoder()).encode(val) - } - return null - } - } - - beforeEach(async () => { - const peerIdA = await createPeerId() - const peerIdB = await createPeerId() - sender = await createNode(peerIdA) - receiver = await createNode(peerIdB) - - await sender.start() - await receiver.start() - - await receiver.dial(sender.getMultiaddrs()[0]) - }) - - afterEach(async () => { - await sender.stop() - await receiver.stop() - }) - - it('fetch key that exists in receivers datastore', async () => { - receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - - const rawData = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawData == null) { - throw new Error('Value was not found') - } - - const value = (new TextDecoder()).decode(rawData) - expect(value).to.equal('hello world') - }) - - it('Different lookups for different prefixes', async () => { - receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - receiver.services.fetch.registerLookupFunction(PREFIX_B, generateLookupFunction(PREFIX_B, DATA_B)) - - const rawDataA = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawDataA == null) { - throw new Error('Value was not found') - } - - const valueA = (new TextDecoder()).decode(rawDataA) - expect(valueA).to.equal('hello world') - - // Different lookup functions can be registered on different prefixes, and have different - // values for the same key underneath the different prefix. - const rawDataB = await sender.services.fetch.fetch(receiver.peerId, '/moduleB/foobar') - - if (rawDataB == null) { - throw new Error('Value was not found') - } - - const valueB = (new TextDecoder()).decode(rawDataB) - expect(valueB).to.equal('goodnight moon') - }) - - it('fetch key that does not exist in receivers datastore', async () => { - receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - const result = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/garbage') - - expect(result).to.equal(null) - }) - - it('fetch key with unknown prefix throws error', async () => { - receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - - await expect(sender.services.fetch.fetch(receiver.peerId, '/moduleUNKNOWN/foobar')) - .to.eventually.be.rejected.with.property('code', codes.ERR_INVALID_PARAMETERS) - }) - - it('registering multiple handlers for same prefix errors', async () => { - receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_A)) - - expect(() => { receiver.services.fetch.registerLookupFunction(PREFIX_A, generateLookupFunction(PREFIX_A, DATA_B)) }) - .to.throw().with.property('code', codes.ERR_KEY_ALREADY_EXISTS) - }) - - it('can unregister handler', async () => { - const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) - receiver.services.fetch.registerLookupFunction(PREFIX_A, lookupFunction) - const rawDataA = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawDataA == null) { - throw new Error('Value was not found') - } - - const valueA = (new TextDecoder()).decode(rawDataA) - expect(valueA).to.equal('hello world') - - receiver.services.fetch.unregisterLookupFunction(PREFIX_A, lookupFunction) - - await expect(sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar')) - .to.eventually.be.rejectedWith(/No lookup function registered for key/) - }) - - it('can unregister all handlers', async () => { - const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) - receiver.services.fetch.registerLookupFunction(PREFIX_A, lookupFunction) - const rawDataA = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawDataA == null) { - throw new Error('Value was not found') - } - - const valueA = (new TextDecoder()).decode(rawDataA) - expect(valueA).to.equal('hello world') - - receiver.services.fetch.unregisterLookupFunction(PREFIX_A) - - await expect(sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar')) - .to.eventually.be.rejectedWith(/No lookup function registered for key/) - }) - - it('does not unregister wrong handlers', async () => { - const lookupFunction = generateLookupFunction(PREFIX_A, DATA_A) - receiver.services.fetch.registerLookupFunction(PREFIX_A, lookupFunction) - const rawDataA = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawDataA == null) { - throw new Error('Value was not found') - } - - const valueA = (new TextDecoder()).decode(rawDataA) - expect(valueA).to.equal('hello world') - - receiver.services.fetch.unregisterLookupFunction(PREFIX_A, async () => { return null }) - - const rawDataB = await sender.services.fetch.fetch(receiver.peerId, '/moduleA/foobar') - - if (rawDataB == null) { - throw new Error('Value was not found') - } - - const valueB = (new TextDecoder()).decode(rawDataB) - expect(valueB).to.equal('hello world') - }) -}) diff --git a/packages/libp2p/test/fetch/index.spec.ts b/packages/libp2p/test/fetch/index.spec.ts deleted file mode 100644 index dd7d1d921a..0000000000 --- a/packages/libp2p/test/fetch/index.spec.ts +++ /dev/null @@ -1,146 +0,0 @@ -/* eslint-env mocha */ - -import { ERR_TIMEOUT } from '@libp2p/interface/errors' -import { TypedEventEmitter } from '@libp2p/interface/events' -import { start, stop } from '@libp2p/interface/startable' -import { mockRegistrar, mockUpgrader, connectionPair } from '@libp2p/interface-compliance-tests/mocks' -import { createEd25519PeerId } from '@libp2p/peer-id-factory' -import { PersistentPeerStore } from '@libp2p/peer-store' -import { expect } from 'aegir/chai' -import { MemoryDatastore } from 'datastore-core' -import delay from 'delay' -import { pipe } from 'it-pipe' -import sinon from 'sinon' -import { stubInterface } from 'sinon-ts' -import { defaultComponents, type Components } from '../../src/components.js' -import { DefaultConnectionManager } from '../../src/connection-manager/index.js' -import { fetchService, type FetchServiceInit } from '../../src/fetch/index.js' -import type { ConnectionGater } from '@libp2p/interface/connection-gater' -import type { TransportManager } from '@libp2p/interface-internal/transport-manager' - -const defaultInit: FetchServiceInit = { - protocolPrefix: 'ipfs', - maxInboundStreams: 1, - maxOutboundStreams: 1, - timeout: 1000 -} - -async function createComponents (index: number): Promise { - const peerId = await createEd25519PeerId() - - const events = new TypedEventEmitter() - const components = defaultComponents({ - peerId, - registrar: mockRegistrar(), - upgrader: mockUpgrader({ events }), - datastore: new MemoryDatastore(), - transportManager: stubInterface(), - connectionGater: stubInterface(), - events - }) - components.peerStore = new PersistentPeerStore(components) - components.connectionManager = new DefaultConnectionManager(components, { - minConnections: 50, - maxConnections: 1000, - inboundUpgradeTimeout: 1000 - }) - - return components -} - -describe('fetch', () => { - let localComponents: Components - let remoteComponents: Components - - beforeEach(async () => { - localComponents = await createComponents(0) - remoteComponents = await createComponents(1) - - await Promise.all([ - start(localComponents), - start(remoteComponents) - ]) - }) - - afterEach(async () => { - sinon.restore() - - await Promise.all([ - stop(localComponents), - stop(remoteComponents) - ]) - }) - - it('should be able to fetch from another peer', async () => { - const key = 'key' - const value = Uint8Array.from([0, 1, 2, 3, 4]) - const localFetch = fetchService(defaultInit)(localComponents) - const remoteFetch = fetchService(defaultInit)(remoteComponents) - - remoteFetch.registerLookupFunction(key, async (identifier) => { - expect(identifier).to.equal(key) - - return value - }) - - await start(localFetch) - await start(remoteFetch) - - // simulate connection between nodes - const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) - localComponents.events.safeDispatchEvent('connection:open', { detail: localToRemote }) - remoteComponents.events.safeDispatchEvent('connection:open', { detail: remoteToLocal }) - - // Run fetch - const result = await localFetch.fetch(remoteComponents.peerId, key) - - expect(result).to.equalBytes(value) - }) - - it('should time out fetching from another peer when waiting for the record', async () => { - const key = 'key' - const localFetch = fetchService(defaultInit)(localComponents) - const remoteFetch = fetchService(defaultInit)(remoteComponents) - - await start(localFetch) - await start(remoteFetch) - - // simulate connection between nodes - const [localToRemote, remoteToLocal] = connectionPair(localComponents, remoteComponents) - localComponents.events.safeDispatchEvent('connection:open', { detail: localToRemote }) - remoteComponents.events.safeDispatchEvent('connection:open', { detail: remoteToLocal }) - - // replace existing handler with a really slow one - await remoteComponents.registrar.unhandle(remoteFetch.protocol) - await remoteComponents.registrar.handle(remoteFetch.protocol, ({ stream }) => { - void pipe( - stream, - async function * (source) { - for await (const chunk of source) { - // longer than the timeout - await delay(1000) - - yield chunk - } - }, - stream - ) - }) - - const newStreamSpy = sinon.spy(localToRemote, 'newStream') - - // 10 ms timeout - const signal = AbortSignal.timeout(10) - - // Run fetch, should time out - await expect(localFetch.fetch(remoteComponents.peerId, key, { - signal - })) - .to.eventually.be.rejected.with.property('code', ERR_TIMEOUT) - - // should have closed stream - expect(newStreamSpy).to.have.property('callCount', 1) - const stream = await newStreamSpy.getCall(0).returnValue - expect(stream).to.have.nested.property('timeline.close') - }) -}) diff --git a/packages/libp2p/typedoc.json b/packages/libp2p/typedoc.json index 9797b17f28..f599dc728d 100644 --- a/packages/libp2p/typedoc.json +++ b/packages/libp2p/typedoc.json @@ -1,6 +1,5 @@ { "entryPoints": [ - "./src/index.ts", - "./src/fetch/index.ts" + "./src/index.ts" ] } diff --git a/packages/protocol-fetch/LICENSE b/packages/protocol-fetch/LICENSE new file mode 100644 index 0000000000..20ce483c86 --- /dev/null +++ b/packages/protocol-fetch/LICENSE @@ -0,0 +1,4 @@ +This project is dual licensed under MIT and Apache-2.0. + +MIT: https://www.opensource.org/licenses/mit +Apache-2.0: https://www.apache.org/licenses/license-2.0 diff --git a/packages/protocol-fetch/LICENSE-APACHE b/packages/protocol-fetch/LICENSE-APACHE new file mode 100644 index 0000000000..14478a3b60 --- /dev/null +++ b/packages/protocol-fetch/LICENSE-APACHE @@ -0,0 +1,5 @@ +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/packages/protocol-fetch/LICENSE-MIT b/packages/protocol-fetch/LICENSE-MIT new file mode 100644 index 0000000000..72dc60d84b --- /dev/null +++ b/packages/protocol-fetch/LICENSE-MIT @@ -0,0 +1,19 @@ +The MIT License (MIT) + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/packages/protocol-fetch/README.md b/packages/protocol-fetch/README.md new file mode 100644 index 0000000000..f48d42a2e2 --- /dev/null +++ b/packages/protocol-fetch/README.md @@ -0,0 +1,68 @@ +[![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) +[![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) +[![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p) +[![CI](https://img.shields.io/github/actions/workflow/status/libp2p/js-libp2p/main.yml?branch=master\&style=flat-square)](https://github.com/libp2p/js-libp2p/actions/workflows/main.yml?query=branch%3Amaster) + +> Implementation of the Fetch Protocol + +# About + +An implementation of the Fetch protocol as described here: + +The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. + +## Example + +```typescript +import { createLibp2p } from 'libp2p' +import { fetch } from '@libp2p/fetch' + +const libp2p = await createLibp2p({ + services: { + fetch: fetch() + } +}) + +// Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. +// All keys must be prefixed my the same prefix, which will be used to find the appropriate key +// lookup function. +async function my_subsystem_key_lookup(key) { + // app specific callback to lookup key-value pairs. +} + +// Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' +libp2p.fetch.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) + +const key = '/my_subsystem_key_prefix/{...}' +const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance +const value = await libp2p.fetch(peerDst, key) +``` + +# Install + +```console +$ npm i @libp2p/fetch +``` + +## Browser ` +``` + +# API Docs + +- + +# License + +Licensed under either of + +- Apache 2.0, ([LICENSE-APACHE](LICENSE-APACHE) / ) +- MIT ([LICENSE-MIT](LICENSE-MIT) / ) + +# Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions. diff --git a/packages/protocol-fetch/package.json b/packages/protocol-fetch/package.json new file mode 100644 index 0000000000..472f91cc32 --- /dev/null +++ b/packages/protocol-fetch/package.json @@ -0,0 +1,66 @@ +{ + "name": "@libp2p/fetch", + "version": "0.0.0", + "description": "Implementation of the Fetch Protocol", + "license": "Apache-2.0 OR MIT", + "homepage": "https://github.com/libp2p/js-libp2p/tree/master/packages/protocol-fetch#readme", + "repository": { + "type": "git", + "url": "git+https://github.com/libp2p/js-libp2p.git" + }, + "bugs": { + "url": "https://github.com/libp2p/js-libp2p/issues" + }, + "type": "module", + "types": "./dist/src/index.d.ts", + "files": [ + "src", + "dist", + "!dist/test", + "!**/*.tsbuildinfo" + ], + "exports": { + ".": { + "types": "./dist/src/index.d.ts", + "import": "./dist/src/index.js" + } + }, + "eslintConfig": { + "extends": "ipfs", + "parserOptions": { + "project": true, + "sourceType": "module" + } + }, + "scripts": { + "start": "node dist/src/main.js", + "build": "aegir build", + "test": "aegir test", + "clean": "aegir clean", + "generate": "protons ./src/pb/index.proto", + "lint": "aegir lint", + "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", + "test:firefox": "aegir test -t browser -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", + "test:node": "aegir test -t node --cov", + "dep-check": "aegir dep-check" + }, + "dependencies": { + "@libp2p/interface": "^0.1.2", + "@libp2p/interface-internal": "^0.1.5", + "it-protobuf-stream": "^1.0.2", + "protons-runtime": "^5.0.0", + "uint8arraylist": "^2.4.3", + "uint8arrays": "^4.0.6" + }, + "devDependencies": { + "@libp2p/logger": "^3.1.0", + "@libp2p/peer-id-factory": "^3.0.8", + "aegir": "^41.0.2", + "it-pair": "^2.0.6", + "protons": "^7.3.0", + "sinon": "^17.0.0", + "sinon-ts": "^2.0.0" + } +} diff --git a/packages/libp2p/src/fetch/constants.ts b/packages/protocol-fetch/src/constants.ts similarity index 100% rename from packages/libp2p/src/fetch/constants.ts rename to packages/protocol-fetch/src/constants.ts diff --git a/packages/protocol-fetch/src/fetch.ts b/packages/protocol-fetch/src/fetch.ts new file mode 100644 index 0000000000..3d4aec758d --- /dev/null +++ b/packages/protocol-fetch/src/fetch.ts @@ -0,0 +1,236 @@ +import { CodeError, ERR_INVALID_MESSAGE, ERR_INVALID_PARAMETERS, ERR_TIMEOUT } from '@libp2p/interface/errors' +import { setMaxListeners } from '@libp2p/interface/events' +import { pbStream } from 'it-protobuf-stream' +import { fromString as uint8arrayFromString } from 'uint8arrays/from-string' +import { toString as uint8arrayToString } from 'uint8arrays/to-string' +import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' +import { FetchRequest, FetchResponse } from './pb/proto.js' +import type { Fetch as FetchInterface, FetchComponents, FetchInit, LookupFunction } from './index.js' +import type { AbortOptions, Logger } from '@libp2p/interface' +import type { Stream } from '@libp2p/interface/connection' +import type { PeerId } from '@libp2p/interface/peer-id' +import type { Startable } from '@libp2p/interface/startable' +import type { IncomingStreamData } from '@libp2p/interface-internal/registrar' + +const DEFAULT_TIMEOUT = 10000 + +/** + * A simple libp2p protocol for requesting a value corresponding to a key from a peer. + * Developers can register one or more lookup function for retrieving the value corresponding to + * a given key. Each lookup function must act on a distinct part of the overall key space, defined + * by a fixed prefix that all keys that should be routed to that lookup function will start with. + */ +export class Fetch implements Startable, FetchInterface { + public readonly protocol: string + private readonly components: FetchComponents + private readonly lookupFunctions: Map + private started: boolean + private readonly init: FetchInit + readonly #log: Logger + + constructor (components: FetchComponents, init: FetchInit = {}) { + this.#log = components.logger.forComponent('libp2p:fetch') + this.started = false + this.components = components + this.protocol = `/${init.protocolPrefix ?? 'libp2p'}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + this.lookupFunctions = new Map() // Maps key prefix to value lookup function + this.handleMessage = this.handleMessage.bind(this) + this.init = init + } + + async start (): Promise { + await this.components.registrar.handle(this.protocol, (data) => { + void this.handleMessage(data) + .then(async () => { + await data.stream.close() + }) + .catch(err => { + this.#log.error(err) + }) + }, { + maxInboundStreams: this.init.maxInboundStreams, + maxOutboundStreams: this.init.maxOutboundStreams + }) + this.started = true + } + + async stop (): Promise { + await this.components.registrar.unhandle(this.protocol) + this.started = false + } + + isStarted (): boolean { + return this.started + } + + /** + * Sends a request to fetch the value associated with the given key from the given peer + */ + async fetch (peer: PeerId, key: string, options: AbortOptions = {}): Promise { + this.#log('dialing %s to %p', this.protocol, peer) + + const connection = await this.components.connectionManager.openConnection(peer, options) + let signal = options.signal + let stream: Stream | undefined + let onAbort = (): void => {} + + // create a timeout if no abort signal passed + if (signal == null) { + this.#log('using default timeout of %d ms', this.init.timeout) + signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT) + + setMaxListeners(Infinity, signal) + } + + try { + stream = await connection.newStream(this.protocol, { + signal + }) + + onAbort = () => { + stream?.abort(new CodeError('fetch timeout', ERR_TIMEOUT)) + } + + // make stream abortable + signal.addEventListener('abort', onAbort, { once: true }) + + this.#log('fetch %s', key) + + const pb = pbStream(stream) + await pb.write({ + identifier: key + }, FetchRequest, options) + + const response = await pb.read(FetchResponse, options) + await pb.unwrap().close(options) + + switch (response.status) { + case (FetchResponse.StatusCode.OK): { + this.#log('received status for %s ok', key) + return response.data + } + case (FetchResponse.StatusCode.NOT_FOUND): { + this.#log('received status for %s not found', key) + return + } + case (FetchResponse.StatusCode.ERROR): { + this.#log('received status for %s error', key) + const errmsg = uint8arrayToString(response.data) + throw new CodeError('Error in fetch protocol response: ' + errmsg, ERR_INVALID_PARAMETERS) + } + default: { + this.#log('received status for %s unknown', key) + throw new CodeError('Unknown response status', ERR_INVALID_MESSAGE) + } + } + } catch (err: any) { + stream?.abort(err) + throw err + } finally { + signal.removeEventListener('abort', onAbort) + if (stream != null) { + await stream.close() + } + } + } + + /** + * Invoked when a fetch request is received. Reads the request message off the given stream and + * responds based on looking up the key in the request via the lookup callback that corresponds + * to the key's prefix. + */ + async handleMessage (data: IncomingStreamData): Promise { + const { stream } = data + const signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT) + + try { + const pb = pbStream(stream) + const request = await pb.read(FetchRequest, { + signal + }) + + let response: FetchResponse + const lookup = this._getLookupFunction(request.identifier) + if (lookup != null) { + this.#log('look up data with identifier %s', request.identifier) + const data = await lookup(request.identifier) + if (data != null) { + this.#log('sending status for %s ok', request.identifier) + response = { status: FetchResponse.StatusCode.OK, data } + } else { + this.#log('sending status for %s not found', request.identifier) + response = { status: FetchResponse.StatusCode.NOT_FOUND, data: new Uint8Array(0) } + } + } else { + this.#log('sending status for %s error', request.identifier) + const errmsg = uint8arrayFromString(`No lookup function registered for key: ${request.identifier}`) + response = { status: FetchResponse.StatusCode.ERROR, data: errmsg } + } + + await pb.write(response, FetchResponse, { + signal + }) + + await pb.unwrap().close({ + signal + }) + } catch (err: any) { + this.#log('error answering fetch request', err) + stream.abort(err) + } + } + + /** + * Given a key, finds the appropriate function for looking up its corresponding value, based on + * the key's prefix. + */ + _getLookupFunction (key: string): LookupFunction | undefined { + for (const prefix of this.lookupFunctions.keys()) { + if (key.startsWith(prefix)) { + return this.lookupFunctions.get(prefix) + } + } + } + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix + * + * @example + * + * ```js + * // ... + * libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) + * ``` + */ + registerLookupFunction (prefix: string, lookup: LookupFunction): void { + if (this.lookupFunctions.has(prefix)) { + throw new CodeError(`Fetch protocol handler for key prefix '${prefix}' already registered`, 'ERR_KEY_ALREADY_EXISTS') + } + + this.lookupFunctions.set(prefix, lookup) + } + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. + * + * @example + * + * ```js + * // ... + * libp2p.fetchService.unregisterLookupFunction('/prefix') + * ``` + */ + unregisterLookupFunction (prefix: string, lookup?: LookupFunction): void { + if (lookup != null) { + const existingLookup = this.lookupFunctions.get(prefix) + + if (existingLookup !== lookup) { + return + } + } + + this.lookupFunctions.delete(prefix) + } +} diff --git a/packages/protocol-fetch/src/index.ts b/packages/protocol-fetch/src/index.ts new file mode 100644 index 0000000000..c270bd770c --- /dev/null +++ b/packages/protocol-fetch/src/index.ts @@ -0,0 +1,98 @@ +/** + * @packageDocumentation + * + * An implementation of the Fetch protocol as described here: https://github.com/libp2p/specs/tree/master/fetch + * + * The fetch protocol is a simple protocol for requesting a value corresponding to a key from a peer. + * + * @example + * + * ```typescript + * import { createLibp2p } from 'libp2p' + * import { fetch } from '@libp2p/fetch' + * + * const libp2p = await createLibp2p({ + * services: { + * fetch: fetch() + * } + * }) + * + * // Given a key (as a string) returns a value (as a Uint8Array), or null if the key isn't found. + * // All keys must be prefixed my the same prefix, which will be used to find the appropriate key + * // lookup function. + * async function my_subsystem_key_lookup(key) { + * // app specific callback to lookup key-value pairs. + * } + * + * // Enable this peer to respond to fetch requests for keys that begin with '/my_subsystem_key_prefix/' + * libp2p.fetch.registerLookupFunction('/my_subsystem_key_prefix/', my_subsystem_key_lookup) + * + * const key = '/my_subsystem_key_prefix/{...}' + * const peerDst = PeerId.parse('Qmfoo...') // or Multiaddr instance + * const value = await libp2p.fetch(peerDst, key) + * ``` + */ + +import { Fetch as FetchClass } from './fetch.js' +import type { AbortOptions, ComponentLogger } from '@libp2p/interface' +import type { PeerId } from '@libp2p/interface/peer-id' +import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' +import type { Registrar } from '@libp2p/interface-internal/registrar' + +export interface FetchInit { + protocolPrefix?: string + maxInboundStreams?: number + maxOutboundStreams?: number + + /** + * How long we should wait for a remote peer to send any data + */ + timeout?: number +} + +export interface LookupFunction { + (key: string): Promise +} + +export interface FetchComponents { + registrar: Registrar + connectionManager: ConnectionManager + logger: ComponentLogger +} + +export interface Fetch { + /** + * Sends a request to fetch the value associated with the given key from the given peer + */ + fetch(peer: PeerId, key: string, options?: AbortOptions): Promise + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix + * + * @example + * + * ```js + * // ... + * libp2p.fetchService.registerLookupFunction('/prefix', (key) => { ... }) + * ``` + */ + registerLookupFunction(prefix: string, lookup: LookupFunction): void + + /** + * Registers a new lookup callback that can map keys to values, for a given set of keys that + * share the same prefix. + * + * @example + * + * ```js + * // ... + * libp2p.fetchService.unregisterLookupFunction('/prefix') + * ``` + */ + unregisterLookupFunction(prefix: string, lookup?: LookupFunction): void +} + +export function fetch (init: FetchInit = {}): (components: FetchComponents) => Fetch { + return (components) => new FetchClass(components, init) +} diff --git a/packages/libp2p/src/fetch/pb/proto.proto b/packages/protocol-fetch/src/pb/proto.proto similarity index 100% rename from packages/libp2p/src/fetch/pb/proto.proto rename to packages/protocol-fetch/src/pb/proto.proto diff --git a/packages/libp2p/src/fetch/pb/proto.ts b/packages/protocol-fetch/src/pb/proto.ts similarity index 100% rename from packages/libp2p/src/fetch/pb/proto.ts rename to packages/protocol-fetch/src/pb/proto.ts diff --git a/packages/protocol-fetch/test/index.spec.ts b/packages/protocol-fetch/test/index.spec.ts new file mode 100644 index 0000000000..e97aaa07b3 --- /dev/null +++ b/packages/protocol-fetch/test/index.spec.ts @@ -0,0 +1,274 @@ +/* eslint-env mocha */ + +import { ERR_INVALID_PARAMETERS } from '@libp2p/interface/errors' +import { start, stop } from '@libp2p/interface/startable' +import { defaultLogger } from '@libp2p/logger' +import { createEd25519PeerId } from '@libp2p/peer-id-factory' +import { expect } from 'aegir/chai' +import { duplexPair } from 'it-pair/duplex' +import { pbStream } from 'it-protobuf-stream' +import sinon from 'sinon' +import { stubInterface, type StubbedInstance } from 'sinon-ts' +import { Fetch } from '../src/fetch.js' +import { FetchRequest, FetchResponse } from '../src/pb/proto.js' +import type { ComponentLogger } from '@libp2p/interface' +import type { Connection, Stream } from '@libp2p/interface/connection' +import type { PeerId } from '@libp2p/interface/peer-id' +import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager' +import type { Registrar } from '@libp2p/interface-internal/registrar' + +interface StubbedFetchComponents { + registrar: StubbedInstance + connectionManager: StubbedInstance + logger: ComponentLogger +} + +async function createComponents (): Promise { + return { + registrar: stubInterface(), + connectionManager: stubInterface(), + logger: defaultLogger() + } +} + +function createStreams (components: StubbedFetchComponents, remotePeer?: PeerId): { incomingStream: StubbedInstance, outgoingStream: StubbedInstance, connection: StubbedInstance } { + const duplex = duplexPair() + const outgoingStream = stubInterface() + outgoingStream.source = duplex[0].source + outgoingStream.sink.callsFake(async source => duplex[0].sink(source)) + + const incomingStream = stubInterface() + incomingStream.source = duplex[1].source + incomingStream.sink.callsFake(async source => duplex[1].sink(source)) + + const connection = stubInterface() + + if (remotePeer != null) { + connection.newStream.withArgs('/libp2p/fetch/0.0.1').resolves(outgoingStream) + components.connectionManager.openConnection.withArgs(remotePeer).resolves(connection) + } + + return { + incomingStream, + outgoingStream, + connection + } +} + +describe('fetch', () => { + let components: StubbedFetchComponents + let fetch: Fetch + + beforeEach(async () => { + components = await createComponents() + fetch = new Fetch(components) + }) + + afterEach(async () => { + sinon.restore() + + await stop(fetch) + }) + + it('should register for fetch protocol on startup', async () => { + await start(fetch) + + expect(components.registrar.handle.called).to.be.true('handle was not called') + expect(components.registrar.handle.getCall(0).args[0]).to.equal('/libp2p/fetch/0.0.1') + }) + + describe('outgoing', () => { + it('should be able to fetch from another peer', async () => { + const remotePeer = await createEd25519PeerId() + const key = 'key' + const value = Uint8Array.from([0, 1, 2, 3, 4]) + + const { + incomingStream + } = createStreams(components, remotePeer) + + const result = fetch.fetch(remotePeer, key) + + const pb = pbStream(incomingStream) + const request = await pb.read(FetchRequest) + + expect(request.identifier).to.equal(key) + + await pb.write({ + status: FetchResponse.StatusCode.OK, + data: value + }, FetchResponse) + + await expect(result).to.eventually.deep.equal(value) + }) + + it('should be handle NOT_FOUND from the other peer', async () => { + const remotePeer = await createEd25519PeerId() + const key = 'key' + + const { + incomingStream + } = createStreams(components, remotePeer) + + const result = fetch.fetch(remotePeer, key) + + const pb = pbStream(incomingStream) + const request = await pb.read(FetchRequest) + + expect(request.identifier).to.equal(key) + + await pb.write({ + status: FetchResponse.StatusCode.NOT_FOUND + }, FetchResponse) + + await expect(result).to.eventually.be.undefined() + }) + + it('should be handle ERROR from the other peer', async () => { + const remotePeer = await createEd25519PeerId() + const key = 'key' + + const { + incomingStream + } = createStreams(components, remotePeer) + + const result = fetch.fetch(remotePeer, key) + + const pb = pbStream(incomingStream) + const request = await pb.read(FetchRequest) + + expect(request.identifier).to.equal(key) + + await pb.write({ + status: FetchResponse.StatusCode.ERROR + }, FetchResponse) + + await expect(result).to.eventually.be.rejected + .with.property('code', ERR_INVALID_PARAMETERS) + }) + + it('should time out fetching from another peer when waiting for the record', async () => { + const remotePeer = await createEd25519PeerId() + const key = 'key' + + const { + outgoingStream + } = createStreams(components, remotePeer) + + outgoingStream.abort.callsFake((err) => { + void outgoingStream.source.throw(err) + }) + + await expect(fetch.fetch(remotePeer, key, { + signal: AbortSignal.timeout(10) + })).to.eventually.be.rejected + .with.property('code', 'ABORT_ERR') + + expect(outgoingStream.abort.called).to.be.true() + }) + }) + + describe('incoming', () => { + it('should be able to send to another peer', async () => { + const key = '/test/key' + const value = Uint8Array.from([0, 1, 2, 3, 4]) + + const { + incomingStream, + outgoingStream, + connection + } = createStreams(components) + + fetch.registerLookupFunction('/test', async (k) => { + expect(k).to.equal(key) + return value + }) + + void fetch.handleMessage({ + stream: incomingStream, + connection + }) + + const pb = pbStream(outgoingStream) + + await pb.write({ + identifier: key + }, FetchRequest) + + const response = await pb.read(FetchResponse) + expect(response.status).to.equal(FetchResponse.StatusCode.OK) + expect(response.data).to.equalBytes(value) + }) + + it('should handle not having the requested data', async () => { + const key = '/test/key' + + const { + incomingStream, + outgoingStream, + connection + } = createStreams(components) + + fetch.registerLookupFunction('/test', async (k) => { + return undefined + }) + + void fetch.handleMessage({ + stream: incomingStream, + connection + }) + + const pb = pbStream(outgoingStream) + + await pb.write({ + identifier: key + }, FetchRequest) + + const response = await pb.read(FetchResponse) + expect(response.status).to.equal(FetchResponse.StatusCode.NOT_FOUND) + }) + + it('should handle not having a handler for the key', async () => { + const key = '/test/key' + + const { + incomingStream, + outgoingStream, + connection + } = createStreams(components) + + void fetch.handleMessage({ + stream: incomingStream, + connection + }) + + const pb = pbStream(outgoingStream) + + await pb.write({ + identifier: key + }, FetchRequest) + + const response = await pb.read(FetchResponse) + expect(response.status).to.equal(FetchResponse.StatusCode.ERROR) + }) + + it('should time out sending data to another peer waiting for the request', async () => { + fetch = new Fetch(components, { + timeout: 10 + }) + + const { + incomingStream, + connection + } = createStreams(components) + + await fetch.handleMessage({ + stream: incomingStream, + connection + }) + + expect(incomingStream.abort.called).to.be.true() + expect(incomingStream.abort.getCall(0).args[0]).to.have.property('code', 'ABORT_ERR') + }) + }) +}) diff --git a/packages/protocol-fetch/tsconfig.json b/packages/protocol-fetch/tsconfig.json new file mode 100644 index 0000000000..c144a96186 --- /dev/null +++ b/packages/protocol-fetch/tsconfig.json @@ -0,0 +1,24 @@ +{ + "extends": "aegir/src/config/tsconfig.aegir.json", + "compilerOptions": { + "outDir": "dist" + }, + "include": [ + "src", + "test" + ], + "references": [ + { + "path": "../interface" + }, + { + "path": "../interface-internal" + }, + { + "path": "../logger" + }, + { + "path": "../peer-id-factory" + } + ] +} diff --git a/packages/protocol-fetch/typedoc.json b/packages/protocol-fetch/typedoc.json new file mode 100644 index 0000000000..f599dc728d --- /dev/null +++ b/packages/protocol-fetch/typedoc.json @@ -0,0 +1,5 @@ +{ + "entryPoints": [ + "./src/index.ts" + ] +}