Skip to content

Commit

Permalink
Feature/metrics (#35)
Browse files Browse the repository at this point in the history
* Wetware metrics provider

* set sample tick

* metrics for opened RPCs

* renamed metrics reporter
  • Loading branch information
aratz-lasa authored Jun 14, 2022
1 parent 94eeeab commit bf6bbfc
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 16 deletions.
8 changes: 4 additions & 4 deletions internal/runtime/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,14 @@ type vatConfig struct {
CLI *cli.Context
DHT *dual.DHT
Lifecycle fx.Lifecycle
Metrics *statsdutil.MetricsReporter
}

func vatnet(config vatConfig) vat.Network {
return vat.Network{
NS: config.Namespace(),
Host: routedhost.Wrap(config.Host(), config.DHT),
NS: config.Namespace(),
Host: routedhost.Wrap(config.Host(), config.DHT),
Metrics: config.Metrics.NewStore(),
}
}

Expand Down Expand Up @@ -156,7 +158,6 @@ func peercache(config pexConfig) (*pex.PeerExchange, error) {
if err == nil {
config.SetCloseHook(px)
}

return px, err
}

Expand Down Expand Up @@ -205,7 +206,6 @@ func bootstrap(config bootConfig) (bootstrapper, error) {
config.SetCloseHook(c)
}
}

return bootstrapper{
Log: config.Logger(),
Discovery: b,
Expand Down
15 changes: 9 additions & 6 deletions internal/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/wetware/casm/pkg/cluster"
"github.com/wetware/casm/pkg/pex"
serviceutil "github.com/wetware/ww/internal/util/service"
statsdutil "github.com/wetware/ww/internal/util/statsd"
"github.com/wetware/ww/pkg/server"
"github.com/wetware/ww/pkg/vat"
"go.uber.org/fx"
Expand Down Expand Up @@ -149,11 +150,12 @@ func supervisor(c *cli.Context) *suture.Supervisor {
type serverConfig struct {
fx.In

Log log.Logger
Vat vat.Network
PubSub *pubsub.PubSub
PeX *pex.PeerExchange
DHT *dual.DHT
Log log.Logger
Vat vat.Network
PubSub *pubsub.PubSub
PeX *pex.PeerExchange
DHT *dual.DHT
Metrics *statsdutil.MetricsReporter

Lifecycle fx.Lifecycle
}
Expand All @@ -174,7 +176,8 @@ func (config serverConfig) SetCloser(c io.Closer) {
func node(c *cli.Context, config serverConfig) (*server.Node, error) {
n, err := server.New(c.Context, config.Vat, config.PubSub,
server.WithLogger(config.Logger()),
server.WithClusterConfig(config.ClusterOpts()...))
server.WithClusterConfig(config.ClusterOpts()...),
server.WithMetrics(config.Metrics))

if err == nil {
config.SetCloser(n)
Expand Down
10 changes: 9 additions & 1 deletion internal/runtime/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/lthibault/log"
"github.com/urfave/cli/v2"
"go.uber.org/fx"
"gopkg.in/alexcesaro/statsd.v2"

ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -36,14 +37,21 @@ var observability = fx.Provide(
logging,
statsdutil.New,
statsdutil.NewBandwidthCounter,
statsdutil.NewPubSubTracer)
statsdutil.NewPubSubTracer,
NewWwMetricsReporter)

func logging(c *cli.Context) log.Logger {
return logutil.New(c).With(log.F{
"ns": c.String("ns"),
})
}

func NewWwMetricsReporter(c *cli.Context, client *statsd.Client) *statsdutil.MetricsReporter {
metrics := statsdutil.NewMetricsReporter(client)
go metrics.Run(c.Context)
return metrics
}

// hook populates heartbeat messages with system information from the
// operating system.
type hook struct{}
Expand Down
82 changes: 81 additions & 1 deletion internal/util/statsd/reporter.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package statsdutil

import (
"context"
"fmt"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/metrics"
"gopkg.in/alexcesaro/statsd.v2"
)

const (
sampleTick = time.Minute
)

func NewBandwidthCounter(s *statsd.Client) (b *metrics.BandwidthCounter, stop func()) {

b = metrics.NewBandwidthCounter()
s.Clone(
statsd.SampleRate(.1), // send 10% of metrics
statsd.Prefix("libp2p.host.bandwidth."))

ticker := time.NewTicker(time.Minute) // 1440 samples/day base-rate
ticker := time.NewTicker(sampleTick) // 1440 samples/day base-rate
go func() {
stat := b.GetBandwidthTotals()
s.Gauge("rate.in", stat.RateIn)
Expand All @@ -31,3 +37,77 @@ func NewBandwidthCounter(s *statsd.Client) (b *metrics.BandwidthCounter, stop fu

return b, ticker.Stop
}

type MetricsProvider interface {
Metrics() map[string]interface{}
}

type MetricsReporter struct {
providers []MetricsProvider
stats *statsd.Client
newProvider chan MetricsProvider
}

func NewMetricsReporter(stats *statsd.Client) *MetricsReporter {
return &MetricsReporter{providers: make([]MetricsProvider, 0), stats: stats, newProvider: make(chan MetricsProvider)}
}

func (m *MetricsReporter) Run(ctx context.Context) error {
ticker := time.NewTicker(sampleTick)
for {
select {
case <-ticker.C:
m.report()
case p := <-m.newProvider:
m.providers = append(m.providers, p)
case <-ctx.Done():
return ctx.Err()
}
}
}

func (m *MetricsReporter) Add(p MetricsProvider) {
m.newProvider <- p
}

func (m *MetricsReporter) NewStore() *MetricStore {
store := MetricStore{store: make(map[string]interface{})}
m.newProvider <- &store
return &store
}

func (m *MetricsReporter) report() {
for _, provider := range m.providers {
for name, value := range provider.Metrics() {
m.stats.Gauge(name, value)
}
}
}

type MetricStore struct {
mu sync.Mutex
store map[string]interface{}
}

func (m *MetricStore) Add(key string, value int) {
m.mu.Lock()
defer m.mu.Unlock()

num, ok := m.store[key].(int)
if ok {
m.store[key] = num + value
} else {
m.store[key] = value
}
}

func (m *MetricStore) Metrics() map[string]interface{} {
m.mu.Lock()
defer m.mu.Unlock()

metrics := make(map[string]interface{})
for key, value := range m.store {
metrics[key] = value
}
return metrics
}
25 changes: 23 additions & 2 deletions pkg/server/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/lthibault/log"

"github.com/wetware/casm/pkg/cluster"
statsdutil "github.com/wetware/ww/internal/util/statsd"
"github.com/wetware/ww/pkg/vat"

cluster_cap "github.com/wetware/ww/pkg/ocap/cluster"
Expand All @@ -22,8 +23,9 @@ type PubSub interface {
}

type Joiner struct {
log log.Logger
opts []cluster.Option
log log.Logger
opts []cluster.Option
metrics *statsdutil.MetricsReporter
}

func NewJoiner(opt ...Option) Joiner {
Expand All @@ -50,6 +52,9 @@ func (j Joiner) Join(ctx context.Context, vat vat.Network, ps PubSub) (*Node, er
return nil, fmt.Errorf("join cluster: %w", err)
}

// add metric provider
j.metrics.Add(ClusterMetrics{View: c.View()})

// export default capabilities
logger := j.log.With(vat)

Expand Down Expand Up @@ -84,3 +89,19 @@ func (j Joiner) options(vat vat.Network, u uuid.UUID) []cluster.Option {
cluster.WithNamespace(vat.NS),
}, j.opts...)
}

type ClusterMetrics struct {
cluster.View
}

func (c ClusterMetrics) Metrics() map[string]interface{} {
metrics := make(map[string]interface{}, 0)

view_size := 0
for it := c.Iter(); it.Record() != nil; it.Next() {
view_size++
}

metrics["view.size"] = view_size
return metrics
}
8 changes: 8 additions & 0 deletions pkg/server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"github.com/lthibault/log"
"github.com/wetware/casm/pkg/cluster"
statsdutil "github.com/wetware/ww/internal/util/statsd"
)

type Option func(*Joiner)
Expand All @@ -19,6 +20,13 @@ func WithLogger(l log.Logger) Option {
}
}

// WithStatsd sets the statsd client for recording statistics.
func WithMetrics(m *statsdutil.MetricsReporter) Option {
return func(j *Joiner) {
j.metrics = m
}
}

func WithClusterConfig(opt ...cluster.Option) Option {
return func(j *Joiner) {
j.opts = opt
Expand Down
10 changes: 8 additions & 2 deletions pkg/vat/vat.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ package vat
import (
"context"
"errors"
"fmt"
"strings"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/multiformats/go-multistream"
statsdutil "github.com/wetware/ww/internal/util/statsd"
ww "github.com/wetware/ww/pkg"

"capnproto.org/go/capnp/v3"
Expand Down Expand Up @@ -52,8 +54,9 @@ type ClientProvider interface {
// Network wraps a libp2p Host and provides a high-level interface to
// a capability-oriented network.
type Network struct {
NS string
Host host.Host
NS string
Host host.Host
Metrics *statsdutil.MetricStore
}

func (n Network) Loggable() map[string]interface{} {
Expand Down Expand Up @@ -109,7 +112,10 @@ func (n Network) Export(c Capability, boot ClientProvider) {
conn := rpc.NewConn(c.Upgrade(s), &rpc.Options{
BootstrapClient: boot.Client(),
})
n.Metrics.Add(fmt.Sprintf("rpc.%s.open", id), 1)

defer conn.Close()
defer n.Metrics.Add(fmt.Sprintf("rpc.%s.open", id), -1)

<-conn.Done()
})
Expand Down

0 comments on commit bf6bbfc

Please sign in to comment.