From d9abb6380348ce7cf8c3701c733bc653fa9baa75 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 20 Jul 2023 16:50:25 +0200 Subject: [PATCH 1/2] bitswap: add peer-prom-tracker For #209 Need examples. --- .../peer-prom-tracker/peer-prom-tracker.go | 146 ++++++++++++++++++ 1 file changed, 146 insertions(+) create mode 100644 bitswap/peer-prom-tracker/peer-prom-tracker.go diff --git a/bitswap/peer-prom-tracker/peer-prom-tracker.go b/bitswap/peer-prom-tracker/peer-prom-tracker.go new file mode 100644 index 000000000..41af1a6e1 --- /dev/null +++ b/bitswap/peer-prom-tracker/peer-prom-tracker.go @@ -0,0 +1,146 @@ +package peerpromtracker + +import ( + "errors" + "fmt" + + bsmsg "github.com/ipfs/boxo/bitswap/message" + "github.com/ipfs/boxo/bitswap/tracer" + "github.com/ipfs/go-log/v2" + peer "github.com/libp2p/go-libp2p/core/peer" + prom "github.com/prometheus/client_golang/prometheus" +) + +var logger = log.Logger("bitswap/peer-prom-tracker") + +var _ tracer.Tracer = (*PeerTracer)(nil) + +type PeerTracer struct { + bytesSent *prom.CounterVec + messagesSent *prom.CounterVec + bytesReceived *prom.CounterVec + messagesReceived *prom.CounterVec + + reg *prom.Registry +} + +func NewPeerTracer(reg *prom.Registry) (*PeerTracer, error) { + pt := &PeerTracer{ + reg: reg, + } + + var good bool + defer func() { + if !good { + pt.Close() // will unregister the sucessfully registered metrics + } + }() + + peerIdLabel := []string{"peer-id"} + + bytesSent := prom.NewCounterVec(prom.CounterOpts{ + Namespace: "ipfs", + Subsystem: "bitswap/messages", + Name: "bytes-sent", + Help: "This records the number of bitswap messages bytes sent per peer.", + }, peerIdLabel) + if err := reg.Register(bytesSent); err != nil { + return nil, fmt.Errorf("registering bytes-sent: %w", err) + } + pt.bytesSent = bytesSent + + messagesSent := prom.NewCounterVec(prom.CounterOpts{ + Namespace: "ipfs", + Subsystem: "bitswap/messages", + Name: "messages-sent", + Help: "This records the number of bitswap messages sent per peer.", + }, peerIdLabel) + if err := reg.Register(messagesSent); err != nil { + return nil, fmt.Errorf("registering messages-sent: %w", err) + } + pt.messagesSent = messagesSent + + bytesReceived := prom.NewCounterVec(prom.CounterOpts{ + Namespace: "ipfs", + Subsystem: "bitswap/messages", + Name: "bytes-received", + Help: "This records the number of bitswap messages bytes received from each peer.", + }, peerIdLabel) + if err := reg.Register(bytesReceived); err != nil { + return nil, fmt.Errorf("registering bytes-received: %w", err) + } + pt.bytesReceived = bytesReceived + + messagesReceived := prom.NewCounterVec(prom.CounterOpts{ + Namespace: "ipfs", + Subsystem: "bitswap/messages", + Name: "messages-received", + Help: "This records the number of bitswap messages received from each peer.", + }, peerIdLabel) + if err := reg.Register(messagesReceived); err != nil { + return nil, fmt.Errorf("registering messages-received: %w", err) + } + pt.messagesReceived = messagesReceived + + good = true + return pt, nil +} + +func (t *PeerTracer) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) { + strPeerid := p.Pretty() + + counter, err := t.messagesReceived.GetMetricWithLabelValues(strPeerid) + if err == nil { + logger.Debugf("failed to grab messages received label %s", err) + } else { + counter.Inc() + } + + counter, err = t.bytesReceived.GetMetricWithLabelValues(strPeerid) + if err == nil { + logger.Debugf("failed to grab messages received label %s", err) + } else { + counter.Add(float64(msg.Size())) + } +} + +func (t *PeerTracer) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) { + strPeerid := p.Pretty() + + counter, err := t.messagesSent.GetMetricWithLabelValues(strPeerid) + if err == nil { + logger.Debugf("failed to grab messages received label %s", err) + } else { + counter.Inc() + } + + counter, err = t.bytesSent.GetMetricWithLabelValues(strPeerid) + if err == nil { + logger.Debugf("failed to grab messages received label %s", err) + } else { + counter.Add(float64(msg.Size())) + } +} + +func (t *PeerTracer) Close() error { + if t.reg == nil { + return errors.New("already closed") + } + + // we have to check because this can be called by [NewPeerTracer] if an errors occurs. + if t.bytesSent != nil { + t.reg.Unregister(t.bytesSent) + } + if t.messagesSent != nil { + t.reg.Unregister(t.messagesSent) + } + if t.bytesReceived != nil { + t.reg.Unregister(t.bytesReceived) + } + if t.messagesReceived != nil { + t.reg.Unregister(t.messagesReceived) + } + *t = PeerTracer{} + + return nil +} From b2cef5dedc6d70e33f389436b03696e8ea8287ae Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 20 Jul 2023 16:50:50 +0200 Subject: [PATCH 2/2] bitswap: Allow to register multiple tracers Need tests (I think the client's tracer is bugged and it does not record outbound messages). --- bitswap/bitswap.go | 12 ++++++------ bitswap/client/client.go | 11 ++++++----- bitswap/options.go | 3 ++- bitswap/server/server.go | 13 +++++++------ 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index 3863538ee..02e4a6336 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -52,8 +52,8 @@ type Bitswap struct { *client.Client *server.Server - tracer tracer.Tracer - net network.BitSwapNetwork + tracers []tracer.Tracer + net network.BitSwapNetwork } func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap { @@ -77,8 +77,8 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc } } - if bs.tracer != nil { - var tracer tracer.Tracer = nopReceiveTracer{bs.tracer} + for _, t := range bs.tracers { + var tracer tracer.Tracer = nopReceiveTracer{t} clientOptions = append(clientOptions, client.WithTracer(tracer)) serverOptions = append(serverOptions, server.WithTracer(tracer)) } @@ -172,8 +172,8 @@ func (bs *Bitswap) ReceiveError(err error) { } func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) { - if bs.tracer != nil { - bs.tracer.MessageReceived(p, incoming) + for _, t := range bs.tracers { + t.MessageReceived(p, incoming) } bs.Client.ReceiveMessage(ctx, p, incoming) diff --git a/bitswap/client/client.go b/bitswap/client/client.go index 4e5c57b82..9a6684966 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -66,12 +66,13 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } -// Configures the Client to use given tracer. +// WithTracer configures the Client to use given tracer. // This provides methods to access all messages sent and received by the Client. // This interface can be used to implement various statistics (this is original intent). +// This can be passed multiple times to register multiple tracers. func WithTracer(tap tracer.Tracer) Option { return func(bs *Client) { - bs.tracer = tap + bs.tracers = append(bs.tracers, tap) } } @@ -208,7 +209,7 @@ type Client struct { allMetric metrics.Histogram // External statistics interface - tracer tracer.Tracer + tracers []tracer.Tracer // the SessionManager routes requests to interested sessions sm *bssm.SessionManager @@ -342,8 +343,8 @@ func (bs *Client) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg. bs.counters.messagesRecvd++ bs.counterLk.Unlock() - if bs.tracer != nil { - bs.tracer.MessageReceived(p, incoming) + for _, t := range bs.tracers { + t.MessageReceived(p, incoming) } iblocks := incoming.Blocks() diff --git a/bitswap/options.go b/bitswap/options.go index da759dfe2..32e69cad5 100644 --- a/bitswap/options.go +++ b/bitswap/options.go @@ -79,11 +79,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { return Option{client.SetSimulateDontHavesOnTimeout(send)} } +// WithTracer can be passed multiple times to register multiple tracers. func WithTracer(tap tracer.Tracer) Option { // Only trace the server, both receive the same messages anyway return Option{ option(func(bs *Bitswap) { - bs.tracer = tap + bs.tracers = append(bs.tracers, tap) }), } } diff --git a/bitswap/server/server.go b/bitswap/server/server.go index 9c8e4cdb3..b8d79f3a9 100644 --- a/bitswap/server/server.go +++ b/bitswap/server/server.go @@ -46,7 +46,7 @@ type Server struct { network bsnet.BitSwapNetwork // External statistics interface - tracer tracer.Tracer + tracers []tracer.Tracer // Counters for various statistics counterLk sync.Mutex @@ -123,9 +123,10 @@ func TaskWorkerCount(count int) Option { } } +// WithTracer can be passed multiple times to register multiple tracers. func WithTracer(tap tracer.Tracer) Option { return func(bs *Server) { - bs.tracer = tap + bs.tracers = append(bs.tracers, tap) } } @@ -294,8 +295,8 @@ func (bs *Server) taskWorker(ctx context.Context, id int) { // Ideally, yes. But we'd need some way to trigger a retry and/or drop // the peer. bs.engine.MessageSent(envelope.Peer, envelope.Message) - if bs.tracer != nil { - bs.tracer.MessageSent(envelope.Peer, envelope.Message) + for _, t := range bs.tracers { + t.MessageSent(envelope.Peer, envelope.Message) } bs.sendBlocks(ctx, envelope) @@ -529,8 +530,8 @@ func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming messag // TODO: this is bad, and could be easily abused. // Should only track *useful* messages in ledger - if bs.tracer != nil { - bs.tracer.MessageReceived(p, incoming) + for _, t := range bs.tracers { + t.MessageReceived(p, incoming) } }