Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: large message test, and leak fix #114

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 54 additions & 13 deletions examples/websocket/nbioclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"fmt"
"log"
"net/url"
Expand All @@ -13,24 +14,38 @@ import (
"github.com/lesismal/nbio/nbhttp/websocket"
)

func newUpgrader() *websocket.Upgrader {
var (
disableEcho = flag.Bool("disable-echo", true, "diable echo incoming message")
verbose = flag.Bool("verbose", false, "verbose mode")
connections = flag.Int("connections", 1000, "number of connecions")
msgSize = flag.Int("msg-size", 1*1024*1024, "message size")
)

func newUpgrader(connectedChannel *chan *websocket.Conn) *websocket.Upgrader {
u := websocket.NewUpgrader()
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
// echo
time.AfterFunc(time.Second, func() {
c.WriteMessage(messageType, data)
})
log.Println("onEcho:", string(data))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If use time.AfterFunc, it will execute in another goroutine, then we should not set ReleaseWebsocketPayload=true, but we can release the buffer like this:

time.AfterFunc(time.Second, func() {
	c.WriteMessage(messageType, data)
	mempool.Free(data)
})

if !*disableEcho {
time.AfterFunc(time.Second, func() {
c.WriteMessage(messageType, data)
})
log.Println("onEcho:", string(data))
}
if *verbose {
log.Println("received ack:", string(data))
}
*connectedChannel <- c
})

u.OnClose(func(c *websocket.Conn, err error) {
fmt.Println("OnClose:", c.RemoteAddr().String(), err)
if *verbose {
fmt.Println("OnClose:", c.RemoteAddr().String(), err)
}
})

return u
}

func main() {
flag.Parse()
engine := nbhttp.NewEngine(nbhttp.Config{
SupportClient: true,
})
Expand All @@ -40,24 +55,50 @@ func main() {
return
}

for i := 0; i < 1; i++ {
connectionChannels := make(chan *websocket.Conn, *connections)
for i := 0; i < *connections; i++ {
u := url.URL{Scheme: "ws", Host: "localhost:8888", Path: "/ws"}
dialer := &websocket.Dialer{
Engine: engine,
Upgrader: newUpgrader(),
Upgrader: newUpgrader(&connectionChannels),
DialTimeout: time.Second * 3,
}
c, _, err := dialer.Dial(u.String(), nil)
if err != nil {
panic(fmt.Errorf("dial: %v", err))
}
c.WriteMessage(websocket.TextMessage, []byte("hello"))
connectionChannels <- c
}
output := make([]byte, *msgSize, *msgSize)
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
go func() {
defer func() {
if *verbose {
log.Println("sender routine exited")
}
}()
for {
select {
case c := <-connectionChannels:
time.Sleep(1000)
if *verbose {
log.Println("sending message")
}
c.WriteMessage(websocket.BinaryMessage, output)
if *verbose {
log.Println("done sending message")
}
case <-ctx.Done():
log.Println("ctx done")
return
}
}
}()
}

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
engine.Shutdown(ctx)
}
55 changes: 50 additions & 5 deletions examples/websocket/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,60 @@ import (
"net/http"
"os"
"os/signal"
"sync/atomic"
"time"

"github.com/pkg/profile"
"github.com/lesismal/nbio/nbhttp"
"github.com/lesismal/nbio/nbhttp/websocket"
)

var onDataFrame = flag.Bool("UseOnDataFrame", false, "Server will use OnDataFrame api instead of OnMessage")
var (
onDataFrame = flag.Bool("UseOnDataFrame", false, "Server will use OnDataFrame api instead of OnMessage")
disableEcho = flag.Bool("disable-echo", false, "disable echo")
verbose = flag.Bool("verbose", false, "verbose mode")
ack = flag.Bool("ack", true, "send back an acknowledgement")

connections uint64
msgReceived uint64
bytesReceived uint64
)

func newUpgrader() *websocket.Upgrader {
u := websocket.NewUpgrader()
u.OnOpen(func(c *websocket.Conn) {
atomic.AddUint64(&connections, 1)
})
if *onDataFrame {
u.OnDataFrame(func(c *websocket.Conn, messageType websocket.MessageType, fin bool, data []byte) {
// echo
c.WriteFrame(messageType, true, fin, data)
if !*disableEcho {
c.WriteFrame(messageType, true, fin, data)
}
atomic.AddUint64(&bytesReceived, uint64(len(data)))
if fin {
atomic.AddUint64(&msgReceived, 1)
}
})
} else {
u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) {
// echo
c.WriteMessage(messageType, data)
if !*disableEcho {
c.WriteMessage(messageType, data)
}
atomic.AddUint64(&bytesReceived, uint64(len(data)))
atomic.AddUint64(&msgReceived, 1)
if *ack {
c.WriteMessage(messageType, []byte("a"))
}
})
}

u.OnClose(func(c *websocket.Conn, err error) {
fmt.Println("OnClose:", c.RemoteAddr().String(), err)
if *verbose {
fmt.Println("OnClose:", c.RemoteAddr().String(), err)
}
atomic.AddUint64(&connections, ^uint64(0))
})
return u
}
Expand All @@ -44,7 +74,9 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) {
}
wsConn := conn.(*websocket.Conn)
wsConn.SetReadDeadline(time.Time{})
fmt.Println("OnOpen:", wsConn.RemoteAddr().String())
if *verbose {
fmt.Println("OnOpen:", wsConn.RemoteAddr().String())
}
}

func main() {
Expand All @@ -57,11 +89,24 @@ func main() {
Addrs: []string{"localhost:8888"},
}, mux, nil)

defer profile.Start(profile.MemProfile).Stop()

err := svr.Start()
if err != nil {
fmt.Printf("nbio.Start failed: %v\n", err)
return
}
go func() {
ticker := time.NewTicker(time.Second * 5)
for {
select {
case <-ticker.C:
fmt.Printf("connection %d data received %d\n", atomic.LoadUint64(&connections), atomic.LoadUint64(&bytesReceived))
}

}

}()

interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ go 1.16
require (
github.com/gorilla/websocket v1.4.2
github.com/lesismal/llib v1.1.0
github.com/pkg/profile v1.6.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/lesismal/llib v1.1.0 h1:dpU21CcgQXyY9GW/ibLZhXf46Lst+Y4KhWO5R34FAWk=
github.com/lesismal/llib v1.1.0/go.mod h1:3vmCrIMrpkaoA3bDu/sI+J7EyEUMPbOvmAxb7PlzilM=
github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM=
github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5 h1:N6Jp/LCiEoIBX56BZSR2bepK5GtbSC2DDOYT742mMfE=
golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
Expand Down
13 changes: 11 additions & 2 deletions nbhttp/websocket/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,14 @@ func (u *Upgrader) validFrame(opcode MessageType, fin, res1, res2, res3, expecti
return nil
}

func readAppend(a, b []byte) []byte {
return append(a, b...)
}

func messageAppend(a, b []byte) []byte {
return append(a, b...)
}

// Read .
func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error {
bufLen := len(u.buffer)
Expand All @@ -326,7 +334,7 @@ func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error {
if bufLen == 0 {
u.buffer = data
} else {
u.buffer = append(u.buffer, data...)
u.buffer = readAppend(u.buffer, data)
oldBuffer = u.buffer
}

Expand Down Expand Up @@ -362,7 +370,7 @@ func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error {
u.message = mempool.Malloc(len(body))
copy(u.message, body)
} else {
u.message = append(u.message, body...)
u.message = messageAppend(u.message, body)
}
}
if fin {
Expand Down Expand Up @@ -447,6 +455,7 @@ func (u *Upgrader) handleMessage(p *nbhttp.Parser, opcode MessageType, body []by

p.Execute(func() {
u.handleWsMessage(u.conn, opcode, body)
mempool.Free(body)
})

}
Expand Down