Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat: IPFSBackend and GraphGateway metrics
Browse files Browse the repository at this point in the history
- IPFSBackend metrics from ipfs/boxo#245
- GraphGateway metrics from #61
- Version for tracking rollouts
  • Loading branch information
lidel committed Mar 31, 2023
1 parent fdc0d5a commit 69b5a49
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 20 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ go 1.19

require (
github.com/cskr/pubsub v1.0.2
github.com/filecoin-saturn/caboose v0.0.0-20230329185035-5b37545e2a41
github.com/filecoin-saturn/caboose v0.0.0-20230329203940-7c08345c244d
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895
github.com/ipfs/boxo v0.8.0-rc3.0.20230331021433-e49cc43d95ad
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.0
github.com/ipfs/go-ipld-format v0.4.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/filecoin-saturn/caboose v0.0.0-20230329185035-5b37545e2a41 h1:CRERrZrpsrSwWYyBzlQJZUeh9gy8LaOukw7E6dMgd3E=
github.com/filecoin-saturn/caboose v0.0.0-20230329185035-5b37545e2a41/go.mod h1:u8END6RLUG9ZWudtVHAOOg7ptGiAgKjf/QnNOUOC160=
github.com/filecoin-saturn/caboose v0.0.0-20230329203940-7c08345c244d h1:ok/4Ffp2ETy6l5kBRUumvxzGz/Hxqq2iTfmGLslZbBc=
github.com/filecoin-saturn/caboose v0.0.0-20230329203940-7c08345c244d/go.mod h1:u8END6RLUG9ZWudtVHAOOg7ptGiAgKjf/QnNOUOC160=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ=
github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
Expand Down Expand Up @@ -186,8 +186,8 @@ github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7P
github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/ipfs/bbloom v0.0.4 h1:Gi+8EGJ2y5qiD5FbsbpX/TMNcJw8gSqr7eyjHa4Fhvs=
github.com/ipfs/bbloom v0.0.4/go.mod h1:cS9YprKXpoZ9lT0n/Mw/a6/aFV6DTjTLYHeA+gyqMG0=
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895 h1:bh+8xMBQSOnieUSg7qToTFhPpf4Oc8QkvPSRmQSrnpc=
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895/go.mod h1:RIsi4CnTyQ7AUsNn5gXljJYZlQrHBMnJp94p73liFiA=
github.com/ipfs/boxo v0.8.0-rc3.0.20230331021433-e49cc43d95ad h1:olq3866OLC+aCW3hImmUtze0Nh8lbFfciTSeq09xfQU=
github.com/ipfs/boxo v0.8.0-rc3.0.20230331021433-e49cc43d95ad/go.mod h1:RIsi4CnTyQ7AUsNn5gXljJYZlQrHBMnJp94p73liFiA=
github.com/ipfs/go-bitfield v1.1.0 h1:fh7FIo8bSwaJEh6DdTWbCeZ1eqOaOkKFI74SCnsWbGA=
github.com/ipfs/go-bitfield v1.1.0/go.mod h1:paqf1wjq/D2BBmzfTVFlJQ9IlFOZpg422HL0HqsGWHU=
github.com/ipfs/go-block-format v0.0.2/go.mod h1:AWR46JfpcObNfg3ok2JHDUfdiHRgWhJgCQF+KIgOPJY=
Expand Down
2 changes: 1 addition & 1 deletion lib/blockstore_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (

const DefaultCacheBlockStoreSize = 1024

var cacheLog = golog.Logger("bifrost-gateway:cache-blockstore")
var cacheLog = golog.Logger("cache-blockstore")

func NewCacheBlockStore(size int) (blockstore.Blockstore, error) {
c, err := lru.New2Q[string, []byte](size)
Expand Down
141 changes: 128 additions & 13 deletions lib/graph_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"io"
"net/http"
"runtime"
"strconv"
"strings"
"sync"

"github.com/filecoin-saturn/caboose"
Expand All @@ -30,10 +32,11 @@ import (
"github.com/libp2p/go-libp2p/core/routing"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/multierr"
)

var graphLog = golog.Logger("bifrost-gateway:graph-backend")
var graphLog = golog.Logger("graph-gateway")

// type DataCallback = func(resource string, reader io.Reader) error
// TODO: Don't use a caboose type, perhaps ask them to use a type alias instead of a type
Expand Down Expand Up @@ -89,6 +92,17 @@ type GraphGateway struct {

lk sync.RWMutex
notifiers map[Notifier]struct{}
metrics *GraphGatewayMetrics
}

type GraphGatewayMetrics struct {
carFetchAttemptMetric prometheus.Counter
carBlocksFetchedMetric prometheus.Counter
blockRecoveryAttemptMetric prometheus.Counter
carParamsMetric *prometheus.CounterVec

// TODO: what to measure here?
//fetchParamsMetric *prometheus.CounterVec
}

func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ...GraphGatewayOption) (*GraphGateway, error) {
Expand Down Expand Up @@ -138,9 +152,66 @@ func NewGraphGatewayBackend(f CarFetcher, blockFetcher exchange.Fetcher, opts ..
namesys: ns,
bstore: bs,
notifiers: make(map[Notifier]struct{}),
metrics: registerGraphGatewayMetrics(),
}, nil
}

func registerGraphGatewayMetrics() *GraphGatewayMetrics {

// How many CAR Fetch attempts we had? Need this to calculate % of various graph request types.
// We only count attempts here, because success/failure with/without retries are provided by caboose:
// - ipfs_caboose_fetch_duration_car_success_count
// - ipfs_caboose_fetch_duration_car_failure_count
// - ipfs_caboose_fetch_duration_car_peer_success_count
// - ipfs_caboose_fetch_duration_car_peer_failure_count
carFetchAttemptMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "car_fetch_attempts",
Help: "The number of times a CAR fetch was attempted by IPFSBackend.",
})
prometheus.MustRegister(carFetchAttemptMetric)

// How many blocks were read via CARs?
// Need this as a baseline to reason about error ratio vs raw_block_recovery_attempts.
carBlocksFetchedMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "car_blocks_fetched",
Help: "The number of blocks successfully read via CAR fetch.",
})
prometheus.MustRegister(carBlocksFetchedMetric)

// How many times CAR response was not enough or just failed, and we had to read a block via ?format=raw
// We only count attempts here, because success/failure with/without retries are provided by caboose:
// - ipfs_caboose_fetch_duration_block_success_count
// - ipfs_caboose_fetch_duration_block_failure_count
// - ipfs_caboose_fetch_duration_block_peer_success_count
// - ipfs_caboose_fetch_duration_block_peer_failure_count
blockRecoveryAttemptMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "raw_block_recovery_attempts",
Help: "The number of ?format=raw block fetch attempts due to GraphGateway failure (CAR fetch error, missing block in CAR response, or a block evicted from cache too soon).",
})
prometheus.MustRegister(blockRecoveryAttemptMetric)

carParamsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "gw_graph_backend",
Name: "car_fetch_params",
Help: "How many times specific CAR parameter was used during CAR data fetch.",
}, []string{"depth", "bytes"})
prometheus.MustRegister(carParamsMetric)

return &GraphGatewayMetrics{
carFetchAttemptMetric,
carBlocksFetchedMetric,
blockRecoveryAttemptMetric,
carParamsMetric,
}
}

/*
Implementation iteration plan:
Expand All @@ -159,18 +230,21 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
startingExchange: carFetchingExch,
followupExchange: &blockFetcherExchWrapper{api.blockFetcher},
handoffCh: doneWithFetcher,
metrics: api.metrics,
}

api.lk.Lock()
api.notifiers[exch] = struct{}{}
api.lk.Unlock()

go func() {
go func(metrics *GraphGatewayMetrics) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered fetcher error", r)
// TODO: move to Debugw?
graphLog.Errorw("Recovered fetcher error", "error", r)
}
}()
metrics.carFetchAttemptMetric.Inc()
err := api.fetcher.Fetch(ctx, path, func(resource string, reader io.Reader) error {
cr, err := car.NewCarReader(reader)
if err != nil {
Expand All @@ -187,18 +261,21 @@ func (api *GraphGateway) loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx con
if err := bstore.Put(ctx, blk); err != nil {
return err
}
metrics.carBlocksFetchedMetric.Inc()
api.notifyAllOngoingRequests(ctx, blk)
}
})
if err != nil {
graphLog.Error(err)
// TODO: move to debug?
graphLog.Errorw("car Fetch failed", "error", err)
}
if err := carFetchingExch.Close(); err != nil {
graphLog.Error(err)
// TODO: move to debug?
graphLog.Errorw("carFetchingExch.Close()", "error", err)
}
doneWithFetcher <- struct{}{}
close(doneWithFetcher)
}()
}(api.metrics)

bserv := blockservice.New(bstore, exch)
blkgw, err := gateway.NewBlocksGateway(bserv)
Expand All @@ -218,7 +295,7 @@ func (api *GraphGateway) notifyAllOngoingRequests(ctx context.Context, blks ...b
for n := range api.notifiers {
err := n.NotifyNewBlocks(ctx, blks...)
if err != nil {
graphLog.Error(fmt.Errorf("notifyAllOngoingRequests failed: %w", err))
graphLog.Errorw("notifyAllOngoingRequests failed", "error", err)
}
}
api.lk.RUnlock()
Expand Down Expand Up @@ -264,6 +341,7 @@ func wrapNodeWithClose[T files.Node](node T, closeFn func()) (T, error) {
}

func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, *gateway.GetResponse, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"depth": "1", "bytes": ""}).Inc()
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&depth=1")
if err != nil {
return gateway.ContentPathMetadata{}, nil, err
Expand All @@ -277,9 +355,34 @@ func (api *GraphGateway) Get(ctx context.Context, path gateway.ImmutablePath) (g
return md, gr, err
}

func (api *GraphGateway) GetRange(ctx context.Context, path gateway.ImmutablePath, getRange ...gateway.GetRange) (gateway.ContentPathMetadata, files.File, error) {
// TODO: actually implement ranges
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&depth=1")
func (api *GraphGateway) GetRange(ctx context.Context, path gateway.ImmutablePath, httpRanges ...gateway.GetRange) (gateway.ContentPathMetadata, files.File, error) {
rangeCount := len(httpRanges)
api.metrics.carParamsMetric.With(prometheus.Labels{
"depth": "1",
"bytes": fmt.Sprintf("(request ranges: %d)", rangeCount), // we dont pass specific ranges, we want deterministic number of CounterVec labels
}).Inc()

carParams := "?format=car&depth=1"

// TODO: refactor this if we ever merge Get and GetRange
if rangeCount > 0 {

bytesBuilder := strings.Builder{}
bytesBuilder.WriteString("&bytes=")
for i, r := range httpRanges {
bytesBuilder.WriteString(strconv.FormatUint(r.From, 10))
bytesBuilder.WriteString("-")
if r.To != nil {
bytesBuilder.WriteString(strconv.FormatInt(*r.To, 10))
}
if i != rangeCount-1 {
bytesBuilder.WriteString(",")
}
}
carParams += bytesBuilder.String()
}

blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+carParams)
if err != nil {
return gateway.ContentPathMetadata{}, nil, err
}
Expand All @@ -295,6 +398,7 @@ func (api *GraphGateway) GetRange(ctx context.Context, path gateway.ImmutablePat
}

func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"depth": "all", "bytes": ""}).Inc()
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&depth=all")
if err != nil {
return gateway.ContentPathMetadata{}, nil, err
Expand All @@ -311,6 +415,8 @@ func (api *GraphGateway) GetAll(ctx context.Context, path gateway.ImmutablePath)
}

func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.File, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"depth": "0", "bytes": ""}).Inc()
// TODO: if path is `/ipfs/cid`, we should use ?format=raw
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&depth=0")
if err != nil {
return gateway.ContentPathMetadata{}, nil, err
Expand All @@ -327,6 +433,10 @@ func (api *GraphGateway) GetBlock(ctx context.Context, path gateway.ImmutablePat
}

func (api *GraphGateway) Head(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, files.Node, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{
"depth": "",
"bytes": "0:1023 (Head)",
}).Inc()
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&bytes=0:1023")
if err != nil {
return gateway.ContentPathMetadata{}, nil, err
Expand All @@ -343,6 +453,7 @@ func (api *GraphGateway) Head(ctx context.Context, path gateway.ImmutablePath) (
}

func (api *GraphGateway) ResolvePath(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"depth": "0", "bytes": ""}).Inc()
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car&depth=0")
if err != nil {
return gateway.ContentPathMetadata{}, err
Expand All @@ -352,6 +463,7 @@ func (api *GraphGateway) ResolvePath(ctx context.Context, path gateway.Immutable
}

func (api *GraphGateway) GetCAR(ctx context.Context, path gateway.ImmutablePath) (gateway.ContentPathMetadata, io.ReadCloser, <-chan error, error) {
api.metrics.carParamsMetric.With(prometheus.Labels{"depth": "", "bytes": ""}).Inc()
blkgw, closeFn, err := api.loadRequestIntoSharedBlockstoreAndBlocksGateway(ctx, path.String()+"?format=car")
if err != nil {
return gateway.ContentPathMetadata{}, nil, nil, err
Expand Down Expand Up @@ -479,6 +591,7 @@ var _ exchange.Interface = (*inboundBlockExchange)(nil)
type handoffExchange struct {
startingExchange, followupExchange exchange.Interface
handoffCh <-chan struct{}
metrics *GraphGatewayMetrics
}

func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
Expand All @@ -493,7 +606,8 @@ func (f *handoffExchange) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block

select {
case <-f.handoffCh:
graphLog.Infof("needed to use use a backup fetcher for cid %s", c)
graphLog.Debugw("switching to backup block fetcher", "cid", c)
f.metrics.blockRecoveryAttemptMetric.Inc()
return f.followupExchange.GetBlock(ctx, c)
case <-ctx.Done():
return nil, ctx.Err()
Expand Down Expand Up @@ -533,10 +647,11 @@ func (f *handoffExchange) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan
newCidArr = append(newCidArr, c)
}
}
graphLog.Infof("needed to use use a backup fetcher for cids %v", newCidArr)
graphLog.Debugw("needed to use use a backup fetcher for cids", "cids", newCidArr)
f.metrics.blockRecoveryAttemptMetric.Add(float64(len(newCidArr)))
fch, err := f.followupExchange.GetBlocks(ctx, newCidArr)
if err != nil {
graphLog.Error(fmt.Errorf("error getting blocks from followup exchange %w", err))
graphLog.Errorw("error getting blocks from followupExchange", "error", err)
return
}
for cs.Len() < len(cids) {
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ See documentation at: https://github.com/ipfs/bifrost-gateway/#readme`,
}

log.Printf("Starting %s %s", name, version)
registerVersionMetric(version)

cdns := newCachedDNS(dnsCacheRefreshInterval)
defer cdns.Close()
Expand Down
14 changes: 14 additions & 0 deletions version.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"runtime/debug"
"time"

"github.com/prometheus/client_golang/prometheus"
)

var name = "bifrost-gateway"
Expand Down Expand Up @@ -37,3 +39,15 @@ func buildVersion() string {
}
return "dev-build"
}

func registerVersionMetric(version string) {
m := prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "ipfs",
Subsystem: "bifrost_gateway",
Name: "info",
Help: "Information about bifrost-gateway instance.",
ConstLabels: prometheus.Labels{"version": version},
})
prometheus.MustRegister(m)
m.Set(1)
}

0 comments on commit 69b5a49

Please sign in to comment.