From a2ebba700f4c3f2ddfffb138ccc0d9af3445bf12 Mon Sep 17 00:00:00 2001 From: gammazero Date: Mon, 10 Jul 2023 02:39:24 -0700 Subject: [PATCH] Use go-libipni pcache package to retrieve and cache provider info --- go.mod | 2 +- go.sum | 4 +- providers.go | 177 ++++++--------------------------------------------- server.go | 32 ++++++++-- 4 files changed, 50 insertions(+), 165 deletions(-) diff --git a/go.mod b/go.mod index 1b0ff10..be3e73f 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/filecoin-project/index-provider v0.9.1 github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipni/go-libipni v0.2.6 + github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3 github.com/libp2p/go-libp2p v0.28.1 github.com/mercari/go-circuitbreaker v0.0.2 github.com/mitchellh/go-homedir v1.1.0 diff --git a/go.sum b/go.sum index c8b9fd3..f439d70 100644 --- a/go.sum +++ b/go.sum @@ -196,8 +196,8 @@ github.com/ipld/go-car/v2 v2.6.0/go.mod h1:qoqfgPnQYcaAYcfphctffdaNWJIWBR2QN4pju github.com/ipld/go-codec-dagpb v1.5.0 h1:RspDRdsJpLfgCI0ONhTAnbHdySGD4t+LHSPK4X1+R0k= github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g= github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M= -github.com/ipni/go-libipni v0.2.6 h1:YKjdNs0dlLvGkNyNAA8gvzZifAQCaclktlG5kSfBims= -github.com/ipni/go-libipni v0.2.6/go.mod h1:dhBH9HwxT6HzQPRZ8ikWv+ccqF8ucMIoGiiTSrHA4tw= +github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3 h1:eWJSiUUMbMlx9HWrnOttTOBfyyOcQ5WM50JV9b6e4nw= +github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3/go.mod h1:dhBH9HwxT6HzQPRZ8ikWv+ccqF8ucMIoGiiTSrHA4tw= github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= diff --git a/providers.go b/providers.go index d374d1a..1e96e8b 100644 --- a/providers.go +++ b/providers.go @@ -4,11 +4,9 @@ import ( "encoding/json" "io" "net/http" - "net/url" - "sync" - "time" + "path" - "github.com/ipni/go-libipni/find/model" + //"github.com/ipni/go-libipni/find/model" "github.com/libp2p/go-libp2p/core/peer" ) @@ -19,92 +17,13 @@ func (s *server) providers(w http.ResponseWriter, r *http.Request) { return } - combined := make(chan []model.ProviderInfo) - wg := sync.WaitGroup{} - var err error - _, err = io.ReadAll(r.Body) - _ = r.Body.Close() - if err != nil { - log.Warnw("failed to read original request body", "err", err) - return - } - - for _, backend := range s.backends { - // do not send providers requests to not providers backends - if _, ok := backend.(providersBackend); !ok { - continue - } - wg.Add(1) - go func(server *url.URL) { - defer wg.Done() - - // Copy the URL from original request and override host/schema to point - // to the server. - endpoint := *r.URL - endpoint.Host = server.Host - endpoint.Scheme = server.Scheme - log := log.With("backend", endpoint.Host) - - // If body in original request existed, make a reader for it. - req, err := http.NewRequest(r.Method, endpoint.String(), nil) - if err != nil { - log.Warnw("failed to construct query", "err", err) - return - } - req.Header.Set("X-Forwarded-Host", r.Host) - req.Header.Set("Accept", mediaTypeJson) - resp, err := s.Client.Do(req) - if err != nil { - log.Warnw("failed query", "err", err) - return - } - - defer resp.Body.Close() - log = log.With("status", resp.StatusCode) - switch resp.StatusCode { - case http.StatusOK: - dec := json.NewDecoder(resp.Body) - var providers []model.ProviderInfo - if err := dec.Decode(&providers); err != nil { - log.Warnw("failed backend read", "err", err) - return - } - combined <- providers - case http.StatusNotFound, http.StatusNotImplemented: - log.Debug("no providers") - default: - log.Warn("unexpected response while getting providers") - } - }(backend.URL()) - } - go func() { - wg.Wait() - close(combined) - }() - - resp := make(map[peer.ID]model.ProviderInfo) - for prov := range combined { - for _, p := range prov { - if curr, ok := resp[p.AddrInfo.ID]; ok { - clt, e1 := time.Parse(time.RFC3339, curr.LastAdvertisementTime) - plt, e2 := time.Parse(time.RFC3339, p.LastAdvertisementTime) - if e1 == nil && e2 == nil && clt.Before(plt) { - resp[p.AddrInfo.ID] = p - } - continue - } - resp[p.AddrInfo.ID] = p - } - } + pinfos := s.pcache.List() // Write out combined. - // Note that /providers never returns 404. Instead, when there are no providers, - // an empty JSON array is returned. - out := make([]model.ProviderInfo, 0, len(resp)) - for _, a := range resp { - out = append(out, a) - } - outData, err := json.Marshal(out) + // + // Note that /providers never returns 404. Instead, when there are no + // providers, an empty JSON array is returned. + outData, err := json.Marshal(pinfos) if err != nil { log.Warnw("failed marshal response", "err", err) http.Error(w, "", http.StatusInternalServerError) @@ -115,89 +34,33 @@ func (s *server) providers(w http.ResponseWriter, r *http.Request) { // provider returns most recent state of a single provider. func (s *server) provider(w http.ResponseWriter, r *http.Request) { - combined := make(chan model.ProviderInfo) - wg := sync.WaitGroup{} - var err error - _, err = io.ReadAll(r.Body) + _, err := io.ReadAll(r.Body) _ = r.Body.Close() if err != nil { log.Warnw("failed to read original request body", "err", err) return } - for _, backend := range s.backends { - // do not send providers requests to not providers backends - if _, ok := backend.(providersBackend); !ok { - continue - } - wg.Add(1) - go func(server *url.URL) { - defer wg.Done() - - // Copy the URL from original request and override host/schema to point - // to the server. - endpoint := *r.URL - endpoint.Host = server.Host - endpoint.Scheme = server.Scheme - log := log.With("backend", endpoint.Host) - - req, err := http.NewRequest(r.Method, endpoint.String(), nil) - if err != nil { - log.Warnw("failed to construct query", "err", err) - return - } - req.Header.Set("X-Forwarded-Host", r.Host) - req.Header.Set("Accept", mediaTypeJson) - resp, err := s.Client.Do(req) - if err != nil { - log.Warnw("failed query", "err", err) - return - } - defer resp.Body.Close() - log = log.With("status", resp.StatusCode) - switch resp.StatusCode { - case http.StatusOK: - dec := json.NewDecoder(resp.Body) - var provider model.ProviderInfo - if err = dec.Decode(&provider); err != nil { - log.Warnw("failed backend read", "err", err) - return - } - combined <- provider - case http.StatusNotFound, http.StatusNotImplemented: - log.Debug("no provider") - default: - log.Warn("unexpected response while getting provider") - } - }(backend.URL()) + pid, err := peer.Decode(path.Base(r.URL.Path)) + if err != nil { + log.Warnw("bad provider ID", "err", err) + http.Error(w, "", http.StatusBadRequest) + return } - go func() { - wg.Wait() - close(combined) - }() - resp := model.ProviderInfo{} - var count int - for p := range combined { - count++ - if resp.LastAdvertisementTime != "" { - clt, e1 := time.Parse(time.RFC3339, resp.LastAdvertisementTime) - plt, e2 := time.Parse(time.RFC3339, p.LastAdvertisementTime) - if e1 == nil && e2 == nil && clt.Before(plt) { - resp = p - } - continue - } - resp = p + pinfo, err := s.pcache.Get(r.Context(), pid) + if err != nil { + log.Warnw("count not get provider information", "err", err) + http.Error(w, "", http.StatusInternalServerError) + return } - if count == 0 { + if pinfo == nil { http.Error(w, "", http.StatusNotFound) return } - // Write out combined. - outData, err := json.Marshal(resp) + outData, err := json.Marshal(pinfo) if err != nil { log.Warnw("failed marshal response", "err", err) http.Error(w, "", http.StatusInternalServerError) diff --git a/server.go b/server.go index 55cbe4f..b5daf48 100644 --- a/server.go +++ b/server.go @@ -12,6 +12,7 @@ import ( "time" logging "github.com/ipfs/go-log/v2" + "github.com/ipni/go-libipni/pcache" "github.com/ipni/indexstar/metrics" "github.com/mercari/go-circuitbreaker" "github.com/urfave/cli/v2" @@ -42,6 +43,7 @@ type server struct { indexPage []byte indexPageCompileTime time.Time + pcache *pcache.ProviderCache } // caskadeBackend is a marker for caskade backends @@ -98,6 +100,28 @@ func NewServer(c *cli.Context) (*server, error) { return dialer.DialContext(ctx, network, addr) } + httpClient := http.Client{ + Timeout: config.Server.HttpClientTimeout, + Transport: t, + } + + var providerSources []pcache.ProviderSource + for _, backend := range backends { + // do not send providers requests to not providers backends + if _, ok := backend.(providersBackend); !ok { + continue + } + httpSrc, err := pcache.NewHTTPSource(backend.URL().String(), &httpClient) + if err != nil { + return nil, fmt.Errorf("cannot create http provider source: %w", err) + } + providerSources = append(providerSources, httpSrc) + } + pc, err := pcache.New(pcache.WithSource(providerSources...)) + if err != nil { + return nil, fmt.Errorf("cannot create provider cache: %w", err) + } + indexTemplate, err := template.ParseFS(webUI, "index.html") if err != nil { return nil, err @@ -113,11 +137,8 @@ func NewServer(c *cli.Context) (*server, error) { compileTime := time.Now() return &server{ - Context: c.Context, - Client: http.Client{ - Timeout: config.Server.HttpClientTimeout, - Transport: t, - }, + Context: c.Context, + Client: httpClient, cfgBase: c.String("config"), Listener: bound, metricsListener: mb, @@ -125,6 +146,7 @@ func NewServer(c *cli.Context) (*server, error) { translateNonStreaming: c.Bool("translateNonStreaming"), indexPage: indexPageBuf.Bytes(), indexPageCompileTime: compileTime, + pcache: pc, }, nil }