Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #155 from filecoin-saturn/feat/port-Caboose-main
Browse files Browse the repository at this point in the history
Port over caboose main
  • Loading branch information
aarshkshah1992 authored Sep 20, 2023
2 parents 552ea1b + 46b5374 commit ad399fb
Show file tree
Hide file tree
Showing 16 changed files with 345 additions and 134 deletions.
42 changes: 38 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -30,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -77,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -91,10 +98,12 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
Expand Down Expand Up @@ -133,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
Expand All @@ -151,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
Timeout: DefaultCarRequestTimeout,
}
}

c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport)

if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
Expand All @@ -159,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down Expand Up @@ -192,9 +214,21 @@ func (c *Caboose) Close() {

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
traceID := requestcontext.IDFromContext(ctx)
tid, err := trace.TraceIDFromHex(traceID)

ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

if err == nil {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tid,
SpanID: span.SpanContext().SpanID(),
Remote: true,
})
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}

return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down
8 changes: 7 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string {
// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the
// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming
// data was returned.
type ErrInvalidResponse error
type ErrInvalidResponse struct {
Message string
}

func (e ErrInvalidResponse) Error() string {
return e.Message
}
52 changes: 36 additions & 16 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"hash/crc32"
"io"
"net/http"
"net/http/httptrace"
"os"
"strconv"
"strings"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -52,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
return ce
}

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime)))
p.ActiveNodes.lk.RLock()
isCore := p.ActiveNodes.IsCore(from)
p.ActiveNodes.lk.RUnlock()

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore)))
defer span.End()

requestId := uuid.NewString()
Expand All @@ -66,12 +73,20 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
proto := "unknown"
respReq := &http.Request{}
received := 0
reqUrl := fmt.Sprintf("https://%s%s", from.URL, resource)

reqUrl := ""
if strings.Contains(from.URL, "://") {
reqUrl = fmt.Sprintf("%s%s", from.URL, resource)
} else {
reqUrl = fmt.Sprintf("https://%s%s", from.URL, resource)
}

var respHeader http.Header
saturnNodeId := ""
saturnTransferId := ""
isCacheHit := false
networkError := ""
verificationError := ""

isBlockRequest := false
if mime == "application/vnd.ipld.raw" {
Expand Down Expand Up @@ -100,10 +115,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
if cacheHit == saturnCacheHit {
isCacheHit = true
}

for k, v := range respHeader {
received = received + len(k) + len(v)
}
}

durationSecs := time.Since(start).Seconds()
Expand Down Expand Up @@ -156,12 +167,13 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from.URL,
IfNetworkError: networkError,
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from.URL,
IfNetworkError: networkError,
VerificationError: verificationError,
}
}
}
Expand All @@ -178,7 +190,9 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
clientTrace := otelhttptrace.NewClientTrace(reqCtx)
subReqCtx := httptrace.WithClientTrace(reqCtx, clientTrace)
req, err := http.NewRequestWithContext(subReqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
if isCtxError(reqCtx) {
return reqCtx.Err()
Expand Down Expand Up @@ -282,8 +296,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
wrapped := TrackingReader{resp.Body, time.Time{}, 0}
err = cb(resource, &wrapped)
received = wrapped.len
// drain body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)

if err != nil {
if isCtxError(reqCtx) {
Expand All @@ -298,7 +310,15 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
saturnCallsFailureTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("failed-response-read-%s", getCacheStatus(isCacheHit)), fmt.Sprintf("%d", code)).Add(1)
}

networkError = err.Error()
var target = ErrInvalidResponse{}
if errors.As(err, &target) {
verificationError = err.Error()
goLogger.Errorw("failed to read response; verification error", "err", err.Error())
} else {
networkError = err.Error()
goLogger.Errorw("failed to read response; no verification error", "err", err.Error())
}

return err
}

Expand Down
27 changes: 16 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.19

require (
github.com/google/uuid v1.3.0
github.com/ipfs/boxo v0.10.2
github.com/ipfs/boxo v0.11.0
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.1
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-car v0.6.2
github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/mitchellh/go-server-timing v1.0.1
Expand All @@ -19,10 +19,13 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tcnksm/go-httpstat v0.2.0
github.com/urfave/cli/v2 v2.24.2
github.com/willscott/go-requestcontext v0.0.1
github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44
github.com/zyedidia/generic v1.2.2-0.20230625215236-3404399b19f1
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0
go.opentelemetry.io/otel v1.17.0
go.opentelemetry.io/otel/trace v1.17.0
)

require (
Expand All @@ -34,9 +37,9 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/gabriel-vasile/mimetype v1.4.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect
Expand All @@ -45,6 +48,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
Expand All @@ -57,7 +61,6 @@ require (
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-format v0.5.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-ipns v0.3.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
Expand All @@ -71,7 +74,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect
Expand All @@ -86,7 +89,7 @@ require (
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
Expand All @@ -107,6 +110,7 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/metric v1.17.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand All @@ -118,7 +122,8 @@ require (
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
Loading

0 comments on commit ad399fb

Please sign in to comment.