diff --git a/plugin/plugins/peerlog/peerlog.go b/plugin/plugins/peerlog/peerlog.go index 59356317165e..20f8aecce08f 100644 --- a/plugin/plugins/peerlog/peerlog.go +++ b/plugin/plugins/peerlog/peerlog.go @@ -3,6 +3,7 @@ package peerlog import ( "fmt" "sync/atomic" + "time" core "github.com/ipfs/go-ipfs/core" plugin "github.com/ipfs/go-ipfs/plugin" @@ -18,6 +19,13 @@ var log = logging.Logger("plugin/peerlog") type eventType int +var ( + // size of the event queue buffer + eventQueueSize = 64 * 1024 + // number of events to drop when busy. + busyDropAmount = eventQueueSize / 8 +) + const ( eventConnect eventType = iota eventIdentify @@ -60,7 +68,7 @@ func (*peerLogPlugin) Version() string { // Init initializes plugin func (pl *peerLogPlugin) Init(*plugin.Environment) error { - pl.events = make(chan plEvent, 64*1024) + pl.events = make(chan plEvent, eventQueueSize) return nil } @@ -68,11 +76,41 @@ func (pl *peerLogPlugin) collectEvents(node *core.IpfsNode) { go func() { ctx := node.Context() + busyCounter := 0 dlog := log.Desugar() for { + // Deal with dropped events. dropped := atomic.SwapUint64(&pl.droppedCount, 0) if dropped > 0 { + busyCounter++ + + // sleep a bit to give the system a chance to catch up with logging. + select { + case <-time.After(time.Duration(busyCounter) * time.Second): + case <-ctx.Done(): + return + } + + // drain 1/8th of the backlog so we + // don't immediately run into this situation + // again. + loop: + for i := 0; i < busyDropAmount; i++ { + select { + case <-pl.events: + dropped++ + default: + break loop + } + } + + // Add in any events we've dropped in the meantime. + dropped += atomic.SwapUint64(&pl.droppedCount, 0) + + // Report that we've dropped events. dlog.Error("dropped events", zap.Uint64("count", dropped)) + } else { + busyCounter = 0 } var e plEvent