diff --git a/examples/websocket/nbioclient/client.go b/examples/websocket/nbioclient/client.go index 90f8468e..7c5f131e 100644 --- a/examples/websocket/nbioclient/client.go +++ b/examples/websocket/nbioclient/client.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "fmt" "log" "net/url" @@ -13,24 +14,38 @@ import ( "github.com/lesismal/nbio/nbhttp/websocket" ) -func newUpgrader() *websocket.Upgrader { +var ( + disableEcho = flag.Bool("disable-echo", true, "diable echo incoming message") + verbose = flag.Bool("verbose", false, "verbose mode") + connections = flag.Int("connections", 1000, "number of connecions") + msgSize = flag.Int("msg-size", 1*1024*1024, "message size") +) + +func newUpgrader(connectedChannel *chan *websocket.Conn) *websocket.Upgrader { u := websocket.NewUpgrader() u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { // echo - time.AfterFunc(time.Second, func() { - c.WriteMessage(messageType, data) - }) - log.Println("onEcho:", string(data)) + if !*disableEcho { + time.AfterFunc(time.Second, func() { + c.WriteMessage(messageType, data) + }) + log.Println("onEcho:", string(data)) + } + if *verbose { + log.Println("received ack:", string(data)) + } + *connectedChannel <- c }) - u.OnClose(func(c *websocket.Conn, err error) { - fmt.Println("OnClose:", c.RemoteAddr().String(), err) + if *verbose { + fmt.Println("OnClose:", c.RemoteAddr().String(), err) + } }) - return u } func main() { + flag.Parse() engine := nbhttp.NewEngine(nbhttp.Config{ SupportClient: true, }) @@ -40,24 +55,50 @@ func main() { return } - for i := 0; i < 1; i++ { + connectionChannels := make(chan *websocket.Conn, *connections) + for i := 0; i < *connections; i++ { u := url.URL{Scheme: "ws", Host: "localhost:8888", Path: "/ws"} dialer := &websocket.Dialer{ Engine: engine, - Upgrader: newUpgrader(), + Upgrader: newUpgrader(&connectionChannels), DialTimeout: time.Second * 3, } c, _, err := dialer.Dial(u.String(), nil) if err != nil { panic(fmt.Errorf("dial: %v", err)) } - c.WriteMessage(websocket.TextMessage, []byte("hello")) + connectionChannels <- c + } + output := make([]byte, *msgSize, *msgSize) + ctx, cancel := context.WithCancel(context.Background()) + for i := 0; i < 10; i++ { + go func() { + defer func() { + if *verbose { + log.Println("sender routine exited") + } + }() + for { + select { + case c := <-connectionChannels: + time.Sleep(1000) + if *verbose { + log.Println("sending message") + } + c.WriteMessage(websocket.BinaryMessage, output) + if *verbose { + log.Println("done sending message") + } + case <-ctx.Done(): + log.Println("ctx done") + return + } + } + }() } - interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) <-interrupt - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() engine.Shutdown(ctx) } diff --git a/examples/websocket/server/server.go b/examples/websocket/server/server.go index 1929f87a..b06c2fb5 100644 --- a/examples/websocket/server/server.go +++ b/examples/websocket/server/server.go @@ -7,30 +7,60 @@ import ( "net/http" "os" "os/signal" + "sync/atomic" "time" + "github.com/pkg/profile" "github.com/lesismal/nbio/nbhttp" "github.com/lesismal/nbio/nbhttp/websocket" ) -var onDataFrame = flag.Bool("UseOnDataFrame", false, "Server will use OnDataFrame api instead of OnMessage") +var ( + onDataFrame = flag.Bool("UseOnDataFrame", false, "Server will use OnDataFrame api instead of OnMessage") + disableEcho = flag.Bool("disable-echo", false, "disable echo") + verbose = flag.Bool("verbose", false, "verbose mode") + ack = flag.Bool("ack", true, "send back an acknowledgement") + + connections uint64 + msgReceived uint64 + bytesReceived uint64 +) func newUpgrader() *websocket.Upgrader { u := websocket.NewUpgrader() + u.OnOpen(func(c *websocket.Conn) { + atomic.AddUint64(&connections, 1) + }) if *onDataFrame { u.OnDataFrame(func(c *websocket.Conn, messageType websocket.MessageType, fin bool, data []byte) { // echo - c.WriteFrame(messageType, true, fin, data) + if !*disableEcho { + c.WriteFrame(messageType, true, fin, data) + } + atomic.AddUint64(&bytesReceived, uint64(len(data))) + if fin { + atomic.AddUint64(&msgReceived, 1) + } }) } else { u.OnMessage(func(c *websocket.Conn, messageType websocket.MessageType, data []byte) { // echo - c.WriteMessage(messageType, data) + if !*disableEcho { + c.WriteMessage(messageType, data) + } + atomic.AddUint64(&bytesReceived, uint64(len(data))) + atomic.AddUint64(&msgReceived, 1) + if *ack { + c.WriteMessage(messageType, []byte("a")) + } }) } u.OnClose(func(c *websocket.Conn, err error) { - fmt.Println("OnClose:", c.RemoteAddr().String(), err) + if *verbose { + fmt.Println("OnClose:", c.RemoteAddr().String(), err) + } + atomic.AddUint64(&connections, ^uint64(0)) }) return u } @@ -44,7 +74,9 @@ func onWebsocket(w http.ResponseWriter, r *http.Request) { } wsConn := conn.(*websocket.Conn) wsConn.SetReadDeadline(time.Time{}) - fmt.Println("OnOpen:", wsConn.RemoteAddr().String()) + if *verbose { + fmt.Println("OnOpen:", wsConn.RemoteAddr().String()) + } } func main() { @@ -57,11 +89,24 @@ func main() { Addrs: []string{"localhost:8888"}, }, mux, nil) + defer profile.Start(profile.MemProfile).Stop() + err := svr.Start() if err != nil { fmt.Printf("nbio.Start failed: %v\n", err) return } + go func() { + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-ticker.C: + fmt.Printf("connection %d data received %d\n", atomic.LoadUint64(&connections), atomic.LoadUint64(&bytesReceived)) + } + + } + + }() interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) diff --git a/go.mod b/go.mod index eaac9562..ae5c4160 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.16 require ( github.com/gorilla/websocket v1.4.2 github.com/lesismal/llib v1.1.0 + github.com/pkg/profile v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index 07d0f655..2ae99adf 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/lesismal/llib v1.1.0 h1:dpU21CcgQXyY9GW/ibLZhXf46Lst+Y4KhWO5R34FAWk= github.com/lesismal/llib v1.1.0/go.mod h1:3vmCrIMrpkaoA3bDu/sI+J7EyEUMPbOvmAxb7PlzilM= +github.com/pkg/profile v1.6.0 h1:hUDfIISABYI59DyeB3OTay/HxSRwTQ8rB/H83k6r5dM= +github.com/pkg/profile v1.6.0/go.mod h1:qBsxPvzyUincmltOk6iyRVxHYg4adc0OFOv72ZdLa18= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5 h1:N6Jp/LCiEoIBX56BZSR2bepK5GtbSC2DDOYT742mMfE= golang.org/x/crypto v0.0.0-20210513122933-cd7d49e622d5/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8= diff --git a/nbhttp/websocket/upgrader.go b/nbhttp/websocket/upgrader.go index aa682bb7..9facc7c6 100644 --- a/nbhttp/websocket/upgrader.go +++ b/nbhttp/websocket/upgrader.go @@ -315,6 +315,14 @@ func (u *Upgrader) validFrame(opcode MessageType, fin, res1, res2, res3, expecti return nil } +func readAppend(a, b []byte) []byte { + return append(a, b...) +} + +func messageAppend(a, b []byte) []byte { + return append(a, b...) +} + // Read . func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error { bufLen := len(u.buffer) @@ -326,7 +334,7 @@ func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error { if bufLen == 0 { u.buffer = data } else { - u.buffer = append(u.buffer, data...) + u.buffer = readAppend(u.buffer, data) oldBuffer = u.buffer } @@ -362,7 +370,7 @@ func (u *Upgrader) Read(p *nbhttp.Parser, data []byte) error { u.message = mempool.Malloc(len(body)) copy(u.message, body) } else { - u.message = append(u.message, body...) + u.message = messageAppend(u.message, body) } } if fin { @@ -447,6 +455,7 @@ func (u *Upgrader) handleMessage(p *nbhttp.Parser, opcode MessageType, body []by p.Execute(func() { u.handleWsMessage(u.conn, opcode, body) + mempool.Free(body) }) }