Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap: peer prom tracker #413

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
*client.Client
*server.Server

tracer tracer.Tracer
net network.BitSwapNetwork
tracers []tracer.Tracer
net network.BitSwapNetwork
}

func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
Expand All @@ -77,8 +77,8 @@
}
}

if bs.tracer != nil {
var tracer tracer.Tracer = nopReceiveTracer{bs.tracer}
for _, t := range bs.tracers {
var tracer tracer.Tracer = nopReceiveTracer{t}

Check warning on line 81 in bitswap/bitswap.go

View check run for this annotation

Codecov / codecov/patch

bitswap/bitswap.go#L81

Added line #L81 was not covered by tests
clientOptions = append(clientOptions, client.WithTracer(tracer))
serverOptions = append(serverOptions, server.WithTracer(tracer))
}
Expand Down Expand Up @@ -172,8 +172,8 @@
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)

Check warning on line 176 in bitswap/bitswap.go

View check run for this annotation

Codecov / codecov/patch

bitswap/bitswap.go#L176

Added line #L176 was not covered by tests
}

bs.Client.ReceiveMessage(ctx, p, incoming)
Expand Down
11 changes: 6 additions & 5 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@
}
}

// Configures the Client to use given tracer.
// WithTracer configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
// This can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Client) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)

Check warning on line 75 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L75

Added line #L75 was not covered by tests
}
}

Expand Down Expand Up @@ -208,7 +209,7 @@
allMetric metrics.Histogram

// External statistics interface
tracer tracer.Tracer
tracers []tracer.Tracer

// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager
Expand Down Expand Up @@ -342,8 +343,8 @@
bs.counters.messagesRecvd++
bs.counterLk.Unlock()

if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)

Check warning on line 347 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L347

Added line #L347 was not covered by tests
}

iblocks := incoming.Blocks()
Expand Down
3 changes: 2 additions & 1 deletion bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@
return Option{client.SetSimulateDontHavesOnTimeout(send)}
}

// WithTracer can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
// Only trace the server, both receive the same messages anyway
return Option{
option(func(bs *Bitswap) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)

Check warning on line 87 in bitswap/options.go

View check run for this annotation

Codecov / codecov/patch

bitswap/options.go#L87

Added line #L87 was not covered by tests
}),
}
}
146 changes: 146 additions & 0 deletions bitswap/peer-prom-tracker/peer-prom-tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package peerpromtracker

import (
"errors"
"fmt"

bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/tracer"
"github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
prom "github.com/prometheus/client_golang/prometheus"
)

var logger = log.Logger("bitswap/peer-prom-tracker")

var _ tracer.Tracer = (*PeerTracer)(nil)

type PeerTracer struct {
bytesSent *prom.CounterVec
messagesSent *prom.CounterVec
bytesReceived *prom.CounterVec
messagesReceived *prom.CounterVec

reg *prom.Registry
}

func NewPeerTracer(reg *prom.Registry) (*PeerTracer, error) {
pt := &PeerTracer{
reg: reg,
}

var good bool
defer func() {
if !good {
pt.Close() // will unregister the sucessfully registered metrics
}

Check warning on line 36 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L27-L36

Added lines #L27 - L36 were not covered by tests
}()

peerIdLabel := []string{"peer-id"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ This will create a separate time series per PeerID, which is ok for debugging, but should NOT be enabled by default.

High-cardinality labels in Prometheus are considered an antipattern. If there were 20K peers, this would create 20K time series, and that may cause problems (performance, billing) when Grafana tries to visualize it.

To understand why high cardinality is a problem, see:

IMO this PR can't land in boxo in this form, as it creates a footgun for users of this library.

There needs to be either a hard-limit on the number of peers tracked, or an explicit opt-in via constructor option or ENV variable.

Copy link
Member

@lidel lidel Jul 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 I think if we wanted to have metrics similar to this, we could measure P95 globally without running into the cardinality problem.

To do so, one would define Objectives in SummaryOpts to be P50, P75, P95, etc., and calculate messages-received, messages-sent, bytes-received, etc. across all peers, not per specific peer. This way we get a useful P95 metric with a known error margin, without exploding the number of time series.


bytesSent := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "bytes-sent",
Help: "This records the number of bitswap messages bytes sent per peer.",
}, peerIdLabel)
if err := reg.Register(bytesSent); err != nil {
return nil, fmt.Errorf("registering bytes-sent: %w", err)
}
pt.bytesSent = bytesSent

messagesSent := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "messages-sent",
Help: "This records the number of bitswap messages sent per peer.",
}, peerIdLabel)
if err := reg.Register(messagesSent); err != nil {
return nil, fmt.Errorf("registering messages-sent: %w", err)
}
pt.messagesSent = messagesSent

bytesReceived := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "bytes-received",
Help: "This records the number of bitswap messages bytes received from each peer.",
}, peerIdLabel)
if err := reg.Register(bytesReceived); err != nil {
return nil, fmt.Errorf("registering bytes-received: %w", err)
}
pt.bytesReceived = bytesReceived

messagesReceived := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "messages-received",
Help: "This records the number of bitswap messages received from each peer.",
}, peerIdLabel)
if err := reg.Register(messagesReceived); err != nil {
return nil, fmt.Errorf("registering messages-received: %w", err)
}
pt.messagesReceived = messagesReceived

good = true
return pt, nil

Check warning on line 86 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L39-L86

Added lines #L39 - L86 were not covered by tests
}

func (t *PeerTracer) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) {
strPeerid := p.Pretty()

counter, err := t.messagesReceived.GetMetricWithLabelValues(strPeerid)
if err == nil {
logger.Debugf("failed to grab messages received label %s", err)
} else {
counter.Inc()
}

Check warning on line 97 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L89-L97

Added lines #L89 - L97 were not covered by tests

counter, err = t.bytesReceived.GetMetricWithLabelValues(strPeerid)
if err == nil {
logger.Debugf("failed to grab messages received label %s", err)
} else {
counter.Add(float64(msg.Size()))
}

Check warning on line 104 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L99-L104

Added lines #L99 - L104 were not covered by tests
}

func (t *PeerTracer) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) {
strPeerid := p.Pretty()

counter, err := t.messagesSent.GetMetricWithLabelValues(strPeerid)
if err == nil {
logger.Debugf("failed to grab messages received label %s", err)
} else {
counter.Inc()
}

Check warning on line 115 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L107-L115

Added lines #L107 - L115 were not covered by tests

counter, err = t.bytesSent.GetMetricWithLabelValues(strPeerid)
if err == nil {
logger.Debugf("failed to grab messages received label %s", err)
} else {
counter.Add(float64(msg.Size()))
}

Check warning on line 122 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L117-L122

Added lines #L117 - L122 were not covered by tests
}

func (t *PeerTracer) Close() error {
if t.reg == nil {
return errors.New("already closed")
}

Check warning on line 128 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L125-L128

Added lines #L125 - L128 were not covered by tests

// we have to check because this can be called by [NewPeerTracer] if an errors occurs.
if t.bytesSent != nil {
t.reg.Unregister(t.bytesSent)
}
if t.messagesSent != nil {
t.reg.Unregister(t.messagesSent)
}
if t.bytesReceived != nil {
t.reg.Unregister(t.bytesReceived)
}
if t.messagesReceived != nil {
t.reg.Unregister(t.messagesReceived)
}
*t = PeerTracer{}

return nil

Check warning on line 145 in bitswap/peer-prom-tracker/peer-prom-tracker.go

View check run for this annotation

Codecov / codecov/patch

bitswap/peer-prom-tracker/peer-prom-tracker.go#L131-L145

Added lines #L131 - L145 were not covered by tests
}
13 changes: 7 additions & 6 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
network bsnet.BitSwapNetwork

// External statistics interface
tracer tracer.Tracer
tracers []tracer.Tracer

// Counters for various statistics
counterLk sync.Mutex
Expand Down Expand Up @@ -123,9 +123,10 @@
}
}

// WithTracer can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Server) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)

Check warning on line 129 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L129

Added line #L129 was not covered by tests
}
}

Expand Down Expand Up @@ -294,8 +295,8 @@
// Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer.
bs.engine.MessageSent(envelope.Peer, envelope.Message)
if bs.tracer != nil {
bs.tracer.MessageSent(envelope.Peer, envelope.Message)
for _, t := range bs.tracers {
t.MessageSent(envelope.Peer, envelope.Message)

Check warning on line 299 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L299

Added line #L299 was not covered by tests
}
bs.sendBlocks(ctx, envelope)

Expand Down Expand Up @@ -529,8 +530,8 @@
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)

Check warning on line 534 in bitswap/server/server.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/server.go#L534

Added line #L534 was not covered by tests
}
}

Expand Down
Loading