Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support sse transfer mode #122

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions _example/default/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,4 @@ go 1.19

require github.com/arl/statsviz v0.6.0

require github.com/gorilla/websocket v1.5.0 // indirect

replace github.com/arl/statsviz => ../../
6 changes: 3 additions & 3 deletions _example/default/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/arl/statsviz
go 1.20

require (
github.com/gorilla/websocket v1.5.0
github.com/rogpeppe/go-internal v1.11.0
)

require (
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.1.12 // indirect
)
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
115 changes: 74 additions & 41 deletions internal/static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,6 @@ import * as plot from "./plot.js";
import * as theme from "./theme.js";
import PlotsDef from './plotsdef.js';

const buildWebsocketURI = () => {
var loc = window.location,
ws_prot = "ws:";
if (loc.protocol === "https:") {
ws_prot = "wss:";
}
return ws_prot + "//" + loc.host + loc.pathname + "ws"
}

const dataRetentionSeconds = 600;
var timeout = 250;

Expand All @@ -26,38 +17,45 @@ let paused = false;
let show_gc = true;
let timerange = 60;

/* WebSocket connection handling */
const connect = () => {
const uri = buildWebsocketURI();
let ws = new WebSocket(uri);
console.info(`Attempting websocket connection to server at ${uri}`);

ws.onopen = () => {
const dataProcessor = {
initDone: false,
close: () => {
},
connected: false,
retrying: false,
onopen: () => {
dataProcessor.initDone = false;
dataProcessor.connected = true;
console.info("Successfully connected");
timeout = 250; // reset connection timeout for next time
};

ws.onclose = event => {
console.error(`Closed websocket connection: code ${event.code}`);
setTimeout(connect, clamp(timeout += timeout, 250, 5000));
};

ws.onerror = err => {
console.error(`Websocket error, closing connection.`);
ws.close();
};

let initDone = false;
ws.onmessage = event => {
},
onclose: event => {
dataProcessor.connected = false;
console.error(`Closed connection: code ${event.code || event}`);
if (dataProcessor.retrying) {
return
}
dataProcessor.retrying = true
setTimeout(() => {
connect()
dataProcessor.retrying = false
}, clamp(timeout += timeout, 250, 5000));
},
onerror: err => {
console.error(`error, closing connection.`, err);
dataProcessor.close();
},
onmessage: event => {
let data = JSON.parse(event.data)

if (!initDone) {
if (!dataProcessor.initDone) {
configurePlots(PlotsDef);
stats.init(PlotsDef, dataRetentionSeconds);

attachPlots();

$('#play_pause').change(() => { paused = !paused; });
$('#play_pause').change(() => {
paused = !paused;
});
$('#show_gc').change(() => {
show_gc = !show_gc;
updatePlots();
Expand All @@ -67,16 +65,27 @@ const connect = () => {
timerange = val;
updatePlots();
});
initDone = true;
return;
dataProcessor.initDone = true;
}

dataProcessor.onData(data);
},
onData: data => {
stats.pushData(data);
if (paused) {
if (paused || !dataProcessor.connected) {
return
}
updatePlots(PlotsDef.events);
updatePlots()
}
}
/* WebSocket connection handling */
const connect = () => {
const url = window.location.pathname + "ws";
const eventSource = new EventSource(url);
console.info(`Attempting sse connection to server at ${url}`);
for (let event in dataProcessor) {
eventSource[event] = dataProcessor[event];
}
dataProcessor.close = eventSource.close
}

connect();
Expand All @@ -102,7 +111,31 @@ const attachPlots = () => {
}
}

const updatePlots = () => {
function throttle(func, delay) {
let initial = true;
let last = null;
let timer = null;
return function () {
const context = this;
const args = arguments;
if (initial) {
func.apply(context, args);
initial = false;
last = Date.now();
} else {
clearTimeout(timer);
timer = setTimeout(function () {
const now = Date.now();
if (now - last >= delay) {
func.apply(context, args);
last = now;
}
}, delay - (Date.now() - last));
}
}
}

const updatePlots = throttle(() => {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can avoid repeated redrawing of DOM caused by browser energy saving mode. This will not affect normal users.
Described in #120

// Create shapes.
let shapes = new Map();

Expand All @@ -123,7 +156,7 @@ const updatePlots = () => {
plot.update(xrange, data, shapes);
}
});
}
}, PlotsDef.sendFrequency||1000)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the sendFrequency setting used here? it wasn't before and I don't understand why it's needed now. Updates are pushed from the server, so I'd expect browser-side updates to be performed when data gets pushed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sets a minimum time for DOM redrawing, maybe I should set it smaller, such as 100


const updatePlotsLayout = () => {
plots.forEach(plot => {
Expand All @@ -139,7 +172,7 @@ theme.updateThemeMode();
$('#color_theme_sw').change(() => {
const themeMode = theme.getThemeMode();
const newTheme = themeMode === "dark" && "light" || "dark";
localStorage.setItem("theme-mode", newTheme);
localStorage.setItem("theme-mode", newTheme);
theme.updateThemeMode();
updatePlotsLayout();
});
2 changes: 1 addition & 1 deletion internal/static/js/stats.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ const slice = (n) => {
return sliced;
}

export { init, pushData, slice };
export { init, pushData, slice };
116 changes: 68 additions & 48 deletions statsviz.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/gorilla/websocket"

"github.com/arl/statsviz/internal/plot"
"github.com/arl/statsviz/internal/static"
)
Expand Down Expand Up @@ -82,7 +81,7 @@ func Register(mux *http.ServeMux, opts ...Option) error {
// updates metrics data and provides two essential HTTP handlers:
// - the Index handler serves Statsviz user interface, allowing you to
// visualize runtime metrics on your browser.
// - The Ws handler establishes a WebSocket connection allowing the connected
// - The Ws handler establishes a data connection allowing the connected
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Ws handler is used in examples and is part of the public API, but since ws means websockets and the communication won't be performed via websockets anymore, the Ws handler must be renamed to SSE. Removing it would be a breaking change however.

So, we should keep the Ws handler and mark is as deprecated, and make it an alias of the SSE handler.

// browser to receive metrics updates from the server.
//
// The zero value is not a valid Server, use NewServer to create a valid one.
Expand Down Expand Up @@ -161,20 +160,34 @@ func (s *Server) Register(mux *http.ServeMux) {
// intercept is a middleware that intercepts requests for plotsdef.js, which is
// generated dynamically based on the plots configuration. Other requests are
// forwarded as-is.
func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc {
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
if err := enc.Encode(cfg); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc {
var plotsdefjs []byte
//Using parentheses helps gc
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//Using parentheses helps gc

I've never heard of that, can you point to an blog post or a benchmark or anything about that?

{
buf := bytes.Buffer{}
buf.WriteString("export default ")
enc := json.NewEncoder(&buf)
enc.SetIndent("", " ")
var encodeValue any = cfg
if len(extraConfig) > 0 {
encodeValue1 := map[string]any{
"series": cfg.Series,
"events": cfg.Events,
}
for k, v := range extraConfig {
encodeValue1[k] = v
}
encodeValue = encodeValue1
}
if err := enc.Encode(encodeValue); err != nil {
panic("unexpected failure to encode plot definitions: " + err.Error())
}
buf.WriteString(";")
plotsdefjs = buf.Bytes()
}
buf.WriteString(";")
plotsdefjs := buf.Bytes()

return func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "js/plotsdef.js" {
w.Header().Add("Content-Length", strconv.Itoa(buf.Len()))
w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs)))
w.Header().Add("Content-Type", "text/javascript; charset=utf-8")
w.Write(plotsdefjs)
return
Expand Down Expand Up @@ -228,7 +241,9 @@ func assetsFS() http.FileSystem {
func (s *Server) Index() http.HandlerFunc {
prefix := strings.TrimSuffix(s.root, "/") + "/"
assets := http.FileServer(assetsFS())
handler := intercept(assets, s.plots.Config())
handler := intercept(assets, s.plots.Config(), map[string]any{
"sendFrequency": s.intv.Milliseconds(),
})

return http.StripPrefix(prefix, handler).ServeHTTP
}
Expand All @@ -238,46 +253,51 @@ func (s *Server) Index() http.HandlerFunc {
// connection to the WebSocket protocol.
func (s *Server) Ws() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
if strings.Contains(r.Header.Get("Accept"), "/event-stream") {
// If the connection is initiated by an already open web UI
// (started by a previous process, for example), then plotsdef.js won't be
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
s.startTransfer(w)
return
}
defer ws.Close()

// Ignore this error. This happens when the other end connection closes,
// for example. We can't handle it in any meaningful way anyways.
_ = s.sendStats(ws, s.intv)
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte("This endpoint only supports text/event-stream requests"))
}
}

// sendStats sends runtime statistics over the WebSocket connection.
func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error {
tick := time.NewTicker(frequency)
defer tick.Stop()

// If the WebSocket connection is initiated by an already open web UI
// (started by a previous process, for example), then plotsdef.js won't be
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

for range tick.C {
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
func (s *Server) startTransfer(w io.Writer) {
buffer := bytes.Buffer{}
buffer.WriteString("data: ")
callData := func() error {
if err := s.plots.WriteValues(&buffer); err == nil {
_, err = w.Write(buffer.Bytes())
if err != nil {
return err
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
} else {
return err
}
if err := s.plots.WriteValues(w); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
return nil
}
//the first time it was sent immediately
err := callData()
if err != nil {
return
}
tick := time.NewTicker(s.intv)
defer tick.Stop()
for range tick.C {
if callData() != nil {
return
}
}

panic("unreachable")
}
Loading
Loading