From 4580d78e263a8804d1d28d5c237a3416ade1958b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 27 Apr 2020 19:10:11 -0700 Subject: [PATCH] fix: non-blocking peerlog logging Avoid ever blocking new connections in the peer logger. Instead: 1. Send all new peers to a highly buffered channel. 2. Emit "dropped event" errors whenever we detect that we're dropping events and falling behind. 3. Don't log protocols, they're too large. 4. Don't log disconnects, we don't need them. --- go.mod | 2 +- plugin/plugins/peerlog/peerlog.go | 124 +++++++++++++++++++++--------- 2 files changed, 88 insertions(+), 38 deletions(-) diff --git a/go.mod b/go.mod index 4d659ee0eb53..3ebadc7d95f2 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,6 @@ require ( github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jbenet/go-temp-err-catcher v0.1.0 github.com/jbenet/goprocess v0.1.4 - github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.8.2 github.com/libp2p/go-libp2p-circuit v0.2.2 github.com/libp2p/go-libp2p-connmgr v0.2.1 @@ -101,6 +100,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c go.uber.org/fx v1.12.0 + go.uber.org/zap v1.14.1 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 diff --git a/plugin/plugins/peerlog/peerlog.go b/plugin/plugins/peerlog/peerlog.go index 0b6b7a338ce0..0088daec78a8 100644 --- a/plugin/plugins/peerlog/peerlog.go +++ b/plugin/plugins/peerlog/peerlog.go @@ -1,27 +1,46 @@ package peerlog import ( + "context" "fmt" + "sync/atomic" core "github.com/ipfs/go-ipfs/core" plugin "github.com/ipfs/go-ipfs/plugin" logging "github.com/ipfs/go-log" - eventbus "github.com/libp2p/go-eventbus" event "github.com/libp2p/go-libp2p-core/event" network "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "go.uber.org/zap" ) var log = logging.Logger("plugin/peerlog") +type eventType int + +const ( + eventConnect eventType = iota + eventIdentify +) + +type plEvent struct { + kind eventType + peer peer.ID +} + // Log all the PeerIDs we see // // Usage: // GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon // Output: // {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} -// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} +// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"} // -type peerLogPlugin struct{} +type peerLogPlugin struct { + droppedCount uint64 + events chan plEvent +} var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil) @@ -41,60 +60,91 @@ func (*peerLogPlugin) Version() string { } // Init initializes plugin -func (*peerLogPlugin) Init(*plugin.Environment) error { +func (pl *peerLogPlugin) Init(*plugin.Environment) error { + pl.events = make(chan plEvent, 64*1024) return nil } -func (*peerLogPlugin) Start(node *core.IpfsNode) error { +func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) { + go func() { + ctx := node.Context() + + dlog := log.Desugar() + for { + dropped := atomic.SwapUint64(&pl.droppedCount, 0) + if dropped > 0 { + dlog.Error("dropped events", zap.Uint64("count", dropped)) + } + + var e plEvent + select { + case <-ctx.Done(): + return + case e = <-pl.events: + } + + peerID := zap.String("peer", e.peer.Pretty()) + + switch e.kind { + case eventConnect: + dlog.Info("connected", peerID) + case eventIdentify: + agent, err := node.Peerstore.Get(e.peer, "AgentVersion") + switch err { + case nil: + case peerstore.ErrNotFound: + continue + default: + dlog.Error("failed to get agent version", zap.Error(err)) + continue + } + + agentS, ok := agent.(string) + if !ok { + continue + } + dlog.Info("identified", peerID, zap.String("agent", agentS)) + } + } + }() + +} + +func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) { + select { + case pl.events <- plEvent{kind: evt, peer: p}: + default: + atomic.AddUint64(&pl.droppedCount, 1) + } +} + +func (pl *peerLogPlugin) Start(node *core.IpfsNode) error { // Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil { return fmt.Errorf("failed to set log level: %w", err) } + + sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)) + if err != nil { + return fmt.Errorf("failed to subscribe to identify notifications") + } + var notifee network.NotifyBundle notifee.ConnectedF = func(net network.Network, conn network.Conn) { - // TODO: Log transport, country, etc? - log.Infow("connected", - "peer", conn.RemotePeer().Pretty(), - ) - } - notifee.DisconnectedF = func(net network.Network, conn network.Conn) { - log.Infow("disconnected", - "peer", conn.RemotePeer().Pretty(), - ) + pl.emit(eventConnect, conn.RemotePeer()) } node.PeerHost.Network().Notify(¬ifee) - sub, err := node.PeerHost.EventBus().Subscribe( - new(event.EvtPeerIdentificationCompleted), - eventbus.BufSize(1024), - ) - if err != nil { - return fmt.Errorf("failed to subscribe to identify notifications") - } go func() { defer sub.Close() for e := range sub.Out() { switch e := e.(type) { case event.EvtPeerIdentificationCompleted: - protocols, err := node.Peerstore.GetProtocols(e.Peer) - if err != nil { - log.Errorw("failed to get protocols", "error", err) - continue - } - agent, err := node.Peerstore.Get(e.Peer, "AgentVersion") - if err != nil { - log.Errorw("failed to get agent version", "error", err) - continue - } - log.Infow( - "identified", - "peer", e.Peer.Pretty(), - "agent", agent, - "protocols", protocols, - ) + pl.emit(eventIdentify, e.Peer) } } }() + return nil }