Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Port over caboose main #155

Merged
merged 20 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 38 additions & 4 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@ package caboose

import (
"context"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

requestcontext "github.com/willscott/go-requestcontext"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand All @@ -30,7 +34,7 @@ type Config struct {
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
Expand Down Expand Up @@ -77,6 +81,9 @@ type Config struct {

// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
Expand All @@ -91,10 +98,12 @@ const defaultMaxRetries = 3
// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
Expand Down Expand Up @@ -133,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
Expand All @@ -151,6 +166,9 @@ func NewCaboose(config *Config) (*Caboose, error) {
Timeout: DefaultCarRequestTimeout,
}
}

c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport)

if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
Expand All @@ -159,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand Down Expand Up @@ -192,9 +214,21 @@ func (c *Caboose) Close() {

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
traceID := requestcontext.IDFromContext(ctx)
tid, err := trace.TraceIDFromHex(traceID)

ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

if err == nil {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tid,
SpanID: span.SpanContext().SpanID(),
Remote: true,
})
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}

return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down
8 changes: 7 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,10 @@ func (epr ErrPartialResponse) Error() string {
// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the
// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming
// data was returned.
type ErrInvalidResponse error
type ErrInvalidResponse struct {
Message string
}

func (e ErrInvalidResponse) Error() string {
return e.Message
}
52 changes: 36 additions & 16 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"hash/crc32"
"io"
"net/http"
"net/http/httptrace"
"os"
"strconv"
"strings"
"time"

"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

Expand Down Expand Up @@ -52,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
return ce
}

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime)))
p.ActiveNodes.lk.RLock()
isCore := p.ActiveNodes.IsCore(from)
p.ActiveNodes.lk.RUnlock()

ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore)))
defer span.End()

requestId := uuid.NewString()
Expand All @@ -66,12 +73,20 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
proto := "unknown"
respReq := &http.Request{}
received := 0
reqUrl := fmt.Sprintf("https://%s%s", from.URL, resource)

reqUrl := ""
if strings.Contains(from.URL, "://") {
reqUrl = fmt.Sprintf("%s%s", from.URL, resource)
} else {
reqUrl = fmt.Sprintf("https://%s%s", from.URL, resource)
}

var respHeader http.Header
saturnNodeId := ""
saturnTransferId := ""
isCacheHit := false
networkError := ""
verificationError := ""

isBlockRequest := false
if mime == "application/vnd.ipld.raw" {
Expand Down Expand Up @@ -100,10 +115,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
if cacheHit == saturnCacheHit {
isCacheHit = true
}

for k, v := range respHeader {
received = received + len(k) + len(v)
}
}

durationSecs := time.Since(start).Seconds()
Expand Down Expand Up @@ -156,12 +167,13 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
HTTPProtocol: proto,
TTFBMS: int(ttfbMs),
// my address
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from.URL,
IfNetworkError: networkError,
Range: "",
Referrer: respReq.Referer(),
UserAgent: respReq.UserAgent(),
NodeId: saturnNodeId,
NodeIpAddress: from.URL,
IfNetworkError: networkError,
VerificationError: verificationError,
}
}
}
Expand All @@ -178,7 +190,9 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
clientTrace := otelhttptrace.NewClientTrace(reqCtx)
subReqCtx := httptrace.WithClientTrace(reqCtx, clientTrace)
req, err := http.NewRequestWithContext(subReqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
if isCtxError(reqCtx) {
return reqCtx.Err()
Expand Down Expand Up @@ -282,8 +296,6 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
wrapped := TrackingReader{resp.Body, time.Time{}, 0}
err = cb(resource, &wrapped)
received = wrapped.len
// drain body so it can be re-used.
_, _ = io.Copy(io.Discard, resp.Body)

if err != nil {
if isCtxError(reqCtx) {
Expand All @@ -298,7 +310,15 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m
saturnCallsFailureTotalMetric.WithLabelValues(resourceType, fmt.Sprintf("failed-response-read-%s", getCacheStatus(isCacheHit)), fmt.Sprintf("%d", code)).Add(1)
}

networkError = err.Error()
var target = ErrInvalidResponse{}
if errors.As(err, &target) {
verificationError = err.Error()
goLogger.Errorw("failed to read response; verification error", "err", err.Error())
} else {
networkError = err.Error()
goLogger.Errorw("failed to read response; no verification error", "err", err.Error())
}

return err
}

Expand Down
27 changes: 16 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ go 1.19

require (
github.com/google/uuid v1.3.0
github.com/ipfs/boxo v0.10.2
github.com/ipfs/boxo v0.11.0
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-car v0.6.1
github.com/ipld/go-car/v2 v2.10.1
github.com/ipld/go-car v0.6.2
github.com/ipld/go-car/v2 v2.10.2-0.20230622090957-499d0c909d33
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/mitchellh/go-server-timing v1.0.1
Expand All @@ -19,10 +19,13 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tcnksm/go-httpstat v0.2.0
github.com/urfave/cli/v2 v2.24.2
github.com/willscott/go-requestcontext v0.0.1
github.com/willscott/hashring v0.0.0-20230731155239-15f93a2dfb44
github.com/zyedidia/generic v1.2.2-0.20230625215236-3404399b19f1
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/trace v1.14.0
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.43.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.43.0
go.opentelemetry.io/otel v1.17.0
go.opentelemetry.io/otel/trace v1.17.0
)

require (
Expand All @@ -34,9 +37,9 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.0 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/gabriel-vasile/mimetype v1.4.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/gddo v0.0.0-20180823221919-9d8ff1c67be5 // indirect
Expand All @@ -45,6 +48,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.1 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
Expand All @@ -57,7 +61,6 @@ require (
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-format v0.5.0 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-ipns v0.3.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
Expand All @@ -71,7 +74,7 @@ require (
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.23.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.7.0 // indirect
Expand All @@ -86,7 +89,7 @@ require (
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.8.0 // indirect
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
github.com/multiformats/go-multibase v0.1.1 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
Expand All @@ -107,6 +110,7 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel/metric v1.17.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
Expand All @@ -118,7 +122,8 @@ require (
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)
Loading
Loading