Skip to content

Commit

Permalink
Allow custom SubtestStress. (#7)
Browse files Browse the repository at this point in the history
SubtestStress is exposed but it was not possible to build a working opt parameters.
  • Loading branch information
Jorropo authored and raulk committed Oct 29, 2019
1 parent 1fa303d commit 82713a6
Showing 1 changed file with 46 additions and 46 deletions.
92 changes: 46 additions & 46 deletions suites/transport/stream_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func init() {
}

type Options struct {
connNum int
streamNum int
msgNum int
msgMin int
msgMax int
ConnNum int
StreamNum int
MsgNum int
MsgMin int
MsgMax int
}

func fullClose(t *testing.T, s mux.MuxedStream) {
Expand Down Expand Up @@ -170,12 +170,12 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
}

writeStream := func(s mux.MuxedStream, bufs chan<- []byte) {
debugLog(t, "writeStream %p, %d msgNum", s, opt.msgNum)
debugLog(t, "writeStream %p, %d MsgNum", s, opt.MsgNum)

for i := 0; i < opt.msgNum; i++ {
for i := 0; i < opt.MsgNum; i++ {
buf := randBuf(msgsize)
bufs <- buf
debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.msgNum, buf[:3])
debugLog(t, "%p writing %d bytes (message %d/%d #%x)", s, len(buf), i, opt.MsgNum, buf[:3])
if _, err := s.Write(buf); err != nil {
errs <- fmt.Errorf("s.Write(buf): %s", err)
continue
Expand All @@ -184,17 +184,17 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
}

readStream := func(s mux.MuxedStream, bufs <-chan []byte) {
debugLog(t, "readStream %p, %d msgNum", s, opt.msgNum)
debugLog(t, "readStream %p, %d MsgNum", s, opt.MsgNum)

buf2 := make([]byte, msgsize)
i := 0
for buf1 := range bufs {
i++
debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
debugLog(t, "%p reading %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])

if _, err := io.ReadFull(s, buf2); err != nil {
errs <- fmt.Errorf("io.ReadFull(s, buf2): %s", err)
debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.msgNum, buf1[:3])
debugLog(t, "%p failed to read %d bytes (message %d/%d #%x)", s, len(buf1), i-1, opt.MsgNum, buf1[:3])
continue
}
if !bytes.Equal(buf1, buf2) {
Expand All @@ -204,15 +204,15 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
}

openStreamAndRW := func(c mux.MuxedConn) {
debugLog(t, "openStreamAndRW %p, %d opt.msgNum", c, opt.msgNum)
debugLog(t, "openStreamAndRW %p, %d opt.MsgNum", c, opt.MsgNum)

s, err := c.OpenStream()
if err != nil {
errs <- fmt.Errorf("Failed to create NewStream: %s", err)
return
}

bufs := make(chan []byte, opt.msgNum)
bufs := make(chan []byte, opt.MsgNum)
go func() {
writeStream(s, bufs)
close(bufs)
Expand Down Expand Up @@ -248,7 +248,7 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
}()

var wg sync.WaitGroup
for i := 0; i < opt.streamNum; i++ {
for i := 0; i < opt.StreamNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
Expand All @@ -260,10 +260,10 @@ func SubtestStress(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr,
}

openConnsAndRW := func() {
debugLog(t, "openConnsAndRW, %d conns", opt.connNum)
debugLog(t, "openConnsAndRW, %d conns", opt.ConnNum)

var wg sync.WaitGroup
for i := 0; i < opt.connNum; i++ {
for i := 0; i < opt.ConnNum; i++ {
wg.Add(1)
go rateLimit(func() {
defer wg.Done()
Expand Down Expand Up @@ -420,60 +420,60 @@ func SubtestStreamReset(t *testing.T, ta, tb transport.Transport, maddr ma.Multi

func SubtestStress1Conn1Stream1Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1,
msgNum: 1,
msgMax: 100,
msgMin: 100,
ConnNum: 1,
StreamNum: 1,
MsgNum: 1,
MsgMax: 100,
MsgMin: 100,
})
}

func SubtestStress1Conn1Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1,
msgNum: 100,
msgMax: 100,
msgMin: 100,
ConnNum: 1,
StreamNum: 1,
MsgNum: 100,
MsgMax: 100,
MsgMin: 100,
})
}

func SubtestStress1Conn100Stream100Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 100,
msgMin: 100,
ConnNum: 1,
StreamNum: 100,
MsgNum: 100,
MsgMax: 100,
MsgMin: 100,
})
}

func SubtestStress50Conn10Stream50Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 50,
streamNum: 10,
msgNum: 50,
msgMax: 100,
msgMin: 100,
ConnNum: 50,
StreamNum: 10,
MsgNum: 50,
MsgMax: 100,
MsgMin: 100,
})
}

func SubtestStress1Conn1000Stream10Msg(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 1000,
msgNum: 10,
msgMax: 100,
msgMin: 100,
ConnNum: 1,
StreamNum: 1000,
MsgNum: 10,
MsgMax: 100,
MsgMin: 100,
})
}

func SubtestStress1Conn100Stream100Msg10MB(t *testing.T, ta, tb transport.Transport, maddr ma.Multiaddr, peerA peer.ID) {
SubtestStress(t, ta, tb, maddr, peerA, Options{
connNum: 1,
streamNum: 100,
msgNum: 100,
msgMax: 10000,
msgMin: 1000,
ConnNum: 1,
StreamNum: 100,
MsgNum: 100,
MsgMax: 10000,
MsgMin: 1000,
})
}

0 comments on commit 82713a6

Please sign in to comment.