diff --git a/cmd/lassie/daemon_test.go b/cmd/lassie/daemon_test.go index 1223ca5b..d4660357 100644 --- a/cmd/lassie/daemon_test.go +++ b/cmd/lassie/daemon_test.go @@ -28,7 +28,7 @@ func TestDaemonCommandFlags(t *testing.T) { args: []string{"daemon"}, assert: func(ctx context.Context, lCfg *l.LassieConfig, hCfg h.HttpServerConfig, erCfg *a.EventRecorderConfig) error { // lassie config - require.Equal(t, nil, lCfg.Finder) + require.Equal(t, nil, lCfg.Source) require.NotNil(t, lCfg.Host, "host should not be nil") require.Equal(t, 20*time.Second, lCfg.ProviderTimeout) require.Equal(t, uint(0), lCfg.ConcurrentSPRetrievals) @@ -156,7 +156,7 @@ func TestDaemonCommandFlags(t *testing.T) { name: "with ipni endpoint", args: []string{"daemon", "--ipni-endpoint", "https://cid.contact"}, assert: func(ctx context.Context, lCfg *l.LassieConfig, hCfg h.HttpServerConfig, erCfg *a.EventRecorderConfig) error { - require.IsType(t, &indexerlookup.IndexerCandidateFinder{}, lCfg.Finder, "finder should be an IndexerCandidateFinder when providing an ipni endpoint") + require.IsType(t, &indexerlookup.IndexerCandidateSource{}, lCfg.Source, "finder should be an IndexerCandidateSource when providing an ipni endpoint") return nil }, }, diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index 992f7ddd..7e935f95 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -252,7 +252,7 @@ func (pp *progressPrinter) subscriber(event types.RetrievalEvent) { case events.CandidatesFoundEvent: pp.candidatesFound = len(ret.Candidates()) case events.CandidatesFilteredEvent: - if len(fetchProviderAddrInfos) == 0 { + if len(fetchProviders) == 0 { fmt.Fprintf(pp.writer, "Found %d storage provider candidate(s) in the indexer:\n", pp.candidatesFound) } else { fmt.Fprintf(pp.writer, "Using the specified storage provider(s):\n") @@ -327,7 +327,7 @@ func defaultFetchRun( if printPath != "" { printPath = "/" + printPath } - if len(fetchProviderAddrInfos) == 0 { + if len(fetchProviders) == 0 { 
fmt.Fprintf(msgWriter, "Fetching %s", rootCid.String()+printPath) } else { fmt.Fprintf(msgWriter, "Fetching %s from specified provider(s)", rootCid.String()+printPath) diff --git a/cmd/lassie/fetch_test.go b/cmd/lassie/fetch_test.go index c5c1cbcb..b5d2616e 100644 --- a/cmd/lassie/fetch_test.go +++ b/cmd/lassie/fetch_test.go @@ -42,7 +42,7 @@ func TestFetchCommandFlags(t *testing.T) { require.Equal(t, "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4.car", outfile) // lassie config - require.Equal(t, nil, lCfg.Finder) + require.Equal(t, nil, lCfg.Source) require.NotNil(t, lCfg.Host, "host should not be nil") require.Equal(t, 20*time.Second, lCfg.ProviderTimeout) require.Equal(t, uint(0), lCfg.ConcurrentSPRetrievals) @@ -194,7 +194,7 @@ func TestFetchCommandFlags(t *testing.T) { "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path datamodel.Path, dagScope trustlessutils.DagScope, entityBytes *trustlessutils.ByteRange, duplicates bool, tempDir string, progress bool, outfile string) error { - require.IsType(t, &retriever.DirectCandidateFinder{}, lCfg.Finder, "finder should be a DirectCandidateFinder when providers are specified") + require.IsType(t, &retriever.DirectCandidateSource{}, lCfg.Source, "finder should be a DirectCandidateSource when providers are specified") return nil }, }, @@ -207,7 +207,7 @@ func TestFetchCommandFlags(t *testing.T) { "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path datamodel.Path, dagScope trustlessutils.DagScope, entityBytes *trustlessutils.ByteRange, duplicates bool, tempDir string, progress bool, outfile string) error { - require.IsType(t, &indexerlookup.IndexerCandidateFinder{}, lCfg.Finder, 
"finder should be an IndexerCandidateFinder when providing an ipni endpoint") + require.IsType(t, &indexerlookup.IndexerCandidateSource{}, lCfg.Source, "finder should be an IndexerCandidateSource when providing an ipni endpoint") return nil }, }, diff --git a/cmd/lassie/flags.go b/cmd/lassie/flags.go index fbe8a14f..cca17ab5 100644 --- a/cmd/lassie/flags.go +++ b/cmd/lassie/flags.go @@ -117,7 +117,7 @@ var FlagExcludeProviders = &cli.StringFlag{ }, } -var fetchProviderAddrInfos []peer.AddrInfo +var fetchProviders []types.Provider var FlagAllowProviders = &cli.StringFlag{ Name: "providers", @@ -143,7 +143,7 @@ var FlagAllowProviders = &cli.StringFlag{ if err != nil { return err } - fetchProviderAddrInfos, err = types.ParseProviderStrings(strings.Join(trans, ",")) + fetchProviders, err = types.ParseProviderStrings(strings.Join(trans, ",")) return err }, } @@ -214,7 +214,7 @@ var FlagIPNIEndpoint = &cli.StringFlag{ func ResetGlobalFlags() { // Reset global variables here so that they are not used // in subsequent calls to commands during testing. 
- fetchProviderAddrInfos = make([]peer.AddrInfo, 0) + fetchProviders = make([]types.Provider, 0) protocols = make([]multicodec.Code, 0) providerBlockList = make(map[peer.ID]bool) } diff --git a/cmd/lassie/main.go b/cmd/lassie/main.go index 1c89d52b..2bda11c2 100644 --- a/cmd/lassie/main.go +++ b/cmd/lassie/main.go @@ -90,8 +90,8 @@ func buildLassieConfigFromCLIContext(cctx *cli.Context, lassieOpts []lassie.Lass } lassieOpts = append(lassieOpts, lassie.WithHost(host)) - if len(fetchProviderAddrInfos) > 0 { - finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos)) + if len(fetchProviders) > 0 { + finderOpt := lassie.WithCandidateSource(retriever.NewDirectCandidateSource(fetchProviders, retriever.WithLibp2pCandidateDiscovery(host))) if cctx.IsSet("ipni-endpoint") { logger.Warn("Ignoring ipni-endpoint flag since direct provider is specified") } @@ -103,12 +103,12 @@ func buildLassieConfigFromCLIContext(cctx *cli.Context, lassieOpts []lassie.Lass logger.Errorw("Failed to parse IPNI endpoint as URL", "err", err) return nil, fmt.Errorf("cannot parse given IPNI endpoint %s as valid URL: %w", endpoint, err) } - finder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(endpointUrl)) + finder, err := indexerlookup.NewCandidateSource(indexerlookup.WithHttpEndpoint(endpointUrl)) if err != nil { logger.Errorw("Failed to instantiate IPNI candidate finder", "err", err) return nil, err } - lassieOpts = append(lassieOpts, lassie.WithFinder(finder)) + lassieOpts = append(lassieOpts, lassie.WithCandidateSource(finder)) logger.Debug("Using explicit IPNI endpoint to find candidates", "endpoint", endpoint) } diff --git a/go.mod b/go.mod index f497c27b..9c4bd7b9 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/filecoin-project/go-state-types v0.10.0 github.com/google/uuid v1.3.0 github.com/hannahhoward/go-pubsub v1.0.0 - github.com/ipfs/boxo v0.15.0 + github.com/ipfs/boxo 
v0.15.1-0.20240125013539-09ff20c5abb6 github.com/ipfs/go-block-format v0.2.0 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 diff --git a/go.sum b/go.sum index 64ff7947..351ce672 100644 --- a/go.sum +++ b/go.sum @@ -234,8 +234,8 @@ github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFck github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs= github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0= -github.com/ipfs/boxo v0.15.0 h1:BriLydj2nlK1nKeJQHxcKSuG5ZXcoutzhBklOtxC5pk= -github.com/ipfs/boxo v0.15.0/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M= +github.com/ipfs/boxo v0.15.1-0.20240125013539-09ff20c5abb6 h1:1dInSpfeUaxSxDLZoYUcQSAzYOekGGhNpUZIOIe4kvI= +github.com/ipfs/boxo v0.15.1-0.20240125013539-09ff20c5abb6/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M= github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA= github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU= github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY= diff --git a/pkg/indexerlookup/candidatefinder.go b/pkg/indexerlookup/candidatesource.go similarity index 58% rename from pkg/indexerlookup/candidatefinder.go rename to pkg/indexerlookup/candidatesource.go index 907513d0..b47679f2 100644 --- a/pkg/indexerlookup/candidatefinder.go +++ b/pkg/indexerlookup/candidatesource.go @@ -2,16 +2,13 @@ package indexerlookup import ( "bufio" - "bytes" "context" "encoding/json" "errors" "fmt" "io" - "math/rand" "net/http" - "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/types" "github.com/ipfs/go-cid" "github.com/ipfs/go-log/v2" @@ -21,84 +18,25 @@ import ( ) var ( - _ retriever.CandidateFinder = (*IndexerCandidateFinder)(nil) + _ 
types.CandidateSource = (*IndexerCandidateSource)(nil) logger = log.Logger("lassie/indexerlookup") ) -type IndexerCandidateFinder struct { +type IndexerCandidateSource struct { *options } -func NewCandidateFinder(o ...Option) (*IndexerCandidateFinder, error) { +func NewCandidateSource(o ...Option) (*IndexerCandidateSource, error) { opts, err := newOptions(o...) if err != nil { return nil, err } - return &IndexerCandidateFinder{ + return &IndexerCandidateSource{ options: opts, }, nil } -func (idxf *IndexerCandidateFinder) sendJsonRequest(req *http.Request) (*model.FindResponse, error) { - req.Header.Set("Accept", "application/json") - logger.Debugw("sending outgoing request", "url", req.URL, "accept", req.Header.Get("Accept")) - resp, err := idxf.httpClient.Do(req) - if err != nil { - logger.Debugw("Failed to perform json lookup", "err", err) - return nil, err - } - switch resp.StatusCode { - case http.StatusOK: - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - if err != nil { - logger.Debugw("Failed to read response JSON response body", "err", err) - return nil, err - } - return model.UnmarshalFindResponse(b) - case http.StatusNotFound: - return &model.FindResponse{}, nil - default: - return nil, fmt.Errorf("batch find query failed: %s", http.StatusText(resp.StatusCode)) - } -} - -func (idxf *IndexerCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { - req, err := idxf.newFindHttpRequest(ctx, cid) - if err != nil { - return nil, err - } - parsedResp, err := idxf.sendJsonRequest(req) - if err != nil { - return nil, err - } - // turn parsedResp into records. 
- var matches []types.RetrievalCandidate - - indices := rand.Perm(len(parsedResp.MultihashResults)) - for _, i := range indices { - multihashResult := parsedResp.MultihashResults[i] - if !bytes.Equal(cid.Hash(), multihashResult.Multihash) { - continue - } - for _, val := range multihashResult.ProviderResults { - // skip results without decodable metadata - if md, err := decodeMetadata(val); err == nil { - candidate := types.RetrievalCandidate{ - RootCid: cid, - Metadata: md, - } - if val.Provider != nil { - candidate.MinerPeer = *val.Provider - } - matches = append(matches, candidate) - } - } - } - return matches, nil -} - func decodeMetadata(pr model.ProviderResult) (metadata.Metadata, error) { if len(pr.Metadata) == 0 { return metadata.Metadata{}, errors.New("no metadata") @@ -114,7 +52,7 @@ func decodeMetadata(pr model.ProviderResult) (metadata.Metadata, error) { return dtm, nil } -func (idxf *IndexerCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { +func (idxf *IndexerCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { req, err := idxf.newFindHttpRequest(ctx, c) if err != nil { return err @@ -136,7 +74,7 @@ func (idxf *IndexerCandidateFinder) FindCandidatesAsync(ctx context.Context, c c } } -func (idxf *IndexerCandidateFinder) newFindHttpRequest(ctx context.Context, c cid.Cid) (*http.Request, error) { +func (idxf *IndexerCandidateSource) newFindHttpRequest(ctx context.Context, c cid.Cid) (*http.Request, error) { endpoint := idxf.findByMultihashEndpoint(c.Hash()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil) if err != nil { @@ -158,7 +96,7 @@ func (idxf *IndexerCandidateFinder) newFindHttpRequest(ctx context.Context, c ci return req, nil } -func (idxf *IndexerCandidateFinder) decodeProviderResultStream(ctx context.Context, c cid.Cid, from io.ReadCloser, cb func(types.RetrievalCandidate)) error { +func (idxf 
*IndexerCandidateSource) decodeProviderResultStream(ctx context.Context, c cid.Cid, from io.ReadCloser, cb func(types.RetrievalCandidate)) error { defer from.Close() scanner := bufio.NewScanner(from) for { @@ -195,6 +133,6 @@ func (idxf *IndexerCandidateFinder) decodeProviderResultStream(ctx context.Conte } } -func (idxf *IndexerCandidateFinder) findByMultihashEndpoint(mh multihash.Multihash) string { +func (idxf *IndexerCandidateSource) findByMultihashEndpoint(mh multihash.Multihash) string { return idxf.httpEndpoint.JoinPath("multihash", mh.B58String()).String() } diff --git a/pkg/indexerlookup/candidatefinder_test.go b/pkg/indexerlookup/candidatesource_test.go similarity index 86% rename from pkg/indexerlookup/candidatefinder_test.go rename to pkg/indexerlookup/candidatesource_test.go index bfbe4142..f557136f 100644 --- a/pkg/indexerlookup/candidatefinder_test.go +++ b/pkg/indexerlookup/candidatesource_test.go @@ -16,7 +16,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestCandidateFinder(t *testing.T) { +func TestCandidateSource(t *testing.T) { cids := make([]cid.Cid, 0, 10) candidates := make(map[cid.Cid][]types.RetrievalCandidate, 10) binaryMetadata := make(map[cid.Cid][][]byte, 10) @@ -95,25 +95,13 @@ func TestCandidateFinder(t *testing.T) { }() indexerURL, err := url.Parse("http://" + mockIndexer.Addr()) req.NoError(err) - candidateFinder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(indexerURL)) + candidateSource, err := indexerlookup.NewCandidateSource(indexerlookup.WithHttpEndpoint(indexerURL)) req.NoError(err) for cid, expectedReturns := range testCase.expectedReturns { - syncCandidates, err := candidateFinder.FindCandidates(ctx, cid) - req.NoError(err) - if syncCandidates == nil { - syncCandidates = []types.RetrievalCandidate{} - } - req.Equal(expectedReturns, syncCandidates) - select { - case <-ctx.Done(): - req.FailNow("cancelled") - case recv := <-connectCh: - req.Regexp("^/multihash/"+cid.Hash().B58String(), 
recv) - } gatheredCandidates := []types.RetrievalCandidate{} asyncCandidatesErr := make(chan error, 1) go func() { - asyncCandidatesErr <- candidateFinder.FindCandidatesAsync(ctx, cid, func(candidate types.RetrievalCandidate) { + asyncCandidatesErr <- candidateSource.FindCandidates(ctx, cid, func(candidate types.RetrievalCandidate) { gatheredCandidates = append(gatheredCandidates, candidate) }) }() diff --git a/pkg/indexerlookup/options.go b/pkg/indexerlookup/options.go index 392e864e..8f72c98e 100644 --- a/pkg/indexerlookup/options.go +++ b/pkg/indexerlookup/options.go @@ -85,7 +85,7 @@ func WithHttpUserAgent(a string) Option { } } -// WithAsyncResultsChanBuffer sets the channel buffer returned by IndexerCandidateFinder.FindCandidatesAsync. +// WithAsyncResultsChanBuffer sets the channel buffer returned by IndexerCandidateSource.FindCandidates. // Defaults to 1 if unspecified. func WithAsyncResultsChanBuffer(i int) Option { return func(o *options) error { diff --git a/pkg/internal/itest/direct_fetch_test.go b/pkg/internal/itest/direct_fetch_test.go index 6e0a22a3..0cd94580 100644 --- a/pkg/internal/itest/direct_fetch_test.go +++ b/pkg/internal/itest/direct_fetch_test.go @@ -32,7 +32,8 @@ import ( const ( bitswapDirect = 0 graphsyncDirect = 1 - transportsDirect = 2 + httpDirect = 2 + transportsDirect = 3 ) func TestDirectFetch(t *testing.T) { @@ -48,6 +49,10 @@ func TestDirectFetch(t *testing.T) { name: "direct graphsync peer", directPeer: graphsyncDirect, }, + { + name: "direct http peer", + directPeer: graphsyncDirect, + }, { name: "peer responding on transports protocol", directPeer: transportsDirect, @@ -67,6 +72,7 @@ func TestDirectFetch(t *testing.T) { mrn := mocknet.NewMockRetrievalNet(ctx, t) mrn.AddGraphsyncPeers(1) mrn.AddBitswapPeers(1) + mrn.AddHttpPeers(1) // generate separate 4MiB random unixfs file DAGs on both peers @@ -78,10 +84,16 @@ func TestDirectFetch(t *testing.T) { // bitswap peer (1) srcData2 := unixfs.GenerateFile(t, 
mrn.Remotes[1].LinkSystem, bytes.NewReader(srcData1.Content), 4<<20) - req.Equal(srcData2.Root, srcData2.Root) + req.Equal(srcData2.Root, srcData1.Root) bitswapMAs, err := peer.AddrInfoToP2pAddrs(mrn.Remotes[1].AddrInfo()) req.NoError(err) + // http peer (1) + srcData3 := unixfs.GenerateFile(t, mrn.Remotes[2].LinkSystem, bytes.NewReader(srcData1.Content), 4<<20) + req.Equal(srcData3.Root, srcData1.Root) + httpMAs, err := peer.AddrInfoToP2pAddrs(mrn.Remotes[2].AddrInfo()) + req.NoError(err) + transportsAddr, clear := handleTransports(t, mrn.MN, []lp2ptransports.Protocol{ { Name: "bitswap", @@ -91,6 +103,10 @@ func TestDirectFetch(t *testing.T) { Name: "libp2p", Addresses: graphsyncMAs, }, + { + Name: "http", + Addresses: httpMAs, + }, }) req.NoError(mrn.MN.LinkAll()) defer clear() @@ -101,14 +117,16 @@ func TestDirectFetch(t *testing.T) { addr = *mrn.Remotes[0].AddrInfo() case bitswapDirect: addr = *mrn.Remotes[1].AddrInfo() + case httpDirect: + addr = *mrn.Remotes[2].AddrInfo() case transportsDirect: addr = transportsAddr default: req.FailNow("unrecognized direct peer test") } - directFinder := retriever.NewDirectCandidateFinder(mrn.Self, []peer.AddrInfo{addr}) - lassie, err := lassie.NewLassie(ctx, lassie.WithFinder(directFinder), lassie.WithHost(mrn.Self), lassie.WithGlobalTimeout(5*time.Second)) + directFinder := retriever.NewDirectCandidateSource([]types.Provider{{Peer: addr, Protocols: nil}}, retriever.WithLibp2pCandidateDiscovery(mrn.Self)) + lassie, err := lassie.NewLassie(ctx, lassie.WithCandidateSource(directFinder), lassie.WithHost(mrn.Self), lassie.WithGlobalTimeout(5*time.Second)) req.NoError(err) outFile, err := os.CreateTemp(t.TempDir(), "lassie-test-") req.NoError(err) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index ba0f6b83..136a64fb 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -24,6 +24,7 @@ import ( "github.com/filecoin-project/lassie/pkg/lassie" 
"github.com/filecoin-project/lassie/pkg/retriever" httpserver "github.com/filecoin-project/lassie/pkg/server/http" + "github.com/filecoin-project/lassie/pkg/types" "github.com/google/uuid" "github.com/ipfs/go-cid" unixfs "github.com/ipfs/go-unixfsnode/testutil" @@ -433,7 +434,7 @@ func TestHttpFetch(t *testing.T) { }, { // A very contrived example - we spread the content generated for this test across 4 peers, - // then we also make sure the root is in all of them, so the CandidateFinder will return them + // then we also make sure the root is in all of them, so the CandidateSource will return them // all. The retriever should then form a swarm of 4 peers and fetch the content from across // the set. name: "bitswap, nested large sharded directory, spread across multiple peers, with path, dag-scope entity", @@ -585,7 +586,7 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{fileEntry} }, lassieOpts: func(t *testing.T, mrn *mocknet.MockRetrievalNet) []lassie.LassieOption { - return []lassie.LassieOption{lassie.WithFinder(retriever.NewDirectCandidateFinder(mrn.Self, []peer.AddrInfo{*mrn.Remotes[0].AddrInfo()}))} + return []lassie.LassieOption{lassie.WithCandidateSource(retriever.NewDirectCandidateSource([]types.Provider{{Peer: *mrn.Remotes[0].AddrInfo(), Protocols: nil}}, retriever.WithLibp2pCandidateDiscovery(mrn.Self)))} }, }, { @@ -616,7 +617,7 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{fileEntry} }, lassieOpts: func(t *testing.T, mrn *mocknet.MockRetrievalNet) []lassie.LassieOption { - return []lassie.LassieOption{lassie.WithFinder(retriever.NewDirectCandidateFinder(mrn.Self, []peer.AddrInfo{*mrn.Remotes[0].AddrInfo()}))} + return []lassie.LassieOption{lassie.WithCandidateSource(retriever.NewDirectCandidateSource([]types.Provider{{Peer: *mrn.Remotes[0].AddrInfo(), Protocols: nil}}, retriever.WithLibp2pCandidateDiscovery(mrn.Self)))} }, }, { @@ -899,7 +900,7 @@ func TestHttpFetch(t *testing.T) { opts := append([]lassie.LassieOption{ 
lassie.WithProviderTimeout(20 * time.Second), lassie.WithHost(mrn.Self), - lassie.WithFinder(mrn.Finder), + lassie.WithCandidateSource(mrn.Source), }, customOpts...) if testCase.disableGraphsync { opts = append(opts, lassie.WithProtocols([]multicodec.Code{multicodec.TransportBitswap, multicodec.TransportIpfsGatewayHttp})) diff --git a/pkg/internal/itest/mocknet/mocknet.go b/pkg/internal/itest/mocknet/mocknet.go index fa35a9c6..fb4192a2 100644 --- a/pkg/internal/itest/mocknet/mocknet.go +++ b/pkg/internal/itest/mocknet/mocknet.go @@ -10,7 +10,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" retrievaltypes "github.com/filecoin-project/go-retrieval-types" "github.com/filecoin-project/lassie/pkg/internal/itest/testpeer" - "github.com/filecoin-project/lassie/pkg/retriever" "github.com/filecoin-project/lassie/pkg/types" bsnet "github.com/ipfs/boxo/bitswap/network" bssrv "github.com/ipfs/boxo/bitswap/server" @@ -38,7 +37,7 @@ type MockRetrievalNet struct { MN lpmock.Mocknet Self host.Host Remotes []testpeer.TestPeer - Finder retriever.CandidateFinder + Source types.CandidateSource } func NewMockRetrievalNet(ctx context.Context, t *testing.T) *MockRetrievalNet { @@ -49,7 +48,7 @@ func NewMockRetrievalNet(ctx context.Context, t *testing.T) *MockRetrievalNet { RemoteEvents: make([][]datatransfer.Event, 0), FinishedChan: make([]chan struct{}, 0), } - mrn.Finder = &mockCandidateFinder{mrn} + mrn.Source = &mockCandidateSource{mrn} mrn.t.Cleanup(func() { require.NoError(mrn.t, mrn.TearDown()) }) @@ -142,11 +141,11 @@ func (mrn *MockRetrievalNet) TearDown() error { return mrn.MN.Close() } -type mockCandidateFinder struct { +type mockCandidateSource struct { mrn *MockRetrievalNet } -func (mcf *mockCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { +func (mcf *mockCandidateSource) findCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { candidates := 
make([]types.RetrievalCandidate, 0) for _, h := range mcf.mrn.Remotes { if _, has := h.Cids[cid]; has { @@ -165,8 +164,8 @@ func (mcf *mockCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) return candidates, nil } -func (mcf *mockCandidateFinder) FindCandidatesAsync(ctx context.Context, cid cid.Cid, cb func(types.RetrievalCandidate)) error { - cand, _ := mcf.FindCandidates(ctx, cid) +func (mcf *mockCandidateSource) FindCandidates(ctx context.Context, cid cid.Cid, cb func(types.RetrievalCandidate)) error { + cand, _ := mcf.findCandidates(ctx, cid) for _, c := range cand { select { case <-ctx.Done(): diff --git a/pkg/internal/itest/testpeer/generator.go b/pkg/internal/itest/testpeer/generator.go index ba77c6dd..d4a0350d 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -318,7 +318,7 @@ func newTestPeer( } wo := tp.LinkSystem.StorageWriteOpener - // track CIDs put into this store so we can serve via the CandidateFinder + // track CIDs put into this store so we can serve via the CandidateSource tp.LinkSystem.StorageWriteOpener = func(lnkCtx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { w, c, err := wo(lnkCtx) if err != nil { diff --git a/pkg/internal/itest/trustless_fetch_test.go b/pkg/internal/itest/trustless_fetch_test.go index 03266acd..f8a485e8 100644 --- a/pkg/internal/itest/trustless_fetch_test.go +++ b/pkg/internal/itest/trustless_fetch_test.go @@ -66,7 +66,7 @@ func TestTrustlessUnixfsFetch(t *testing.T) { ctx, lassie.WithProviderTimeout(20*time.Second), lassie.WithHost(mrn.Self), - lassie.WithFinder(mrn.Finder), + lassie.WithCandidateSource(mrn.Source), ) req.NoError(err) cfg := httpserver.HttpServerConfig{Address: "127.0.0.1", Port: 0, TempDir: t.TempDir()} diff --git a/pkg/internal/testutil/gen.go b/pkg/internal/testutil/gen.go index 7ad69b4c..3a26593e 100644 --- a/pkg/internal/testutil/gen.go +++ b/pkg/internal/testutil/gen.go @@ -112,13 +112,13 @@ func 
GenerateRetrievalCandidatesForCID(t *testing.T, n int, c cid.Cid, protocols protocols = []metadata.Protocol{&metadata.Bitswap{}} } for i := 0; i < n; i++ { - addrs := []multiaddr.Multiaddr{GenerateMultiaddr()} + addrs := []multiaddr.Multiaddr{GenerateHTTPMultiAddr()} candidates = append(candidates, types.NewRetrievalCandidate(peers[i], addrs, c, protocols...)) } return candidates } -func GenerateMultiaddr() multiaddr.Multiaddr { +func GenerateMultiAddr() multiaddr.Multiaddr { // generate a random ipv4 address addr := &net.TCPAddr{IP: net.IPv4(byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255)), byte(rand.Intn(255))), Port: rand.Intn(65535)} maddr, err := manet.FromIP(addr.IP) @@ -129,7 +129,11 @@ func GenerateMultiaddr() multiaddr.Multiaddr { if err != nil { panic(err) } - maddr = multiaddr.Join(maddr, port) + return multiaddr.Join(maddr, port) +} + +func GenerateHTTPMultiAddr() multiaddr.Multiaddr { + maddr := GenerateMultiAddr() scheme, err := multiaddr.NewComponent("http", "") if err != nil { panic(err) diff --git a/pkg/internal/testutil/mockcandidatefinder.go b/pkg/internal/testutil/mockcandidatefinder.go index 07fdd9fa..82163de5 100644 --- a/pkg/internal/testutil/mockcandidatefinder.go +++ b/pkg/internal/testutil/mockcandidatefinder.go @@ -15,21 +15,21 @@ type DiscoveredCandidate struct { Candidate types.RetrievalCandidate } -type MockCandidateFinder struct { +type MockCandidateSource struct { err error candidates map[cid.Cid][]types.RetrievalCandidate discoveredCandidates chan DiscoveredCandidate } -func NewMockCandidateFinder(err error, candidates map[cid.Cid][]types.RetrievalCandidate) *MockCandidateFinder { - return &MockCandidateFinder{ +func NewMockCandidateSource(err error, candidates map[cid.Cid][]types.RetrievalCandidate) *MockCandidateSource { + return &MockCandidateSource{ err: err, candidates: candidates, discoveredCandidates: make(chan DiscoveredCandidate, 16), } } -func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx 
context.Context, t *testing.T, afterStart time.Duration, expectedCandidatesDiscovered []DiscoveredCandidate) { +func (me *MockCandidateSource) VerifyCandidatesDiscovered(ctx context.Context, t *testing.T, afterStart time.Duration, expectedCandidatesDiscovered []DiscoveredCandidate) { candidatesDiscovered := make([]DiscoveredCandidate, 0, len(expectedCandidatesDiscovered)) for i := 0; i < len(expectedCandidatesDiscovered); i++ { select { @@ -42,8 +42,8 @@ func (me *MockCandidateFinder) VerifyCandidatesDiscovered(ctx context.Context, t require.ElementsMatch(t, expectedCandidatesDiscovered, candidatesDiscovered) } -func (me *MockCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { - rs, err := me.FindCandidates(ctx, c) +func (me *MockCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { + rs, err := me.findCandidates(ctx, c) if err != nil { return err } @@ -63,7 +63,7 @@ func (me *MockCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Ci return nil } -func (me *MockCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { +func (me *MockCandidateSource) findCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) { if me.err != nil { return nil, me.err } diff --git a/pkg/internal/testutil/verifier.go b/pkg/internal/testutil/verifier.go index 256bc6fa..e79fef11 100644 --- a/pkg/internal/testutil/verifier.go +++ b/pkg/internal/testutil/verifier.go @@ -53,7 +53,7 @@ func (rv RetrievalVerifier) RunWithVerification( t *testing.T, clock *clock.Mock, client VerifierClient, - mockCandidateFinder *MockCandidateFinder, + mockCandidateSource *MockCandidateSource, mockSession *MockSession, cancelFunc context.CancelFunc, cancelAfter time.Duration, @@ -83,8 +83,8 @@ func (rv RetrievalVerifier) RunWithVerification( if mockSession != nil { mockSession.VerifyMetricsAt(ctx, t, 
expectedActionsAtTime.AfterStart, expectedActionsAtTime.ExpectedMetrics) } - if mockCandidateFinder != nil { - mockCandidateFinder.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CandidatesDiscovered) + if mockCandidateSource != nil { + mockCandidateSource.VerifyCandidatesDiscovered(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.CandidatesDiscovered) } if client != nil { client.VerifyConnectionsReceived(ctx, t, expectedActionsAtTime.AfterStart, expectedActionsAtTime.ReceivedConnections) diff --git a/pkg/lassie/lassie.go b/pkg/lassie/lassie.go index 858c3be4..654fa052 100644 --- a/pkg/lassie/lassie.go +++ b/pkg/lassie/lassie.go @@ -32,7 +32,7 @@ type Lassie struct { // LassieConfig customizes the behavior of a Lassie instance. type LassieConfig struct { - Finder retriever.CandidateFinder + Source types.CandidateSource Host host.Host ProviderTimeout time.Duration ConcurrentSPRetrievals uint @@ -65,9 +65,9 @@ func NewLassieConfig(opts ...LassieOption) *LassieConfig { // NewLassieWithConfig creates a new Lassie instance with a custom // configuration. 
func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error) { - if cfg.Finder == nil { + if cfg.Source == nil { var err error - cfg.Finder, err = indexerlookup.NewCandidateFinder(indexerlookup.WithHttpClient(&http.Client{})) + cfg.Source, err = indexerlookup.NewCandidateSource(indexerlookup.WithHttpClient(&http.Client{})) if err != nil { return nil, err } @@ -130,7 +130,7 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error } } - retriever, err := retriever.NewRetriever(ctx, session, cfg.Finder, protocolRetrievers) + retriever, err := retriever.NewRetriever(ctx, session, cfg.Source, protocolRetrievers) if err != nil { return nil, err } @@ -144,10 +144,10 @@ func NewLassieWithConfig(ctx context.Context, cfg *LassieConfig) (*Lassie, error return lassie, nil } -// WithFinder allows you to specify a custom candidate finder. -func WithFinder(finder retriever.CandidateFinder) LassieOption { +// WithCandidateSource allows you to specify a custom candidate source.
+func WithCandidateSource(finder types.CandidateSource) LassieOption { return func(cfg *LassieConfig) { - cfg.Finder = finder + cfg.Source = finder } } diff --git a/pkg/retriever/assignablecandidatefinder.go b/pkg/retriever/assignablecandidatefinder.go index c8272c37..eaa267a6 100644 --- a/pkg/retriever/assignablecandidatefinder.go +++ b/pkg/retriever/assignablecandidatefinder.go @@ -10,9 +10,6 @@ import ( "github.com/filecoin-project/lassie/pkg/events" "github.com/filecoin-project/lassie/pkg/internal/candidatebuffer" "github.com/filecoin-project/lassie/pkg/types" - "github.com/ipfs/go-cid" - "github.com/ipni/go-libipni/metadata" - "github.com/libp2p/go-libp2p/core/peer" ) type FilterIndexerCandidate func(types.RetrievalCandidate) (bool, types.RetrievalCandidate) @@ -20,17 +17,17 @@ type FilterIndexerCandidate func(types.RetrievalCandidate) (bool, types.Retrieva // AssignableCandidateFinder finds and filters candidates for a given retrieval type AssignableCandidateFinder struct { filterIndexerCandidate FilterIndexerCandidate - candidateFinder CandidateFinder + candidateSource types.CandidateSource clock clock.Clock } const BufferWindow = 5 * time.Millisecond -func NewAssignableCandidateFinder(candidateFinder CandidateFinder, filterIndexerCandidate FilterIndexerCandidate) AssignableCandidateFinder { - return NewAssignableCandidateFinderWithClock(candidateFinder, filterIndexerCandidate, clock.New()) +func NewAssignableCandidateFinder(candidateSource types.CandidateSource, filterIndexerCandidate FilterIndexerCandidate) AssignableCandidateFinder { + return NewAssignableCandidateFinderWithClock(candidateSource, filterIndexerCandidate, clock.New()) } -func NewAssignableCandidateFinderWithClock(candidateFinder CandidateFinder, filterIndexerCandidate FilterIndexerCandidate, clock clock.Clock) AssignableCandidateFinder { - return AssignableCandidateFinder{candidateFinder: candidateFinder, filterIndexerCandidate: filterIndexerCandidate, clock: clock} +func 
NewAssignableCandidateFinderWithClock(candidateSource types.CandidateSource, filterIndexerCandidate FilterIndexerCandidate, clock clock.Clock) AssignableCandidateFinder { + return AssignableCandidateFinder{candidateSource: candidateSource, filterIndexerCandidate: filterIndexerCandidate, clock: clock} } func (acf AssignableCandidateFinder) FindCandidates(ctx context.Context, request types.RetrievalRequest, eventsCallback func(types.RetrievalEvent), onCandidates func([]types.RetrievalCandidate)) error { ctx, cancelCtx := context.WithCancel(ctx) @@ -63,11 +60,12 @@ func (acf AssignableCandidateFinder) FindCandidates(ctx context.Context, request onCandidates(acceptableCandidates) }, acf.clock) + candidateSource := acf.candidateSource + if len(request.Providers) > 0 { + candidateSource = NewDirectCandidateSource(request.Providers) + } err := candidateBuffer.BufferStream(ctx, func(ctx context.Context, onNextCandidate candidatebuffer.OnNextCandidate) error { - if len(request.FixedPeers) > 0 { - return sendFixedPeers(request.Root, request.FixedPeers, onNextCandidate) - } - return acf.candidateFinder.FindCandidatesAsync(ctx, request.Root, onNextCandidate) + return candidateSource.FindCandidates(ctx, request.Root, onNextCandidate) }, BufferWindow) if err != nil { @@ -81,15 +79,3 @@ func (acf AssignableCandidateFinder) FindCandidates(ctx context.Context, request } return nil } - -func sendFixedPeers(requestCid cid.Cid, fixedPeers []peer.AddrInfo, onNextCandidate candidatebuffer.OnNextCandidate) error { - md := metadata.Default.New(&metadata.GraphsyncFilecoinV1{}, &metadata.Bitswap{}, &metadata.IpfsGatewayHttp{}) - for _, fixedPeer := range fixedPeers { - onNextCandidate(types.RetrievalCandidate{ - MinerPeer: fixedPeer, - RootCid: requestCid, - Metadata: md, - }) - } - return nil -} diff --git a/pkg/retriever/assignablecandidatefinder_test.go b/pkg/retriever/assignablecandidatefinder_test.go index 2dfc27ee..ac60bce8 100644 --- a/pkg/retriever/assignablecandidatefinder_test.go 
+++ b/pkg/retriever/assignablecandidatefinder_test.go @@ -13,6 +13,7 @@ import ( "github.com/ipfs/go-cid" cidlink "github.com/ipld/go-ipld-prime/linking/cid" trustlessutils "github.com/ipld/go-trustless-utils" + "github.com/ipni/go-libipni/metadata" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" ) @@ -147,15 +148,18 @@ func TestAssignableCandidateFinder(t *testing.T) { if testCase.fixedPeers == nil { testCase.fixedPeers = make(map[cid.Cid][]string) } - allFixedPeers := make(map[cid.Cid][]peer.AddrInfo, len(testCase.fixedPeers)) + allProviders := make(map[cid.Cid][]types.Provider, len(testCase.fixedPeers)) for c, stringResults := range testCase.fixedPeers { - fixedPeers := make([]peer.AddrInfo, 0, len(stringResults)) + providers := make([]types.Provider, 0, len(stringResults)) for _, stringResult := range stringResults { - fixedPeers = append(fixedPeers, peer.AddrInfo{ID: peer.ID(stringResult)}) + providers = append(providers, types.Provider{ + Peer: peer.AddrInfo{ID: peer.ID(stringResult)}, + Protocols: []metadata.Protocol{&metadata.GraphsyncFilecoinV1{}, &metadata.Bitswap{}, &metadata.IpfsGatewayHttp{}}, + }) } - allFixedPeers[c] = fixedPeers + allProviders[c] = providers } - candidateFinder := testutil.NewMockCandidateFinder(testCase.candidateError, allCandidateResults) + candidateSource := testutil.NewMockCandidateSource(testCase.candidateError, allCandidateResults) isAcceptableStorageProvider := func(candidate types.RetrievalCandidate) (bool, types.RetrievalCandidate) { for _, filteredPeer := range testCase.filteredPeers { if candidate.MinerPeer.ID == peer.ID(filteredPeer) { @@ -176,7 +180,7 @@ func TestAssignableCandidateFinder(t *testing.T) { retrievalCollector := func(evt types.RetrievalEvent) { receivedEvents[evt.RootCid()] = append(receivedEvents[evt.RootCid()], evt) } - retrievalCandidateFinder := retriever.NewAssignableCandidateFinder(candidateFinder, isAcceptableStorageProvider) + retrievalCandidateFinder := 
retriever.NewAssignableCandidateFinder(candidateSource, isAcceptableStorageProvider) rid1, err := types.NewRetrievalID() req.NoError(err) receivedErrors := make(map[cid.Cid]error) @@ -189,7 +193,7 @@ func TestAssignableCandidateFinder(t *testing.T) { RetrievalID: rid1, Request: trustlessutils.Request{Root: cid1}, LinkSystem: cidlink.DefaultLinkSystem(), - FixedPeers: allFixedPeers[cid1], + Providers: allProviders[cid1], }, retrievalCollector, candidateCollector) if err != nil { receivedErrors[cid1] = err @@ -203,7 +207,7 @@ func TestAssignableCandidateFinder(t *testing.T) { RetrievalID: rid2, Request: trustlessutils.Request{Root: cid2}, LinkSystem: cidlink.DefaultLinkSystem(), - FixedPeers: allFixedPeers[cid2], + Providers: allProviders[cid2], }, retrievalCollector, candidateCollector) if err != nil { receivedErrors[cid2] = err diff --git a/pkg/retriever/directcandidatefinder.go b/pkg/retriever/directcandidatesource.go similarity index 62% rename from pkg/retriever/directcandidatefinder.go rename to pkg/retriever/directcandidatesource.go index c4c8794c..3be2b904 100644 --- a/pkg/retriever/directcandidatefinder.go +++ b/pkg/retriever/directcandidatesource.go @@ -15,20 +15,34 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) -var _ CandidateFinder = &DirectCandidateFinder{} +var _ types.CandidateSource = &DirectCandidateSource{} -// DirectCandidateFinder finds candidate protocols from a fixed set of peers -type DirectCandidateFinder struct { +// DirectCandidateSource finds candidate protocols from a fixed set of peers +type DirectCandidateSource struct { h host.Host - providers []peer.AddrInfo + providers []types.Provider } -// NewDirectCandidateFinder returns a new DirectCandidateFinder for the given providers -func NewDirectCandidateFinder(h host.Host, providers []peer.AddrInfo) *DirectCandidateFinder { - return &DirectCandidateFinder{ - h: h, +type Option func(*DirectCandidateSource) + +// WithLibp2pCandidateDiscovery sets a libp2p Host for the 
DirectCandidateSource. +// If a Host is set, the providers will be queried to discover available protocols, otherwise +// all protocols will be assumed by default. +func WithLibp2pCandidateDiscovery(h host.Host) Option { + return func(d *DirectCandidateSource) { + d.h = h + } +} + +// NewDirectCandidateSource returns a new DirectCandidateSource for the given providers +func NewDirectCandidateSource(providers []types.Provider, opts ...Option) *DirectCandidateSource { + d := &DirectCandidateSource{ providers: providers, } + for _, opt := range opts { + opt(d) + } + return d } type candidateSender struct { @@ -61,32 +75,48 @@ func (cs candidateSender) sendError(err error) error { } } -// FindCandidatesAsync finds supported protocols for each peer +// FindCandidates finds supported protocols for each peer // TODO: Cache the results? -func (d *DirectCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { +func (d *DirectCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error { candidateResults := make(chan types.FindCandidatesResult) ctx, cancel := context.WithCancel(ctx) defer cancel() cs := candidateSender{ctx, cancel, c, candidateResults} var wg sync.WaitGroup for _, provider := range d.providers { + + // if protocols are specified, just use those + if len(provider.Protocols) > 0 { + cb(types.RetrievalCandidate{ + MinerPeer: provider.Peer, + RootCid: cs.rootCid, + Metadata: metadata.Default.New(provider.Protocols...), + }) + continue + } + + // if it's http, it'll be in the multiaddr and we can't probe it + if d.trySendHTTPCandidate(provider, c, cb) { + continue + } + + // if we have no libp2p host, just assume all protocols are available + if d.h == nil { + cb(types.RetrievalCandidate{ + MinerPeer: provider.Peer, + RootCid: cs.rootCid, + Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}, metadata.Bitswap{}, &metadata.GraphsyncFilecoinV1{}), + }) +
continue + } + wg.Add(1) provider := provider go func() { defer wg.Done() - // if it's http, it'll be in the multiaddr and we can't probe it - if len(provider.Addrs) == 1 { - for _, proto := range provider.Addrs[0].Protocols() { - if proto.Name == "http" || proto.Name == "https" { - cs.sendCandidate(provider, metadata.IpfsGatewayHttp{}) - return - } - } - } - // probe it - err := d.h.Connect(ctx, provider) + err := d.h.Connect(ctx, provider.Peer) // don't add peers that we can't connect to if err != nil { _ = cs.sendError(err) @@ -94,16 +124,16 @@ func (d *DirectCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.C } // check for support for Boost libp2p transports protocol transportsClient := lp2ptransports.NewTransportsClient(d.h) - qr, err := transportsClient.SendQuery(ctx, provider.ID) + qr, err := transportsClient.SendQuery(ctx, provider.Peer.ID) if err == nil { - logger.Debugw("retrieving metadata from transports protocol", "peer", provider.ID) + logger.Debugw("retrieving metadata from transports protocol", "peer", provider.Peer.ID) // if present, construct metadata from Boost libp2p transports response - d.retrievalCandidatesFromTransportsProtocol(ctx, qr, provider, cs) + d.retrievalCandidatesFromTransportsProtocol(ctx, qr, provider.Peer, cs) } else { - logger.Debugw("retrieving metadata from libp2p protocol list", "peer", provider.ID) + logger.Debugw("retrieving metadata from libp2p protocol list", "peer", provider.Peer.ID) // if not present, just make guesses based on list of supported libp2p // protocols catalogued via identify protocol - d.retrievalCandidatesFromProtocolProbing(ctx, provider, cs) + d.retrievalCandidatesFromProtocolProbing(ctx, provider.Peer, cs) } }() } @@ -127,7 +157,23 @@ func (d *DirectCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.C } } -func (d *DirectCandidateFinder) retrievalCandidatesFromProtocolProbing(ctx context.Context, provider peer.AddrInfo, cs candidateSender) { +func (d *DirectCandidateSource) 
trySendHTTPCandidate(provider types.Provider, rootCid cid.Cid, cb func(types.RetrievalCandidate)) bool { + if len(provider.Peer.Addrs) != 1 { + return false + } + for _, proto := range provider.Peer.Addrs[0].Protocols() { + if proto.Name == "http" || proto.Name == "https" { + cb(types.RetrievalCandidate{ + MinerPeer: provider.Peer, + RootCid: rootCid, + Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}), + }) + return true + } + } + return false +} +func (d *DirectCandidateSource) retrievalCandidatesFromProtocolProbing(ctx context.Context, provider peer.AddrInfo, cs candidateSender) { var protocols []metadata.Protocol s, err := d.h.NewStream(ctx, provider.ID, bsnet.ProtocolBitswap, @@ -153,7 +199,7 @@ func (d *DirectCandidateFinder) retrievalCandidatesFromProtocolProbing(ctx conte _ = cs.sendCandidate(provider, protocols...) } -func (d *DirectCandidateFinder) retrievalCandidatesFromTransportsProtocol(ctx context.Context, qr *lp2ptransports.QueryResponse, provider peer.AddrInfo, cs candidateSender) { +func (d *DirectCandidateSource) retrievalCandidatesFromTransportsProtocol(ctx context.Context, qr *lp2ptransports.QueryResponse, provider peer.AddrInfo, cs candidateSender) { for _, protocol := range qr.Protocols { // try to parse addr infos directly addrs, err := peer.AddrInfosFromP2pAddrs(protocol.Addresses...) 
@@ -187,14 +233,3 @@ func (d *DirectCandidateFinder) retrievalCandidatesFromTransportsProtocol(ctx co } } } - -func (d *DirectCandidateFinder) FindCandidates(ctx context.Context, c cid.Cid) ([]types.RetrievalCandidate, error) { - var candidates []types.RetrievalCandidate - err := d.FindCandidatesAsync(ctx, c, func(nextCandidate types.RetrievalCandidate) { - candidates = append(candidates, nextCandidate) - }) - if err != nil { - return nil, err - } - return candidates, nil -} diff --git a/pkg/retriever/directcandidatesource_test.go b/pkg/retriever/directcandidatesource_test.go new file mode 100644 index 00000000..85acf4d3 --- /dev/null +++ b/pkg/retriever/directcandidatesource_test.go @@ -0,0 +1,91 @@ +package retriever_test + +import ( + "context" + "testing" + "time" + + "github.com/filecoin-project/lassie/pkg/internal/testutil" + "github.com/filecoin-project/lassie/pkg/retriever" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipni/go-libipni/metadata" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +func TestDirectCandidateSourceNoLibp2p(t *testing.T) { + rootCid := testutil.GenerateCid() + p := testutil.GeneratePeers(t, 1)[0] + rawMultiaddr := testutil.GenerateMultiAddr() + httpMultiaddr := testutil.GenerateHTTPMultiAddr() + ctx := context.Background() + testCases := []struct { + name string + provider types.Provider + expectedCandidate types.RetrievalCandidate + }{ + { + name: "peer with protocols", + provider: types.Provider{ + Peer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{rawMultiaddr}, + }, + Protocols: []metadata.Protocol{metadata.IpfsGatewayHttp{}, metadata.Bitswap{}}, + }, + expectedCandidate: types.RetrievalCandidate{ + MinerPeer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{rawMultiaddr}, + }, + RootCid: rootCid, + Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}, metadata.Bitswap{}), + }, + }, + { + name: "peer with no 
protocols and standard multiaddr", + provider: types.Provider{ + Peer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{rawMultiaddr}, + }, + }, + expectedCandidate: types.RetrievalCandidate{ + MinerPeer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{rawMultiaddr}, + }, + RootCid: rootCid, + Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}, metadata.Bitswap{}, &metadata.GraphsyncFilecoinV1{}), + }, + }, + { + name: "peer with no protocols and http multiaddr", + provider: types.Provider{ + Peer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{httpMultiaddr}, + }, + }, + expectedCandidate: types.RetrievalCandidate{ + MinerPeer: peer.AddrInfo{ + ID: p, + Addrs: []multiaddr.Multiaddr{httpMultiaddr}, + }, + RootCid: rootCid, + Metadata: metadata.Default.New(metadata.IpfsGatewayHttp{}), + }, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + d := retriever.NewDirectCandidateSource([]types.Provider{testCase.provider}) + d.FindCandidates(ctx, rootCid, func(candidate types.RetrievalCandidate) { + require.Equal(t, testCase.expectedCandidate, candidate) + }) + }) + } +} diff --git a/pkg/retriever/retriever.go b/pkg/retriever/retriever.go index 0ee8c1c6..2514ab33 100644 --- a/pkg/retriever/retriever.go +++ b/pkg/retriever/retriever.go @@ -60,11 +60,6 @@ type Retriever struct { protocols []multicodec.Code } -type CandidateFinder interface { - FindCandidates(context.Context, cid.Cid) ([]types.RetrievalCandidate, error) - FindCandidatesAsync(context.Context, cid.Cid, func(types.RetrievalCandidate)) error -} - type eventStats struct { failedCount int64 } @@ -72,16 +67,16 @@ type eventStats struct { func NewRetriever( ctx context.Context, session Session, - candidateFinder CandidateFinder, + candidateSource types.CandidateSource, protocolRetrievers map[multicodec.Code]types.CandidateRetriever, ) (*Retriever, error) { - return 
NewRetrieverWithClock(ctx, session, candidateFinder, protocolRetrievers, clock.New()) + return NewRetrieverWithClock(ctx, session, candidateSource, protocolRetrievers, clock.New()) } func NewRetrieverWithClock( ctx context.Context, session Session, - candidateFinder CandidateFinder, + candidateSource types.CandidateSource, protocolRetrievers map[multicodec.Code]types.CandidateRetriever, clock clock.Clock, ) (*Retriever, error) { @@ -95,7 +90,7 @@ func NewRetrieverWithClock( retriever.protocols = append(retriever.protocols, protocol) } retriever.executor = combinators.RetrieverWithCandidateFinder{ - CandidateFinder: NewAssignableCandidateFinderWithClock(candidateFinder, session.FilterIndexerCandidate, clock), + CandidateFinder: NewAssignableCandidateFinderWithClock(candidateSource, session.FilterIndexerCandidate, clock), CandidateRetriever: combinators.SplitRetriever[multicodec.Code]{ AsyncCandidateSplitter: combinators.NewAsyncCandidateSplitter(retriever.protocols, NewProtocolSplitter), CandidateRetrievers: protocolRetrievers, @@ -125,7 +120,7 @@ func (retriever *Retriever) RegisterSubscriber(subscriber types.RetrievalEventSu } // Retrieve attempts to retrieve the given CID using the configured -// CandidateFinder to find storage providers that should have the CID. +// CandidateSource to find storage providers that should have the CID. 
func (retriever *Retriever) Retrieve( ctx context.Context, request types.RetrievalRequest, diff --git a/pkg/retriever/retriever_test.go b/pkg/retriever/retriever_test.go index bb9adfa8..07e9605f 100644 --- a/pkg/retriever/retriever_test.go +++ b/pkg/retriever/retriever_test.go @@ -28,11 +28,11 @@ import ( ) func TestRetrieverStart(t *testing.T) { - candidateFinder := &testutil.MockCandidateFinder{} + candidateSource := &testutil.MockCandidateSource{} client := &testutil.MockClient{} session := session.NewSession(nil, true) gsretriever := NewGraphsyncRetriever(session, client) - ret, err := NewRetriever(context.Background(), session, candidateFinder, map[multicodec.Code]types.CandidateRetriever{ + ret, err := NewRetriever(context.Background(), session, candidateSource, map[multicodec.Code]types.CandidateRetriever{ multicodec.TransportGraphsyncFilecoinv1: gsretriever, }) require.NoError(t, err) @@ -803,7 +803,7 @@ func TestRetriever(t *testing.T) { clock := clock.NewMock() clock.Set(startTime) // --- setup --- - candidateFinder := testutil.NewMockCandidateFinder(nil, map[cid.Cid][]types.RetrievalCandidate{cid1: tc.candidates}) + candidateSource := testutil.NewMockCandidateSource(nil, map[cid.Cid][]types.RetrievalCandidate{cid1: tc.candidates}) client := testutil.NewMockClient(tc.returns_connected, tc.returns_retrievals, clock) session := testutil.NewMockSession(ctx) if tc.setup != nil { @@ -812,7 +812,7 @@ func TestRetriever(t *testing.T) { gsretriever := NewGraphsyncRetrieverWithConfig(session, client, clock, initialPause, true) // --- create --- - ret, err := NewRetrieverWithClock(context.Background(), session, candidateFinder, map[multicodec.Code]types.CandidateRetriever{ + ret, err := NewRetrieverWithClock(context.Background(), session, candidateSource, map[multicodec.Code]types.CandidateRetriever{ multicodec.TransportGraphsyncFilecoinv1: gsretriever, }, clock) require.NoError(t, err) @@ -829,7 +829,7 @@ func TestRetriever(t *testing.T) { t, clock, client, - 
candidateFinder, + candidateSource, session, retCancel, tc.cancelAfter, @@ -912,13 +912,13 @@ func TestLinkSystemPerRequest(t *testing.T) { }, Delay: time.Millisecond * 5}, } - candidateFinder := testutil.NewMockCandidateFinder(nil, map[cid.Cid][]types.RetrievalCandidate{cid1: candidates}) + candidateSource := testutil.NewMockCandidateSource(nil, map[cid.Cid][]types.RetrievalCandidate{cid1: candidates}) client := testutil.NewMockClient(returnsConnected, returnsRetrievals, clock) session := session.NewSession(nil, true) gsretriever := NewGraphsyncRetrieverWithConfig(session, client, clock, initialPause, true) // --- create --- - ret, err := NewRetrieverWithClock(context.Background(), session, candidateFinder, map[multicodec.Code]types.CandidateRetriever{ + ret, err := NewRetrieverWithClock(context.Background(), session, candidateSource, map[multicodec.Code]types.CandidateRetriever{ multicodec.TransportGraphsyncFilecoinv1: gsretriever, }, clock) require.NoError(t, err) @@ -981,7 +981,7 @@ func TestLinkSystemPerRequest(t *testing.T) { }, }, }, - }.RunWithVerification(ctx, t, clock, client, candidateFinder, nil, nil, 0, []testutil.RunRetrieval{ + }.RunWithVerification(ctx, t, clock, client, candidateSource, nil, nil, 0, []testutil.RunRetrieval{ func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error) { return ret.Retrieve(context.Background(), types.RetrievalRequest{ LinkSystem: lsA, @@ -1049,7 +1049,7 @@ func TestLinkSystemPerRequest(t *testing.T) { }, }, }, - }.RunWithVerification(ctx, t, clock, client, candidateFinder, nil, nil, 0, []testutil.RunRetrieval{ + }.RunWithVerification(ctx, t, clock, client, candidateSource, nil, nil, 0, []testutil.RunRetrieval{ func(cb func(types.RetrievalEvent)) (*types.RetrievalStats, error) { return ret.Retrieve(context.Background(), types.RetrievalRequest{ LinkSystem: lsB, diff --git a/pkg/server/http/client_close_test.go b/pkg/server/http/client_close_test.go index 0b9a6912..9d1018c3 100644 --- 
a/pkg/server/http/client_close_test.go +++ b/pkg/server/http/client_close_test.go @@ -40,7 +40,7 @@ func TestHttpClientClose(t *testing.T) { ctx, lassie.WithProviderTimeout(20*time.Second), lassie.WithHost(mrn.Self), - lassie.WithFinder(mrn.Finder), + lassie.WithCandidateSource(mrn.Source), lassie.WithProtocols([]multicodec.Code{multicodec.TransportBitswap}), ) req.NoError(err) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 497dd069..d67ea47d 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -19,7 +19,6 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" trustlessutils "github.com/ipld/go-trustless-utils" trustlesshttp "github.com/ipld/go-trustless-utils/http" - "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multicodec" ) @@ -218,7 +217,7 @@ func decodeRetrievalRequest(cfg HttpServerConfig, res http.ResponseWriter, req * return false, types.RetrievalRequest{} } - fixedPeers, err := parseProviders(req) + providers, err := parseProviders(req) if err != nil { errorResponse(res, statusLogger, http.StatusBadRequest, err) return false, types.RetrievalRequest{} @@ -251,7 +250,7 @@ func decodeRetrievalRequest(cfg HttpServerConfig, res http.ResponseWriter, req * RetrievalID: retrievalId, LinkSystem: linkSystem, Protocols: protocols, - FixedPeers: fixedPeers, + Providers: providers, MaxBlocks: maxBlocks, } } @@ -291,7 +290,7 @@ func parseProtocols(req *http.Request) ([]multicodec.Code, error) { return nil, nil } -func parseProviders(req *http.Request) ([]peer.AddrInfo, error) { +func parseProviders(req *http.Request) ([]types.Provider, error) { if req.URL.Query().Has("providers") { // in case we have been given filecoin actor addresses we can look them up // with heyfil and translate to full multiaddrs, otherwise this is a @@ -300,11 +299,11 @@ func parseProviders(req *http.Request) ([]peer.AddrInfo, error) { if err != nil { return nil, err } - fixedPeers, err := 
types.ParseProviderStrings(strings.Join(trans, ",")) + providers, err := types.ParseProviderStrings(strings.Join(trans, ",")) if err != nil { return nil, errors.New("invalid providers parameter") } - return fixedPeers, nil + return providers, nil } return nil, nil } diff --git a/pkg/types/request.go b/pkg/types/request.go index 3907185e..faf68042 100644 --- a/pkg/types/request.go +++ b/pkg/types/request.go @@ -16,6 +16,7 @@ import ( ipldstorage "github.com/ipld/go-ipld-prime/storage" trustlessutils "github.com/ipld/go-trustless-utils" "github.com/ipni/go-libipni/maurl" + "github.com/ipni/go-libipni/metadata" "github.com/libp2p/go-libp2p/core/peer" ma "github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multicodec" @@ -80,9 +81,9 @@ type RetrievalRequest struct { // If zero, no limit is applied. MaxBlocks uint64 - // FixedPeers optionally specifies a list of peers to use when fetching + // Providers optionally specifies a list of peers to use when fetching // blocks. If nil, the default peer discovery mechanism will be used. 
- FixedPeers []peer.AddrInfo + Providers []Provider } // NewRequestForPath creates a new RetrievalRequest for the given root CID as @@ -181,8 +182,8 @@ func (r RetrievalRequest) GetDescriptorString() (string, error) { protocols = sb.String() } var providers string - if len(r.FixedPeers) > 0 { - ps, err := ToProviderString(r.FixedPeers) + if len(r.Providers) > 0 { + ps, err := ToProviderString(r.Providers) if err != nil { return "", err } @@ -236,12 +237,12 @@ func ParseProtocolsString(v string) ([]multicodec.Code, error) { return protocols, nil } -func ParseProviderStrings(v string) ([]peer.AddrInfo, error) { +func ParseProviderStrings(v string) ([]Provider, error) { vs := strings.Split(v, ",") - providerAddrInfos := make([]peer.AddrInfo, 0, len(vs)) + providers := make([]Provider, 0, len(vs)) for _, v := range vs { var maddr ma.Multiaddr - + var protocols []metadata.Protocol // http:// style provider has been specified, parse it as a URL and // transform into a multiaddr with made-up peer ID if strings.HasPrefix(v, "http://") || strings.HasPrefix(v, "https://") { @@ -258,8 +259,33 @@ func ParseProviderStrings(v string) ([]peer.AddrInfo, error) { return nil, err } } else { + parts := strings.Split(v, "+") + addrString := parts[0] + if len(parts) > 1 { + var foundProtocol bool + for _, part := range parts[1:] { + switch part { + case "bitswap": + foundProtocol = true + protocols = append(protocols, metadata.Bitswap{}) + case "graphsync": + foundProtocol = true + protocols = append(protocols, &metadata.GraphsyncFilecoinV1{}) + case "http": + foundProtocol = true + protocols = append(protocols, metadata.IpfsGatewayHttp{}) + default: + // if we haven't encountered a protocol string yet, assume this + was in the multiaddr for some reason + // if we have, something is malformed + if foundProtocol { + return nil, fmt.Errorf("unrecognized protocol: %s", v) + } + addrString += part + } + } + } var err error - maddr, err = ma.NewMultiaddr(v) + maddr, err =
ma.NewMultiaddr(addrString) if err != nil { return nil, err } @@ -280,9 +306,9 @@ func ParseProviderStrings(v string) ([]peer.AddrInfo, error) { } } providerAddrInfo := &peer.AddrInfo{ID: id, Addrs: []ma.Multiaddr{transport}} - providerAddrInfos = append(providerAddrInfos, *providerAddrInfo) + providers = append(providers, Provider{*providerAddrInfo, protocols}) } - return providerAddrInfos, nil + return providers, nil } // Make a new random, but valid peer ID for a provider we don't have an ID for @@ -307,17 +333,27 @@ func IsUnknownPeerID(p peer.ID) bool { return p[0:len(unknownPeerID)] == unknownPeerID } -func ToProviderString(ai []peer.AddrInfo) (string, error) { +func ToProviderString(ai []Provider) (string, error) { var sb strings.Builder for i, v := range ai { if i > 0 { sb.WriteString(",") } - ma, err := peer.AddrInfoToP2pAddrs(&v) + ma, err := peer.AddrInfoToP2pAddrs(&v.Peer) if err != nil { return "", err } sb.WriteString(ma[0].String()) + for _, protocol := range v.Protocols { + switch protocol.(type) { + case metadata.Bitswap, *metadata.Bitswap: + sb.WriteString("+bitswap") + case metadata.IpfsGatewayHttp, *metadata.IpfsGatewayHttp: + sb.WriteString("+http") + case *metadata.GraphsyncFilecoinV1: + sb.WriteString("+graphsync") + } + } } return sb.String(), nil } diff --git a/pkg/types/request_test.go b/pkg/types/request_test.go index a1bb5d3d..8794f20e 100644 --- a/pkg/types/request_test.go +++ b/pkg/types/request_test.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-cid" trustlessutils "github.com/ipld/go-trustless-utils" + "github.com/ipni/go-libipni/metadata" "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) @@ -99,8 +100,8 @@ func TestRequestStringRepresentations(t *testing.T) { { name: "fixed peer", request: RetrievalRequest{ - Request: trustlessutils.Request{Root: testCidV1}, - FixedPeers: must(ParseProviderStrings("/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), + Request: 
trustlessutils.Request{Root: testCidV1}, + Providers: must(ParseProviderStrings("/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), }, expectedUrlPath: "?dag-scope=all", expectedDescriptor: "/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?dag-scope=all&dups=n&providers=/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4", @@ -108,8 +109,8 @@ func TestRequestStringRepresentations(t *testing.T) { { name: "fixed peers", request: RetrievalRequest{ - Request: trustlessutils.Request{Root: testCidV1}, - FixedPeers: must(ParseProviderStrings("/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), + Request: trustlessutils.Request{Root: testCidV1}, + Providers: must(ParseProviderStrings("/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), }, expectedUrlPath: "?dag-scope=all", expectedDescriptor: "/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?dag-scope=all&dups=n&providers=/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4", @@ -146,9 +147,9 @@ func TestRequestStringRepresentations(t *testing.T) { Duplicates: true, Bytes: &trustlessutils.ByteRange{From: 100, To: ptr(-200)}, }, - MaxBlocks: 222, - Protocols: []multicodec.Code{multicodec.TransportBitswap, multicodec.TransportIpfsGatewayHttp}, - FixedPeers: must(ParseProviderStrings("/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), + MaxBlocks: 222, + Protocols: []multicodec.Code{multicodec.TransportBitswap, multicodec.TransportIpfsGatewayHttp}, + Providers: 
must(ParseProviderStrings("/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4")), }, expectedUrlPath: "/some/path/to/thing?dag-scope=entity&entity-bytes=100:-200", expectedDescriptor: "/ipfs/QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK/some/path/to/thing?dag-scope=entity&entity-bytes=100:-200&dups=y&blockLimit=222&protocols=transport-bitswap,transport-ipfs-gateway-http&providers=/dns/beep.boop.com/tcp/3747/p2p/12D3KooWDXAVxjSTKbHKpNk8mFVQzHdBDvR4kybu582Xd4Zrvagg,/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4", @@ -170,8 +171,8 @@ func TestRequestStringRepresentations(t *testing.T) { pps, err := ParseProviderStrings("/ip4/127.0.0.1/tcp/5000/http") require.NoError(t, err) request := RetrievalRequest{ - Request: trustlessutils.Request{Root: testCidV1}, - FixedPeers: pps, + Request: trustlessutils.Request{Root: testCidV1}, + Providers: pps, } ds, err := request.GetDescriptorString() require.NoError(t, err) @@ -185,8 +186,8 @@ func TestRequestStringRepresentations(t *testing.T) { pps, err := ParseProviderStrings("http://127.0.0.1:5000" + p) require.NoError(t, err) request := RetrievalRequest{ - Request: trustlessutils.Request{Root: testCidV1}, - FixedPeers: pps, + Request: trustlessutils.Request{Root: testCidV1}, + Providers: pps, } ds, err := request.GetDescriptorString() require.NoError(t, err) @@ -200,6 +201,21 @@ func TestRequestStringRepresentations(t *testing.T) { _, err := ParseProviderStrings("http://127.0.0.1:5000/nope") require.ErrorContains(t, err, "paths not supported") }) + + t.Run("fixed peer, protocol included", func(t *testing.T) { + pps, err := ParseProviderStrings("/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4+bitswap") + require.NoError(t, err) + require.Equal(t, pps[0].Protocols, []metadata.Protocol{metadata.Bitswap{}}) + request := RetrievalRequest{ + Request: 
trustlessutils.Request{Root: testCidV1}, + Providers: pps, + } + ds, err := request.GetDescriptorString() + require.NoError(t, err) + expectedStart := "/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi?dag-scope=all&dups=n&providers=/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4+bitswap" + require.Equal(t, expectedStart, ds[0:len(expectedStart)]) + }) + } func TestProviderStrings(t *testing.T) { diff --git a/pkg/types/types.go b/pkg/types/types.go index 13068abd..b08f18ea 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -44,6 +44,11 @@ func NewFetchConfig(opts ...FetchOption) FetchConfig { return cfg } +type Provider struct { + Peer peer.AddrInfo + Protocols []metadata.Protocol +} + // RetrievalCandidate describes a peer and CID combination that can be used to // retrieve data from the peer. The RootCid describes the head of an IPLD graph // that is being retrieved. The MinerPeer is the peer that is (apparently) @@ -97,6 +102,10 @@ type Retriever interface { Retrieve(ctx context.Context, request RetrievalRequest, events func(RetrievalEvent)) (*RetrievalStats, error) } +type CandidateSource interface { + FindCandidates(context.Context, cid.Cid, func(RetrievalCandidate)) error +} + type CandidateFinder interface { FindCandidates(ctx context.Context, request RetrievalRequest, events func(RetrievalEvent), onCandidates func([]RetrievalCandidate)) error }