Skip to content

Commit

Permalink
Use go-libipni pcache package to retrieve and cache provider info
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Jul 10, 2023
1 parent 46ff320 commit a2ebba7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 165 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/filecoin-project/index-provider v0.9.1
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipni/go-libipni v0.2.6
github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3
github.com/libp2p/go-libp2p v0.28.1
github.com/mercari/go-circuitbreaker v0.0.2
github.com/mitchellh/go-homedir v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ github.com/ipld/go-car/v2 v2.6.0/go.mod h1:qoqfgPnQYcaAYcfphctffdaNWJIWBR2QN4pju
github.com/ipld/go-codec-dagpb v1.5.0 h1:RspDRdsJpLfgCI0ONhTAnbHdySGD4t+LHSPK4X1+R0k=
github.com/ipld/go-ipld-prime v0.20.0 h1:Ud3VwE9ClxpO2LkCYP7vWPc0Fo+dYdYzgxUJZ3uRG4g=
github.com/ipld/go-ipld-prime v0.20.0/go.mod h1:PzqZ/ZR981eKbgdr3y2DJYeD/8bgMawdGVlJDE8kK+M=
github.com/ipni/go-libipni v0.2.6 h1:YKjdNs0dlLvGkNyNAA8gvzZifAQCaclktlG5kSfBims=
github.com/ipni/go-libipni v0.2.6/go.mod h1:dhBH9HwxT6HzQPRZ8ikWv+ccqF8ucMIoGiiTSrHA4tw=
github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3 h1:eWJSiUUMbMlx9HWrnOttTOBfyyOcQ5WM50JV9b6e4nw=
github.com/ipni/go-libipni v0.2.9-0.20230710084357-c0148e9510c3/go.mod h1:dhBH9HwxT6HzQPRZ8ikWv+ccqF8ucMIoGiiTSrHA4tw=
github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0o=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
Expand Down
177 changes: 20 additions & 157 deletions providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ import (
"encoding/json"
"io"
"net/http"
"net/url"
"sync"
"time"
"path"

"github.com/ipni/go-libipni/find/model"
//"github.com/ipni/go-libipni/find/model"
"github.com/libp2p/go-libp2p/core/peer"
)

Expand All @@ -19,92 +17,13 @@ func (s *server) providers(w http.ResponseWriter, r *http.Request) {
return
}

combined := make(chan []model.ProviderInfo)
wg := sync.WaitGroup{}
var err error
_, err = io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
log.Warnw("failed to read original request body", "err", err)
return
}

for _, backend := range s.backends {
// do not send providers requests to not providers backends
if _, ok := backend.(providersBackend); !ok {
continue
}
wg.Add(1)
go func(server *url.URL) {
defer wg.Done()

// Copy the URL from original request and override host/schema to point
// to the server.
endpoint := *r.URL
endpoint.Host = server.Host
endpoint.Scheme = server.Scheme
log := log.With("backend", endpoint.Host)

// If body in original request existed, make a reader for it.
req, err := http.NewRequest(r.Method, endpoint.String(), nil)
if err != nil {
log.Warnw("failed to construct query", "err", err)
return
}
req.Header.Set("X-Forwarded-Host", r.Host)
req.Header.Set("Accept", mediaTypeJson)
resp, err := s.Client.Do(req)
if err != nil {
log.Warnw("failed query", "err", err)
return
}

defer resp.Body.Close()
log = log.With("status", resp.StatusCode)
switch resp.StatusCode {
case http.StatusOK:
dec := json.NewDecoder(resp.Body)
var providers []model.ProviderInfo
if err := dec.Decode(&providers); err != nil {
log.Warnw("failed backend read", "err", err)
return
}
combined <- providers
case http.StatusNotFound, http.StatusNotImplemented:
log.Debug("no providers")
default:
log.Warn("unexpected response while getting providers")
}
}(backend.URL())
}
go func() {
wg.Wait()
close(combined)
}()

resp := make(map[peer.ID]model.ProviderInfo)
for prov := range combined {
for _, p := range prov {
if curr, ok := resp[p.AddrInfo.ID]; ok {
clt, e1 := time.Parse(time.RFC3339, curr.LastAdvertisementTime)
plt, e2 := time.Parse(time.RFC3339, p.LastAdvertisementTime)
if e1 == nil && e2 == nil && clt.Before(plt) {
resp[p.AddrInfo.ID] = p
}
continue
}
resp[p.AddrInfo.ID] = p
}
}
pinfos := s.pcache.List()

// Write out combined.
// Note that /providers never returns 404. Instead, when there are no providers,
// an empty JSON array is returned.
out := make([]model.ProviderInfo, 0, len(resp))
for _, a := range resp {
out = append(out, a)
}
outData, err := json.Marshal(out)
//
// Note that /providers never returns 404. Instead, when there are no
// providers, an empty JSON array is returned.
outData, err := json.Marshal(pinfos)
if err != nil {
log.Warnw("failed marshal response", "err", err)
http.Error(w, "", http.StatusInternalServerError)
Expand All @@ -115,89 +34,33 @@ func (s *server) providers(w http.ResponseWriter, r *http.Request) {

// provider returns most recent state of a single provider.
func (s *server) provider(w http.ResponseWriter, r *http.Request) {
combined := make(chan model.ProviderInfo)
wg := sync.WaitGroup{}
var err error
_, err = io.ReadAll(r.Body)
_, err := io.ReadAll(r.Body)
_ = r.Body.Close()
if err != nil {
log.Warnw("failed to read original request body", "err", err)
return
}

for _, backend := range s.backends {
// do not send providers requests to not providers backends
if _, ok := backend.(providersBackend); !ok {
continue
}
wg.Add(1)
go func(server *url.URL) {
defer wg.Done()

// Copy the URL from original request and override host/schema to point
// to the server.
endpoint := *r.URL
endpoint.Host = server.Host
endpoint.Scheme = server.Scheme
log := log.With("backend", endpoint.Host)

req, err := http.NewRequest(r.Method, endpoint.String(), nil)
if err != nil {
log.Warnw("failed to construct query", "err", err)
return
}
req.Header.Set("X-Forwarded-Host", r.Host)
req.Header.Set("Accept", mediaTypeJson)
resp, err := s.Client.Do(req)
if err != nil {
log.Warnw("failed query", "err", err)
return
}
defer resp.Body.Close()
log = log.With("status", resp.StatusCode)
switch resp.StatusCode {
case http.StatusOK:
dec := json.NewDecoder(resp.Body)
var provider model.ProviderInfo
if err = dec.Decode(&provider); err != nil {
log.Warnw("failed backend read", "err", err)
return
}
combined <- provider
case http.StatusNotFound, http.StatusNotImplemented:
log.Debug("no provider")
default:
log.Warn("unexpected response while getting provider")
}
}(backend.URL())
pid, err := peer.Decode(path.Base(r.URL.Path))
if err != nil {
log.Warnw("bad provider ID", "err", err)
http.Error(w, "", http.StatusBadRequest)
return
}
go func() {
wg.Wait()
close(combined)
}()

resp := model.ProviderInfo{}
var count int
for p := range combined {
count++
if resp.LastAdvertisementTime != "" {
clt, e1 := time.Parse(time.RFC3339, resp.LastAdvertisementTime)
plt, e2 := time.Parse(time.RFC3339, p.LastAdvertisementTime)
if e1 == nil && e2 == nil && clt.Before(plt) {
resp = p
}
continue
}
resp = p
pinfo, err := s.pcache.Get(r.Context(), pid)
if err != nil {
log.Warnw("count not get provider information", "err", err)
http.Error(w, "", http.StatusInternalServerError)
return
}

if count == 0 {
if pinfo == nil {
http.Error(w, "", http.StatusNotFound)
return
}

// Write out combined.
outData, err := json.Marshal(resp)
outData, err := json.Marshal(pinfo)
if err != nil {
log.Warnw("failed marshal response", "err", err)
http.Error(w, "", http.StatusInternalServerError)
Expand Down
32 changes: 27 additions & 5 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/ipni/go-libipni/pcache"
"github.com/ipni/indexstar/metrics"
"github.com/mercari/go-circuitbreaker"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -42,6 +43,7 @@ type server struct {

indexPage []byte
indexPageCompileTime time.Time
pcache *pcache.ProviderCache
}

// caskadeBackend is a marker for caskade backends
Expand Down Expand Up @@ -98,6 +100,28 @@ func NewServer(c *cli.Context) (*server, error) {
return dialer.DialContext(ctx, network, addr)
}

httpClient := http.Client{
Timeout: config.Server.HttpClientTimeout,
Transport: t,
}

var providerSources []pcache.ProviderSource
for _, backend := range backends {
// do not send providers requests to not providers backends
if _, ok := backend.(providersBackend); !ok {
continue
}
httpSrc, err := pcache.NewHTTPSource(backend.URL().String(), &httpClient)
if err != nil {
return nil, fmt.Errorf("cannot create http provider source: %w", err)
}
providerSources = append(providerSources, httpSrc)
}
pc, err := pcache.New(pcache.WithSource(providerSources...))
if err != nil {
return nil, fmt.Errorf("cannot create provider cache: %w", err)
}

indexTemplate, err := template.ParseFS(webUI, "index.html")
if err != nil {
return nil, err
Expand All @@ -113,18 +137,16 @@ func NewServer(c *cli.Context) (*server, error) {
compileTime := time.Now()

return &server{
Context: c.Context,
Client: http.Client{
Timeout: config.Server.HttpClientTimeout,
Transport: t,
},
Context: c.Context,
Client: httpClient,
cfgBase: c.String("config"),
Listener: bound,
metricsListener: mb,
backends: backends,
translateNonStreaming: c.Bool("translateNonStreaming"),
indexPage: indexPageBuf.Bytes(),
indexPageCompileTime: compileTime,
pcache: pc,
}, nil
}

Expand Down

0 comments on commit a2ebba7

Please sign in to comment.