diff --git a/internal/runtime/network.go b/internal/runtime/network.go index e06e10fa..c93c209f 100644 --- a/internal/runtime/network.go +++ b/internal/runtime/network.go @@ -120,12 +120,14 @@ type vatConfig struct { CLI *cli.Context DHT *dual.DHT Lifecycle fx.Lifecycle + Metrics *statsdutil.MetricsReporter } func vatnet(config vatConfig) vat.Network { return vat.Network{ - NS: config.Namespace(), - Host: routedhost.Wrap(config.Host(), config.DHT), + NS: config.Namespace(), + Host: routedhost.Wrap(config.Host(), config.DHT), + Metrics: config.Metrics.NewStore(), } } @@ -156,7 +158,6 @@ func peercache(config pexConfig) (*pex.PeerExchange, error) { if err == nil { config.SetCloseHook(px) } - return px, err } @@ -205,7 +206,6 @@ func bootstrap(config bootConfig) (bootstrapper, error) { config.SetCloseHook(c) } } - return bootstrapper{ Log: config.Logger(), Discovery: b, diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index bd772535..0a627242 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -16,6 +16,7 @@ import ( "github.com/wetware/casm/pkg/cluster" "github.com/wetware/casm/pkg/pex" serviceutil "github.com/wetware/ww/internal/util/service" + statsdutil "github.com/wetware/ww/internal/util/statsd" "github.com/wetware/ww/pkg/server" "github.com/wetware/ww/pkg/vat" "go.uber.org/fx" @@ -149,11 +150,12 @@ func supervisor(c *cli.Context) *suture.Supervisor { type serverConfig struct { fx.In - Log log.Logger - Vat vat.Network - PubSub *pubsub.PubSub - PeX *pex.PeerExchange - DHT *dual.DHT + Log log.Logger + Vat vat.Network + PubSub *pubsub.PubSub + PeX *pex.PeerExchange + DHT *dual.DHT + Metrics *statsdutil.MetricsReporter Lifecycle fx.Lifecycle } @@ -174,7 +176,8 @@ func (config serverConfig) SetCloser(c io.Closer) { func node(c *cli.Context, config serverConfig) (*server.Node, error) { n, err := server.New(c.Context, config.Vat, config.PubSub, server.WithLogger(config.Logger()), - server.WithClusterConfig(config.ClusterOpts()...)) + server.WithClusterConfig(config.ClusterOpts()...), + server.WithMetrics(config.Metrics)) if err == nil { config.SetCloser(n) diff --git a/internal/runtime/system.go b/internal/runtime/system.go index b805e33e..e8f4ed86 100644 --- a/internal/runtime/system.go +++ b/internal/runtime/system.go @@ -9,6 +9,7 @@ import ( "github.com/lthibault/log" "github.com/urfave/cli/v2" "go.uber.org/fx" + "gopkg.in/alexcesaro/statsd.v2" ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" @@ -36,7 +37,8 @@ var observability = fx.Provide( logging, statsdutil.New, statsdutil.NewBandwidthCounter, - statsdutil.NewPubSubTracer) + statsdutil.NewPubSubTracer, + NewWwMetricsReporter) func logging(c *cli.Context) log.Logger { return logutil.New(c).With(log.F{ @@ -44,6 +46,12 @@ func logging(c *cli.Context) log.Logger { }) } +func NewWwMetricsReporter(c *cli.Context, client *statsd.Client) *statsdutil.MetricsReporter { + metrics := statsdutil.NewMetricsReporter(client) + go metrics.Run(c.Context) + return metrics +} + // hook populates heartbeat messages with system information from the // operating system. type hook struct{} diff --git a/internal/util/statsd/reporter.go b/internal/util/statsd/reporter.go index aae7ccb3..44ba4018 100644 --- a/internal/util/statsd/reporter.go +++ b/internal/util/statsd/reporter.go @@ -1,13 +1,19 @@ package statsdutil import ( + "context" "fmt" + "sync" "time" "github.com/libp2p/go-libp2p-core/metrics" "gopkg.in/alexcesaro/statsd.v2" ) +const ( + sampleTick = time.Minute +) + func NewBandwidthCounter(s *statsd.Client) (b *metrics.BandwidthCounter, stop func()) { b = metrics.NewBandwidthCounter() @@ -15,7 +21,7 @@ func NewBandwidthCounter(s *statsd.Client) (b *metrics.BandwidthCounter, stop fu statsd.SampleRate(.1), // send 10% of metrics statsd.Prefix("libp2p.host.bandwidth.")) - ticker := time.NewTicker(time.Minute) // 1440 samples/day base-rate + ticker := time.NewTicker(sampleTick) // 1440 samples/day base-rate go func() { stat := b.GetBandwidthTotals() s.Gauge("rate.in", stat.RateIn) @@ -31,3 +37,77 @@ func NewBandwidthCounter(s *statsd.Client) (b *metrics.BandwidthCounter, stop fu return b, ticker.Stop } + +type MetricsProvider interface { + Metrics() map[string]interface{} +} + +type MetricsReporter struct { + providers []MetricsProvider + stats *statsd.Client + newProvider chan MetricsProvider +} + +func NewMetricsReporter(stats *statsd.Client) *MetricsReporter { + return &MetricsReporter{providers: make([]MetricsProvider, 0), stats: stats, newProvider: make(chan MetricsProvider)} +} + +func (m *MetricsReporter) Run(ctx context.Context) error { + ticker := time.NewTicker(sampleTick) + for { + select { + case <-ticker.C: + m.report() + case p := <-m.newProvider: + m.providers = append(m.providers, p) + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (m *MetricsReporter) Add(p MetricsProvider) { + m.newProvider <- p +} + +func (m *MetricsReporter) NewStore() *MetricStore { + store := MetricStore{store: make(map[string]interface{})} + m.newProvider <- &store + return &store +} + +func (m *MetricsReporter) report() { + for _, provider := range m.providers { + for name, value := range provider.Metrics() { + m.stats.Gauge(name, value) + } + } +} + +type MetricStore struct { + mu sync.Mutex + store map[string]interface{} +} + +func (m *MetricStore) Add(key string, value int) { + m.mu.Lock() + defer m.mu.Unlock() + + num, ok := m.store[key].(int) + if ok { + m.store[key] = num + value + } else { + m.store[key] = value + } +} + +func (m *MetricStore) Metrics() map[string]interface{} { + m.mu.Lock() + defer m.mu.Unlock() + + metrics := make(map[string]interface{}) + for key, value := range m.store { + metrics[key] = value + } + return metrics +} diff --git a/pkg/server/joiner.go b/pkg/server/joiner.go index 4e761de1..c0920cde 100644 --- a/pkg/server/joiner.go +++ b/pkg/server/joiner.go @@ -9,6 +9,7 @@ import ( "github.com/lthibault/log" "github.com/wetware/casm/pkg/cluster" + statsdutil "github.com/wetware/ww/internal/util/statsd" "github.com/wetware/ww/pkg/vat" cluster_cap "github.com/wetware/ww/pkg/ocap/cluster" @@ -22,8 +23,9 @@ type PubSub interface { } type Joiner struct { - log log.Logger - opts []cluster.Option + log log.Logger + opts []cluster.Option + metrics *statsdutil.MetricsReporter } func NewJoiner(opt ...Option) Joiner { @@ -50,6 +52,9 @@ func (j Joiner) Join(ctx context.Context, vat vat.Network, ps PubSub) (*Node, er return nil, fmt.Errorf("join cluster: %w", err) } + // add metric provider + j.metrics.Add(ClusterMetrics{View: c.View()}) + // export default capabilities logger := j.log.With(vat) @@ -84,3 +89,19 @@ func (j Joiner) options(vat vat.Network, u uuid.UUID) []cluster.Option { cluster.WithNamespace(vat.NS), }, j.opts...) } + +type ClusterMetrics struct { + cluster.View +} + +func (c ClusterMetrics) Metrics() map[string]interface{} { + metrics := make(map[string]interface{}, 0) + + view_size := 0 + for it := c.Iter(); it.Record() != nil; it.Next() { + view_size++ + } + + metrics["view.size"] = view_size + return metrics +} diff --git a/pkg/server/option.go b/pkg/server/option.go index e047fea3..b3e718f2 100644 --- a/pkg/server/option.go +++ b/pkg/server/option.go @@ -3,6 +3,7 @@ package server import ( "github.com/lthibault/log" "github.com/wetware/casm/pkg/cluster" + statsdutil "github.com/wetware/ww/internal/util/statsd" ) type Option func(*Joiner) @@ -19,6 +20,13 @@ func WithLogger(l log.Logger) Option { } } +// WithStatsd sets the statsd client for recording statistics. +func WithMetrics(m *statsdutil.MetricsReporter) Option { + return func(j *Joiner) { + j.metrics = m + } +} + func WithClusterConfig(opt ...cluster.Option) Option { return func(j *Joiner) { j.opts = opt diff --git a/pkg/vat/vat.go b/pkg/vat/vat.go index fc321435..2f5b0da7 100644 --- a/pkg/vat/vat.go +++ b/pkg/vat/vat.go @@ -4,6 +4,7 @@ package vat import ( "context" "errors" + "fmt" "strings" "github.com/libp2p/go-libp2p-core/host" @@ -11,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/protocol" "github.com/multiformats/go-multistream" + statsdutil "github.com/wetware/ww/internal/util/statsd" ww "github.com/wetware/ww/pkg" "capnproto.org/go/capnp/v3" @@ -52,8 +54,9 @@ type ClientProvider interface { // Network wraps a libp2p Host and provides a high-level interface to // a capability-oriented network. type Network struct { - NS string - Host host.Host + NS string + Host host.Host + Metrics *statsdutil.MetricStore } func (n Network) Loggable() map[string]interface{} { @@ -109,7 +112,10 @@ func (n Network) Export(c Capability, boot ClientProvider) { conn := rpc.NewConn(c.Upgrade(s), &rpc.Options{ BootstrapClient: boot.Client(), }) + n.Metrics.Add(fmt.Sprintf("rpc.%s.open", id), 1) + defer conn.Close() + defer n.Metrics.Add(fmt.Sprintf("rpc.%s.open", id), -1) <-conn.Done() })