diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 95a86ee4951..47ec07ff2fa 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -135,7 +135,10 @@ func (m *impl) AddBlock(b blocks.Block) { func FromNet(r io.Reader) (BitSwapMessage, error) { pbr := ggio.NewDelimitedReader(r, inet.MessageSizeMax) + return FromPBReader(pbr) +} +func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) { pb := new(pb.Message) if err := pbr.ReadMsg(pb); err != nil { return nil, err diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 4158b65a104..24145eb960c 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -1,12 +1,15 @@ package network import ( + "io" + key "github.com/ipfs/go-ipfs/blocks/key" bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" routing "github.com/ipfs/go-ipfs/routing" host "gx/ipfs/QmVL44QeoQDTYK8RVdpkyja7uYcK3WDNoBNHVLonf9YDtm/go-libp2p/p2p/host" inet "gx/ipfs/QmVL44QeoQDTYK8RVdpkyja7uYcK3WDNoBNHVLonf9YDtm/go-libp2p/p2p/net" ma "gx/ipfs/QmYzDkkgAEmrcNzFCiYo6L1dTX4EAG1gZkbtdbd9trL4vd/go-multiaddr" + ggio "gx/ipfs/QmZ4Qi3GaRbjcx28Sme5eMH7RQjGkt8wHxt2a65oLaeFEV/gogo-protobuf/io" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" logging "gx/ipfs/QmaDNZ4QMdBdku1YZWBysufYyoQt1negQGNav6PLYarbY8/go-log" peer "gx/ipfs/QmbyvM8zRFDkbFdYyt1MnevUMJ62SiSGbfDFZ3Z8nkrzr4/go-libp2p-peer" @@ -150,11 +153,14 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { return } + reader := ggio.NewDelimitedReader(s, inet.MessageSizeMax) for { - received, err := bsmsg.FromNet(s) + received, err := bsmsg.FromPBReader(reader) if err != nil { - go bsnet.receiver.ReceiveError(err) - log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) + if err != io.EOF { + go bsnet.receiver.ReceiveError(err) + log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) + } return }