diff --git a/go.mod b/go.mod index 4d659ee0eb5..3ebadc7d95f 100644 --- a/go.mod +++ b/go.mod @@ -59,7 +59,6 @@ require ( github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jbenet/go-temp-err-catcher v0.1.0 github.com/jbenet/goprocess v0.1.4 - github.com/libp2p/go-eventbus v0.1.0 github.com/libp2p/go-libp2p v0.8.2 github.com/libp2p/go-libp2p-circuit v0.2.2 github.com/libp2p/go-libp2p-connmgr v0.2.1 @@ -101,6 +100,7 @@ require ( github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7 github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c go.uber.org/fx v1.12.0 + go.uber.org/zap v1.14.1 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect golang.org/x/sys v0.0.0-20200413165638-669c56c373c4 diff --git a/plugin/plugins/peerlog/peerlog.go b/plugin/plugins/peerlog/peerlog.go index 0b6b7a338ce..97f11eee68a 100644 --- a/plugin/plugins/peerlog/peerlog.go +++ b/plugin/plugins/peerlog/peerlog.go @@ -2,26 +2,52 @@ package peerlog import ( "fmt" + "sync/atomic" + "time" core "github.com/ipfs/go-ipfs/core" plugin "github.com/ipfs/go-ipfs/plugin" logging "github.com/ipfs/go-log" - eventbus "github.com/libp2p/go-eventbus" event "github.com/libp2p/go-libp2p-core/event" network "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "go.uber.org/zap" ) var log = logging.Logger("plugin/peerlog") +type eventType int + +var ( + // size of the event queue buffer + eventQueueSize = 64 * 1024 + // number of events to drop when busy. + busyDropAmount = eventQueueSize / 8 +) + +const ( + eventConnect eventType = iota + eventIdentify +) + +type plEvent struct { + kind eventType + peer peer.ID +} + // Log all the PeerIDs we see // // Usage: // GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon // Output: // {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} -// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"} +// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"} // -type peerLogPlugin struct{} +type peerLogPlugin struct { + droppedCount uint64 + events chan plEvent +} var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil) @@ -41,60 +67,120 @@ func (*peerLogPlugin) Version() string { } // Init initializes plugin -func (*peerLogPlugin) Init(*plugin.Environment) error { +func (pl *peerLogPlugin) Init(*plugin.Environment) error { + pl.events = make(chan plEvent, eventQueueSize) return nil } -func (*peerLogPlugin) Start(node *core.IpfsNode) error { +func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) { + ctx := node.Context() + + busyCounter := 0 + dlog := log.Desugar() + for { + // Deal with dropped events. + dropped := atomic.SwapUint64(&pl.droppedCount, 0) + if dropped > 0 { + busyCounter++ + + // sleep a bit to give the system a chance to catch up with logging. + select { + case <-time.After(time.Duration(busyCounter) * time.Second): + case <-ctx.Done(): + return + } + + // drain 1/8th of the backlog backlog so we + // don't immediately run into this situation + // again. + loop: + for i := 0; i < busyDropAmount; i++ { + select { + case <-pl.events: + dropped++ + default: + break loop + } + } + + // Add in any events we've dropped in the mean-time. + dropped += atomic.SwapUint64(&pl.droppedCount, 0) + + // Report that we've dropped events. + dlog.Error("dropped events", zap.Uint64("count", dropped)) + } else { + busyCounter = 0 + } + + var e plEvent + select { + case <-ctx.Done(): + return + case e = <-pl.events: + } + + peerID := zap.String("peer", e.peer.Pretty()) + + switch e.kind { + case eventConnect: + dlog.Info("connected", peerID) + case eventIdentify: + agent, err := node.Peerstore.Get(e.peer, "AgentVersion") + switch err { + case nil: + case peerstore.ErrNotFound: + continue + default: + dlog.Error("failed to get agent version", zap.Error(err)) + continue + } + + agentS, ok := agent.(string) + if !ok { + continue + } + dlog.Info("identified", peerID, zap.String("agent", agentS)) + } + } +} + +func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) { + select { + case pl.events <- plEvent{kind: evt, peer: p}: + default: + atomic.AddUint64(&pl.droppedCount, 1) + } +} + +func (pl *peerLogPlugin) Start(node *core.IpfsNode) error { // Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil { return fmt.Errorf("failed to set log level: %w", err) } + + sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted)) + if err != nil { + return fmt.Errorf("failed to subscribe to identify notifications") + } + var notifee network.NotifyBundle notifee.ConnectedF = func(net network.Network, conn network.Conn) { - // TODO: Log transport, country, etc? - log.Infow("connected", - "peer", conn.RemotePeer().Pretty(), - ) - } - notifee.DisconnectedF = func(net network.Network, conn network.Conn) { - log.Infow("disconnected", - "peer", conn.RemotePeer().Pretty(), - ) + pl.emit(eventConnect, conn.RemotePeer()) } node.PeerHost.Network().Notify(¬ifee) - sub, err := node.PeerHost.EventBus().Subscribe( - new(event.EvtPeerIdentificationCompleted), - eventbus.BufSize(1024), - ) - if err != nil { - return fmt.Errorf("failed to subscribe to identify notifications") - } go func() { defer sub.Close() for e := range sub.Out() { switch e := e.(type) { case event.EvtPeerIdentificationCompleted: - protocols, err := node.Peerstore.GetProtocols(e.Peer) - if err != nil { - log.Errorw("failed to get protocols", "error", err) - continue - } - agent, err := node.Peerstore.Get(e.Peer, "AgentVersion") - if err != nil { - log.Errorw("failed to get agent version", "error", err) - continue - } - log.Infow( - "identified", - "peer", e.Peer.Pretty(), - "agent", agent, - "protocols", protocols, - ) + pl.emit(eventIdentify, e.Peer) } } }() + + go pl.collectEvents(node) + return nil }