diff --git a/dht_net.go b/dht_net.go index 31775ae8f..0a5e6e665 100644 --- a/dht_net.go +++ b/dht_net.go @@ -82,8 +82,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { for { var req pb.Message msgbytes, err := r.ReadMsg() + msgLen := len(msgbytes) if err != nil { - defer r.ReleaseMsg(msgbytes) + r.ReleaseMsg(msgbytes) if err == io.EOF { return true } @@ -92,21 +93,25 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { if err.Error() != "stream reset" { logger.Debugf("error reading message: %#v", err) } - stats.RecordWithTags( - ctx, - []tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")}, - metrics.ReceivedMessageErrors.M(1), - ) + if msgLen > 0 { + stats.RecordWithTags(ctx, + []tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")}, + metrics.ReceivedMessages.M(1), + metrics.ReceivedMessageErrors.M(1), + metrics.ReceivedBytes.M(int64(msgLen)), + ) + } return false } err = req.Unmarshal(msgbytes) r.ReleaseMsg(msgbytes) if err != nil { logger.Debugf("error unmarshalling message: %#v", err) - stats.RecordWithTags( - ctx, + stats.RecordWithTags(ctx, []tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")}, + metrics.ReceivedMessages.M(1), metrics.ReceivedMessageErrors.M(1), + metrics.ReceivedBytes.M(int64(msgLen)), ) return false } @@ -114,15 +119,13 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { timer.Reset(dhtStreamIdleTimeout) startTime := time.Now() - ctx, _ := tag.New( - ctx, + ctx, _ := tag.New(ctx, tag.Upsert(metrics.KeyMessageType, req.GetType().String()), ) - stats.Record( - ctx, + stats.Record(ctx, metrics.ReceivedMessages.M(1), - metrics.ReceivedBytes.M(int64(req.Size())), + metrics.ReceivedBytes.M(int64(msgLen)), ) handler := dht.handlerForMsgType(req.GetType()) @@ -166,7 +169,10 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message ms, err := dht.messageSenderForPeer(ctx, p) if err != nil { - stats.Record(ctx, metrics.SentRequestErrors.M(1)) + stats.Record(ctx, + metrics.SentRequests.M(1), + metrics.SentRequestErrors.M(1), + ) return nil, err } @@ -174,20 +180,20 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message rpmes, err := ms.SendRequest(ctx, pmes) if err != nil { - stats.Record(ctx, metrics.SentRequestErrors.M(1)) + stats.Record(ctx, + metrics.SentRequests.M(1), + metrics.SentRequestErrors.M(1), + ) return nil, err } // update the peer (on valid msgs only) dht.updateFromMessage(ctx, p, rpmes) - stats.Record( - ctx, + stats.Record(ctx, metrics.SentRequests.M(1), metrics.SentBytes.M(int64(pmes.Size())), - metrics.OutboundRequestLatency.M( - float64(time.Since(start))/float64(time.Millisecond), - ), + metrics.OutboundRequestLatency.M(float64(time.Since(start))/float64(time.Millisecond)), ) dht.peerstore.RecordLatency(p, time.Since(start)) logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes) @@ -200,20 +206,26 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message ms, err := dht.messageSenderForPeer(ctx, p) if err != nil { - stats.Record(ctx, metrics.SentMessageErrors.M(1)) + stats.Record(ctx, + metrics.SentMessages.M(1), + metrics.SentMessageErrors.M(1), + ) return err } if err := ms.SendMessage(ctx, pmes); err != nil { - stats.Record(ctx, metrics.SentMessageErrors.M(1)) + stats.Record(ctx, + metrics.SentMessages.M(1), + metrics.SentMessageErrors.M(1), + ) return err } - stats.Record( - ctx, + stats.Record(ctx, metrics.SentMessages.M(1), metrics.SentBytes.M(int64(pmes.Size())), ) + logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes) return nil }