Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: non-blocking peerlog logging #7232

Merged
merged 2 commits into from
Apr 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ require (
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/jbenet/go-temp-err-catcher v0.1.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.8.2
github.com/libp2p/go-libp2p-circuit v0.2.2
github.com/libp2p/go-libp2p-connmgr v0.2.1
Expand Down Expand Up @@ -101,6 +100,7 @@ require (
github.com/whyrusleeping/multiaddr-filter v0.0.0-20160516205228-e903e4adabd7
github.com/whyrusleeping/tar-utils v0.0.0-20180509141711-8c6c8ba81d5c
go.uber.org/fx v1.12.0
go.uber.org/zap v1.14.1
golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e // indirect
golang.org/x/sys v0.0.0-20200413165638-669c56c373c4
Expand Down
160 changes: 123 additions & 37 deletions plugin/plugins/peerlog/peerlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,52 @@ package peerlog

import (
"fmt"
"sync/atomic"
"time"

core "github.com/ipfs/go-ipfs/core"
plugin "github.com/ipfs/go-ipfs/plugin"
logging "github.com/ipfs/go-log"
eventbus "github.com/libp2p/go-eventbus"
event "github.com/libp2p/go-libp2p-core/event"
network "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"go.uber.org/zap"
)

// log is the package-level logger, registered under the name "plugin/peerlog".
var log = logging.Logger("plugin/peerlog")

// eventType identifies the kind of peer event carried in a plEvent.
type eventType int

// eventQueueSize is the capacity of the buffered event queue.
var eventQueueSize = 64 * 1024

// busyDropAmount is the number of queued events to discard when the
// logger is busy: one eighth of the queue capacity.
var busyDropAmount = eventQueueSize / 8

// Kinds of events the plugin queues and logs.
const (
	// eventConnect is logged as "connected".
	eventConnect = eventType(iota)
	// eventIdentify is logged as "identified", together with the
	// peer's agent version.
	eventIdentify
)

// plEvent is a single entry in the plugin's event queue: what happened
// and which peer it happened to.
type plEvent struct {
	// kind selects how the event is logged (eventConnect or eventIdentify).
	kind eventType
	// peer is the ID of the peer the event refers to.
	peer peer.ID
}

// Log all the PeerIDs we see
//
// Usage:
// GOLOG_FILE=~/peer.log IPFS_LOGGING_FMT=json ipfs daemon
// Output:
// {"level":"info","ts":"2020-02-10T13:54:26.639Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:51","msg":"connected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"disconnected","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt"}
// {"level":"info","ts":"2020-02-10T13:54:59.095Z","logger":"plugin/peerlog","caller":"peerlog/peerlog.go:56","msg":"identified","peer":"QmS2H72gdrekXJggGdE9SunXPntBqdkJdkXQJjuxcH8Cbt","agent":"go-ipfs/0.5.0/"}
//
type peerLogPlugin struct{}
type peerLogPlugin struct {
	// droppedCount is the number of events emit could not enqueue because
	// the queue was full. It is only accessed through sync/atomic.
	// NOTE(review): as the first field it is 64-bit aligned, which
	// sync/atomic requires on 32-bit platforms; keep it first.
	droppedCount uint64
	// events is the buffered queue written by emit and read by
	// collectEvents. It is allocated in Init.
	events chan plEvent
}

var _ plugin.PluginDaemonInternal = (*peerLogPlugin)(nil)

Expand All @@ -41,60 +67,120 @@ func (*peerLogPlugin) Version() string {
}

// Init initializes plugin
func (*peerLogPlugin) Init(*plugin.Environment) error {
// Init prepares the plugin by allocating its buffered event queue with
// capacity eventQueueSize. The environment argument is not used and the
// method always returns nil.
func (pl *peerLogPlugin) Init(*plugin.Environment) error {
	queue := make(chan plEvent, eventQueueSize)
	pl.events = queue
	return nil
}

func (*peerLogPlugin) Start(node *core.IpfsNode) error {
// collectEvents reads events from pl.events and writes one log entry per
// event. It runs until the node's context is cancelled, so it is meant to be
// started in its own goroutine.
//
// When emit has had to drop events (droppedCount > 0), the loop backs off
// before logging again: it sleeps, discards up to busyDropAmount queued
// events, and reports the total number of lost events in a single
// "dropped events" error entry.
func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) {
	ctx := node.Context()

	// busyCounter counts consecutive loop iterations in which drops were
	// observed; it is the back-off sleep duration in seconds.
	busyCounter := 0
	// dlog is the desugared logger, used below with typed zap fields.
	dlog := log.Desugar()
	for {
		// Deal with dropped events.
		// Swap (rather than load) so the counter is reset atomically while
		// emit may still be incrementing it.
		dropped := atomic.SwapUint64(&pl.droppedCount, 0)
		if dropped > 0 {
			busyCounter++

			// sleep a bit to give the system a chance to catch up with logging.
			select {
			case <-time.After(time.Duration(busyCounter) * time.Second):
			case <-ctx.Done():
				return
			}

			// drain 1/8th of the backlog so we
			// don't immediately run into this situation
			// again.
		loop:
			for i := 0; i < busyDropAmount; i++ {
				select {
				case <-pl.events:
					// A discarded queued event counts as dropped too.
					dropped++
				default:
					// Queue is already empty; stop draining early.
					break loop
				}
			}

			// Add in any events we've dropped in the mean-time.
			dropped += atomic.SwapUint64(&pl.droppedCount, 0)

			// Report that we've dropped events.
			dlog.Error("dropped events", zap.Uint64("count", dropped))
		} else {
			busyCounter = 0
		}

		// Block until the next event arrives or the node shuts down.
		var e plEvent
		select {
		case <-ctx.Done():
			return
		case e = <-pl.events:
		}

		peerID := zap.String("peer", e.peer.Pretty())

		switch e.kind {
		case eventConnect:
			dlog.Info("connected", peerID)
		case eventIdentify:
			agent, err := node.Peerstore.Get(e.peer, "AgentVersion")
			switch err {
			case nil:
			case peerstore.ErrNotFound:
				// No agent version recorded for this peer: skip silently.
				continue
			default:
				dlog.Error("failed to get agent version", zap.Error(err))
				continue
			}

			// Only log when the stored value is a string.
			agentS, ok := agent.(string)
			if !ok {
				continue
			}
			dlog.Info("identified", peerID, zap.String("agent", agentS))
		}
	}
}

// emit queues an event of kind evt for peer p without ever blocking the
// caller. If the queue has no free slot the event is discarded and
// droppedCount is incremented atomically instead.
func (pl *peerLogPlugin) emit(evt eventType, p peer.ID) {
	queued := plEvent{kind: evt, peer: p}
	select {
	case pl.events <- queued:
		return
	default:
	}
	// Queue is full: record the loss rather than wait for space.
	atomic.AddUint64(&pl.droppedCount, 1)
}

func (pl *peerLogPlugin) Start(node *core.IpfsNode) error {
// Ensure logs from this plugin get printed regardless of global IPFS_LOGGING value
if err := logging.SetLogLevel("plugin/peerlog", "info"); err != nil {
return fmt.Errorf("failed to set log level: %w", err)
}

sub, err := node.PeerHost.EventBus().Subscribe(new(event.EvtPeerIdentificationCompleted))
if err != nil {
return fmt.Errorf("failed to subscribe to identify notifications")
}

var notifee network.NotifyBundle
notifee.ConnectedF = func(net network.Network, conn network.Conn) {
// TODO: Log transport, country, etc?
log.Infow("connected",
"peer", conn.RemotePeer().Pretty(),
)
}
notifee.DisconnectedF = func(net network.Network, conn network.Conn) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're intentionally removing this event.

log.Infow("disconnected",
"peer", conn.RemotePeer().Pretty(),
)
pl.emit(eventConnect, conn.RemotePeer())
}
node.PeerHost.Network().Notify(&notifee)

sub, err := node.PeerHost.EventBus().Subscribe(
new(event.EvtPeerIdentificationCompleted),
eventbus.BufSize(1024),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a default buffer size of 16 and that should be enough for us now.

)
if err != nil {
return fmt.Errorf("failed to subscribe to identify notifications")
}
go func() {
defer sub.Close()
for e := range sub.Out() {
switch e := e.(type) {
case event.EvtPeerIdentificationCompleted:
protocols, err := node.Peerstore.GetProtocols(e.Peer)
if err != nil {
log.Errorw("failed to get protocols", "error", err)
continue
}
agent, err := node.Peerstore.Get(e.Peer, "AgentVersion")
if err != nil {
log.Errorw("failed to get agent version", "error", err)
continue
}
log.Infow(
"identified",
"peer", e.Peer.Pretty(),
"agent", agent,
"protocols", protocols,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're removing the protocols (too much data).

)
pl.emit(eventIdentify, e.Peer)
}
}
}()

go pl.collectEvents(node)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

Expand Down