Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protocols in providers #465

Merged
merged 9 commits into from
Jan 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/lassie/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestDaemonCommandFlags(t *testing.T) {
args: []string{"daemon"},
assert: func(ctx context.Context, lCfg *l.LassieConfig, hCfg h.HttpServerConfig, erCfg *a.EventRecorderConfig) error {
// lassie config
require.Equal(t, nil, lCfg.Finder)
require.Equal(t, nil, lCfg.Source)
require.NotNil(t, lCfg.Host, "host should not be nil")
require.Equal(t, 20*time.Second, lCfg.ProviderTimeout)
require.Equal(t, uint(0), lCfg.ConcurrentSPRetrievals)
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestDaemonCommandFlags(t *testing.T) {
name: "with ipni endpoint",
args: []string{"daemon", "--ipni-endpoint", "https://cid.contact"},
assert: func(ctx context.Context, lCfg *l.LassieConfig, hCfg h.HttpServerConfig, erCfg *a.EventRecorderConfig) error {
require.IsType(t, &indexerlookup.IndexerCandidateFinder{}, lCfg.Finder, "finder should be an IndexerCandidateFinder when providing an ipni endpoint")
require.IsType(t, &indexerlookup.IndexerCandidateSource{}, lCfg.Source, "finder should be an IndexerCandidateSource when providing an ipni endpoint")
return nil
},
},
Expand Down
4 changes: 2 additions & 2 deletions cmd/lassie/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (pp *progressPrinter) subscriber(event types.RetrievalEvent) {
case events.CandidatesFoundEvent:
pp.candidatesFound = len(ret.Candidates())
case events.CandidatesFilteredEvent:
if len(fetchProviderAddrInfos) == 0 {
if len(fetchProviders) == 0 {
fmt.Fprintf(pp.writer, "Found %d storage provider candidate(s) in the indexer:\n", pp.candidatesFound)
} else {
fmt.Fprintf(pp.writer, "Using the specified storage provider(s):\n")
Expand Down Expand Up @@ -327,7 +327,7 @@ func defaultFetchRun(
if printPath != "" {
printPath = "/" + printPath
}
if len(fetchProviderAddrInfos) == 0 {
if len(fetchProviders) == 0 {
fmt.Fprintf(msgWriter, "Fetching %s", rootCid.String()+printPath)
} else {
fmt.Fprintf(msgWriter, "Fetching %s from specified provider(s)", rootCid.String()+printPath)
Expand Down
6 changes: 3 additions & 3 deletions cmd/lassie/fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestFetchCommandFlags(t *testing.T) {
require.Equal(t, "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4.car", outfile)

// lassie config
require.Equal(t, nil, lCfg.Finder)
require.Equal(t, nil, lCfg.Source)
require.NotNil(t, lCfg.Host, "host should not be nil")
require.Equal(t, 20*time.Second, lCfg.ProviderTimeout)
require.Equal(t, uint(0), lCfg.ConcurrentSPRetrievals)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestFetchCommandFlags(t *testing.T) {
"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4",
},
assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path datamodel.Path, dagScope trustlessutils.DagScope, entityBytes *trustlessutils.ByteRange, duplicates bool, tempDir string, progress bool, outfile string) error {
require.IsType(t, &retriever.DirectCandidateFinder{}, lCfg.Finder, "finder should be a DirectCandidateFinder when providers are specified")
require.IsType(t, &retriever.DirectCandidateSource{}, lCfg.Source, "finder should be a DirectCandidateSource when providers are specified")
return nil
},
},
Expand All @@ -207,7 +207,7 @@ func TestFetchCommandFlags(t *testing.T) {
"bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4",
},
assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path datamodel.Path, dagScope trustlessutils.DagScope, entityBytes *trustlessutils.ByteRange, duplicates bool, tempDir string, progress bool, outfile string) error {
require.IsType(t, &indexerlookup.IndexerCandidateFinder{}, lCfg.Finder, "finder should be an IndexerCandidateFinder when providing an ipni endpoint")
require.IsType(t, &indexerlookup.IndexerCandidateSource{}, lCfg.Source, "finder should be an IndexerCandidateSource when providing an ipni endpoint")
return nil
},
},
Expand Down
6 changes: 3 additions & 3 deletions cmd/lassie/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ var FlagExcludeProviders = &cli.StringFlag{
},
}

var fetchProviderAddrInfos []peer.AddrInfo
var fetchProviders []types.Provider

var FlagAllowProviders = &cli.StringFlag{
Name: "providers",
Expand All @@ -143,7 +143,7 @@ var FlagAllowProviders = &cli.StringFlag{
if err != nil {
return err
}
fetchProviderAddrInfos, err = types.ParseProviderStrings(strings.Join(trans, ","))
fetchProviders, err = types.ParseProviderStrings(strings.Join(trans, ","))
return err
},
}
Expand Down Expand Up @@ -214,7 +214,7 @@ var FlagIPNIEndpoint = &cli.StringFlag{
func ResetGlobalFlags() {
// Reset global variables here so that they are not used
// in subsequent calls to commands during testing.
fetchProviderAddrInfos = make([]peer.AddrInfo, 0)
fetchProviders = make([]types.Provider, 0)
protocols = make([]multicodec.Code, 0)
providerBlockList = make(map[peer.ID]bool)
}
8 changes: 4 additions & 4 deletions cmd/lassie/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func buildLassieConfigFromCLIContext(cctx *cli.Context, lassieOpts []lassie.Lass
}
lassieOpts = append(lassieOpts, lassie.WithHost(host))

if len(fetchProviderAddrInfos) > 0 {
finderOpt := lassie.WithFinder(retriever.NewDirectCandidateFinder(host, fetchProviderAddrInfos))
if len(fetchProviders) > 0 {
finderOpt := lassie.WithCandidateSource(retriever.NewDirectCandidateSource(fetchProviders, retriever.WithLibp2pCandidateDiscovery(host)))
if cctx.IsSet("ipni-endpoint") {
logger.Warn("Ignoring ipni-endpoint flag since direct provider is specified")
}
Expand All @@ -103,12 +103,12 @@ func buildLassieConfigFromCLIContext(cctx *cli.Context, lassieOpts []lassie.Lass
logger.Errorw("Failed to parse IPNI endpoint as URL", "err", err)
return nil, fmt.Errorf("cannot parse given IPNI endpoint %s as valid URL: %w", endpoint, err)
}
finder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(endpointUrl))
finder, err := indexerlookup.NewCandidateSource(indexerlookup.WithHttpEndpoint(endpointUrl))
if err != nil {
logger.Errorw("Failed to instantiate IPNI candidate finder", "err", err)
return nil, err
}
lassieOpts = append(lassieOpts, lassie.WithFinder(finder))
lassieOpts = append(lassieOpts, lassie.WithCandidateSource(finder))
logger.Debug("Using explicit IPNI endpoint to find candidates", "endpoint", endpoint)
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/filecoin-project/go-state-types v0.10.0
github.com/google/uuid v1.3.0
github.com/hannahhoward/go-pubsub v1.0.0
github.com/ipfs/boxo v0.15.0
github.com/ipfs/boxo v0.15.1-0.20240125013539-09ff20c5abb6
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFck
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.15.0 h1:BriLydj2nlK1nKeJQHxcKSuG5ZXcoutzhBklOtxC5pk=
github.com/ipfs/boxo v0.15.0/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M=
github.com/ipfs/boxo v0.15.1-0.20240125013539-09ff20c5abb6 h1:1dInSpfeUaxSxDLZoYUcQSAzYOekGGhNpUZIOIe4kvI=
github.com/ipfs/boxo v0.15.1-0.20240125013539-09ff20c5abb6/go.mod h1:X5ulcbR5Nh7sm3Db8+08AApUo6FsGC5mb23QDKAoB/M=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package indexerlookup

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net/http"

"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-log/v2"
Expand All @@ -21,84 +18,25 @@ import (
)

var (
_ retriever.CandidateFinder = (*IndexerCandidateFinder)(nil)
_ types.CandidateSource = (*IndexerCandidateSource)(nil)

logger = log.Logger("lassie/indexerlookup")
)

type IndexerCandidateFinder struct {
type IndexerCandidateSource struct {
*options
}

func NewCandidateFinder(o ...Option) (*IndexerCandidateFinder, error) {
func NewCandidateSource(o ...Option) (*IndexerCandidateSource, error) {
opts, err := newOptions(o...)
if err != nil {
return nil, err
}
return &IndexerCandidateFinder{
return &IndexerCandidateSource{
options: opts,
}, nil
}

func (idxf *IndexerCandidateFinder) sendJsonRequest(req *http.Request) (*model.FindResponse, error) {
req.Header.Set("Accept", "application/json")
logger.Debugw("sending outgoing request", "url", req.URL, "accept", req.Header.Get("Accept"))
resp, err := idxf.httpClient.Do(req)
if err != nil {
logger.Debugw("Failed to perform json lookup", "err", err)
return nil, err
}
switch resp.StatusCode {
case http.StatusOK:
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
logger.Debugw("Failed to read response JSON response body", "err", err)
return nil, err
}
return model.UnmarshalFindResponse(b)
case http.StatusNotFound:
return &model.FindResponse{}, nil
default:
return nil, fmt.Errorf("batch find query failed: %s", http.StatusText(resp.StatusCode))
}
}

func (idxf *IndexerCandidateFinder) FindCandidates(ctx context.Context, cid cid.Cid) ([]types.RetrievalCandidate, error) {
req, err := idxf.newFindHttpRequest(ctx, cid)
if err != nil {
return nil, err
}
parsedResp, err := idxf.sendJsonRequest(req)
if err != nil {
return nil, err
}
// turn parsedResp into records.
var matches []types.RetrievalCandidate

indices := rand.Perm(len(parsedResp.MultihashResults))
for _, i := range indices {
multihashResult := parsedResp.MultihashResults[i]
if !bytes.Equal(cid.Hash(), multihashResult.Multihash) {
continue
}
for _, val := range multihashResult.ProviderResults {
// skip results without decodable metadata
if md, err := decodeMetadata(val); err == nil {
candidate := types.RetrievalCandidate{
RootCid: cid,
Metadata: md,
}
if val.Provider != nil {
candidate.MinerPeer = *val.Provider
}
matches = append(matches, candidate)
}
}
}
return matches, nil
}

func decodeMetadata(pr model.ProviderResult) (metadata.Metadata, error) {
if len(pr.Metadata) == 0 {
return metadata.Metadata{}, errors.New("no metadata")
Expand All @@ -114,7 +52,7 @@ func decodeMetadata(pr model.ProviderResult) (metadata.Metadata, error) {
return dtm, nil
}

func (idxf *IndexerCandidateFinder) FindCandidatesAsync(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error {
func (idxf *IndexerCandidateSource) FindCandidates(ctx context.Context, c cid.Cid, cb func(types.RetrievalCandidate)) error {
req, err := idxf.newFindHttpRequest(ctx, c)
if err != nil {
return err
Expand All @@ -136,7 +74,7 @@ func (idxf *IndexerCandidateFinder) FindCandidatesAsync(ctx context.Context, c c
}
}

func (idxf *IndexerCandidateFinder) newFindHttpRequest(ctx context.Context, c cid.Cid) (*http.Request, error) {
func (idxf *IndexerCandidateSource) newFindHttpRequest(ctx context.Context, c cid.Cid) (*http.Request, error) {
endpoint := idxf.findByMultihashEndpoint(c.Hash())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
Expand All @@ -158,7 +96,7 @@ func (idxf *IndexerCandidateFinder) newFindHttpRequest(ctx context.Context, c ci
return req, nil
}

func (idxf *IndexerCandidateFinder) decodeProviderResultStream(ctx context.Context, c cid.Cid, from io.ReadCloser, cb func(types.RetrievalCandidate)) error {
func (idxf *IndexerCandidateSource) decodeProviderResultStream(ctx context.Context, c cid.Cid, from io.ReadCloser, cb func(types.RetrievalCandidate)) error {
defer from.Close()
scanner := bufio.NewScanner(from)
for {
Expand Down Expand Up @@ -195,6 +133,6 @@ func (idxf *IndexerCandidateFinder) decodeProviderResultStream(ctx context.Conte
}
}

func (idxf *IndexerCandidateFinder) findByMultihashEndpoint(mh multihash.Multihash) string {
func (idxf *IndexerCandidateSource) findByMultihashEndpoint(mh multihash.Multihash) string {
return idxf.httpEndpoint.JoinPath("multihash", mh.B58String()).String()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestCandidateFinder(t *testing.T) {
func TestCandidateSource(t *testing.T) {
cids := make([]cid.Cid, 0, 10)
candidates := make(map[cid.Cid][]types.RetrievalCandidate, 10)
binaryMetadata := make(map[cid.Cid][][]byte, 10)
Expand Down Expand Up @@ -95,25 +95,13 @@ func TestCandidateFinder(t *testing.T) {
}()
indexerURL, err := url.Parse("http://" + mockIndexer.Addr())
req.NoError(err)
candidateFinder, err := indexerlookup.NewCandidateFinder(indexerlookup.WithHttpEndpoint(indexerURL))
candidateSource, err := indexerlookup.NewCandidateSource(indexerlookup.WithHttpEndpoint(indexerURL))
req.NoError(err)
for cid, expectedReturns := range testCase.expectedReturns {
syncCandidates, err := candidateFinder.FindCandidates(ctx, cid)
req.NoError(err)
if syncCandidates == nil {
syncCandidates = []types.RetrievalCandidate{}
}
req.Equal(expectedReturns, syncCandidates)
select {
case <-ctx.Done():
req.FailNow("cancelled")
case recv := <-connectCh:
req.Regexp("^/multihash/"+cid.Hash().B58String(), recv)
}
gatheredCandidates := []types.RetrievalCandidate{}
asyncCandidatesErr := make(chan error, 1)
go func() {
asyncCandidatesErr <- candidateFinder.FindCandidatesAsync(ctx, cid, func(candidate types.RetrievalCandidate) {
asyncCandidatesErr <- candidateSource.FindCandidates(ctx, cid, func(candidate types.RetrievalCandidate) {
gatheredCandidates = append(gatheredCandidates, candidate)
})
}()
Expand Down
2 changes: 1 addition & 1 deletion pkg/indexerlookup/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func WithHttpUserAgent(a string) Option {
}
}

// WithAsyncResultsChanBuffer sets the channel buffer returned by IndexerCandidateFinder.FindCandidatesAsync.
// WithAsyncResultsChanBuffer sets the channel buffer returned by IndexerCandidateSource.FindCandidates.
// Defaults to 1 if unspecified.
func WithAsyncResultsChanBuffer(i int) Option {
return func(o *options) error {
Expand Down
26 changes: 22 additions & 4 deletions pkg/internal/itest/direct_fetch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
const (
bitswapDirect = 0
graphsyncDirect = 1
transportsDirect = 2
httpDirect = 2
transportsDirect = 3
)

func TestDirectFetch(t *testing.T) {
Expand All @@ -48,6 +49,10 @@ func TestDirectFetch(t *testing.T) {
name: "direct graphsync peer",
directPeer: graphsyncDirect,
},
{
name: "direct http peer",
directPeer: graphsyncDirect,
},
{
name: "peer responding on transports protocol",
directPeer: transportsDirect,
Expand All @@ -67,6 +72,7 @@ func TestDirectFetch(t *testing.T) {
mrn := mocknet.NewMockRetrievalNet(ctx, t)
mrn.AddGraphsyncPeers(1)
mrn.AddBitswapPeers(1)
mrn.AddHttpPeers(1)

// generate separate 4MiB random unixfs file DAGs on all three peers

Expand All @@ -78,10 +84,16 @@ func TestDirectFetch(t *testing.T) {

// bitswap peer (1)
srcData2 := unixfs.GenerateFile(t, mrn.Remotes[1].LinkSystem, bytes.NewReader(srcData1.Content), 4<<20)
req.Equal(srcData2.Root, srcData2.Root)
req.Equal(srcData2.Root, srcData1.Root)
bitswapMAs, err := peer.AddrInfoToP2pAddrs(mrn.Remotes[1].AddrInfo())
req.NoError(err)

// http peer (2)
srcData3 := unixfs.GenerateFile(t, mrn.Remotes[2].LinkSystem, bytes.NewReader(srcData1.Content), 4<<20)
req.Equal(srcData3.Root, srcData1.Root)
httpMAs, err := peer.AddrInfoToP2pAddrs(mrn.Remotes[2].AddrInfo())
req.NoError(err)

transportsAddr, clear := handleTransports(t, mrn.MN, []lp2ptransports.Protocol{
{
Name: "bitswap",
Expand All @@ -91,6 +103,10 @@ func TestDirectFetch(t *testing.T) {
Name: "libp2p",
Addresses: graphsyncMAs,
},
{
Name: "http",
Addresses: httpMAs,
},
})
req.NoError(mrn.MN.LinkAll())
defer clear()
Expand All @@ -101,14 +117,16 @@ func TestDirectFetch(t *testing.T) {
addr = *mrn.Remotes[0].AddrInfo()
case bitswapDirect:
addr = *mrn.Remotes[1].AddrInfo()
case httpDirect:
addr = *mrn.Remotes[2].AddrInfo()
case transportsDirect:
addr = transportsAddr
default:
req.FailNow("unrecognized direct peer test")
}

directFinder := retriever.NewDirectCandidateFinder(mrn.Self, []peer.AddrInfo{addr})
lassie, err := lassie.NewLassie(ctx, lassie.WithFinder(directFinder), lassie.WithHost(mrn.Self), lassie.WithGlobalTimeout(5*time.Second))
directFinder := retriever.NewDirectCandidateSource([]types.Provider{{Peer: addr, Protocols: nil}}, retriever.WithLibp2pCandidateDiscovery(mrn.Self))
lassie, err := lassie.NewLassie(ctx, lassie.WithCandidateSource(directFinder), lassie.WithHost(mrn.Self), lassie.WithGlobalTimeout(5*time.Second))
req.NoError(err)
outFile, err := os.CreateTemp(t.TempDir(), "lassie-test-")
req.NoError(err)
Expand Down
Loading
Loading