Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
[WIP] aggressively free memory
Browse files Browse the repository at this point in the history
  • Loading branch information
Stebalien committed Jun 14, 2019
1 parent 480b3f7 commit 07abee7
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ require (
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.1.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.1.1
github.com/libp2p/go-libp2p-core v0.0.3
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.4
github.com/libp2p/go-msgio v0.0.3 // indirect
github.com/libp2p/go-msgio v0.0.3
github.com/mattn/go-colorable v0.1.2 // indirect
github.com/multiformats/go-multiaddr v0.0.4
github.com/opentracing/opentracing-go v1.1.0 // indirect
Expand All @@ -37,3 +38,5 @@ require (
golang.org/x/text v0.3.2 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
)

replace github.com/libp2p/go-msgio => /home/steb/projects/go/src/github.com/libp2p/go-msgio
42 changes: 29 additions & 13 deletions message/message.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package message

import (
"encoding/binary"
"fmt"
"io"

pb "github.com/ipfs/go-bitswap/message/pb"
wantlist "github.com/ipfs/go-bitswap/wantlist"
blocks "github.com/ipfs/go-block-format"

ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
pool "github.com/libp2p/go-buffer-pool"
msgio "github.com/libp2p/go-msgio"

"github.com/libp2p/go-libp2p-core/network"
)
Expand Down Expand Up @@ -170,18 +172,22 @@ func (m *impl) AddBlock(b blocks.Block) {

// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
func FromNet(r io.Reader) (BitSwapMessage, error) {
pbr := ggio.NewDelimitedReader(r, network.MessageSizeMax)
return FromPBReader(pbr)
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
return FromMsgReader(reader)
}

// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {
pb := new(pb.Message)
if err := pbr.ReadMsg(pb); err != nil {
func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
msg, err := r.ReadMsg()
if err != nil {
return nil, err
}

return newMessageFromProto(*pb)
var pb pb.Message
if err := pb.Unmarshal(msg); err != nil {
return nil, err
}
r.ReleaseMsg(msg)
return newMessageFromProto(pb)
}

func (m *impl) ToProtoV0() *pb.Message {
Expand Down Expand Up @@ -228,15 +234,25 @@ func (m *impl) ToProtoV1() *pb.Message {
}

func (m *impl) ToNetV0(w io.Writer) error {
pbw := ggio.NewDelimitedWriter(w)

return pbw.WriteMsg(m.ToProtoV0())
return write(w, m.ToProtoV0())
}

func (m *impl) ToNetV1(w io.Writer) error {
pbw := ggio.NewDelimitedWriter(w)
return write(w, m.ToProtoV1())
}

return pbw.WriteMsg(m.ToProtoV1())
func write(w io.Writer, m *pb.Message) error {
size := m.Size()
buf := pool.Get(size + binary.MaxVarintLen64)
defer pool.Put(buf)
n := binary.PutUvarint(buf, uint64(size))
if written, err := m.MarshalTo(buf[n:]); err != nil {
return err
} else {
n += written
}
_, err := w.Write(buf[:n])
return err
}

func (m *impl) Loggable() map[string]interface{} {
Expand Down
6 changes: 3 additions & 3 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/libp2p/go-libp2p-core/helpers"

ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/connmgr"
Expand All @@ -19,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
msgio "github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr"
)

Expand Down Expand Up @@ -178,9 +178,9 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
return
}

reader := ggio.NewDelimitedReader(s, network.MessageSizeMax)
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
for {
received, err := bsmsg.FromPBReader(reader)
received, err := bsmsg.FromMsgReader(reader)
if err != nil {
if err != io.EOF {
s.Reset()
Expand Down

0 comments on commit 07abee7

Please sign in to comment.