-
-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support sse transfer mode #122
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= | ||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= | ||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= | ||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= | ||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= | ||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= | ||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= | ||
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,10 @@ | ||
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= | ||
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= | ||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= | ||
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= | ||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= | ||
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= | ||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= | ||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= | ||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= | ||
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= | ||
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= | ||
golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= | ||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,15 +3,6 @@ import * as plot from "./plot.js"; | |
import * as theme from "./theme.js"; | ||
import PlotsDef from './plotsdef.js'; | ||
|
||
const buildWebsocketURI = () => { | ||
var loc = window.location, | ||
ws_prot = "ws:"; | ||
if (loc.protocol === "https:") { | ||
ws_prot = "wss:"; | ||
} | ||
return ws_prot + "//" + loc.host + loc.pathname + "ws" | ||
} | ||
|
||
const dataRetentionSeconds = 600; | ||
var timeout = 250; | ||
|
||
|
@@ -26,38 +17,45 @@ let paused = false; | |
let show_gc = true; | ||
let timerange = 60; | ||
|
||
/* WebSocket connection handling */ | ||
const connect = () => { | ||
const uri = buildWebsocketURI(); | ||
let ws = new WebSocket(uri); | ||
console.info(`Attempting websocket connection to server at ${uri}`); | ||
|
||
ws.onopen = () => { | ||
const dataProcessor = { | ||
initDone: false, | ||
close: () => { | ||
}, | ||
connected: false, | ||
retrying: false, | ||
onopen: () => { | ||
dataProcessor.initDone = false; | ||
dataProcessor.connected = true; | ||
console.info("Successfully connected"); | ||
timeout = 250; // reset connection timeout for next time | ||
}; | ||
|
||
ws.onclose = event => { | ||
console.error(`Closed websocket connection: code ${event.code}`); | ||
setTimeout(connect, clamp(timeout += timeout, 250, 5000)); | ||
}; | ||
|
||
ws.onerror = err => { | ||
console.error(`Websocket error, closing connection.`); | ||
ws.close(); | ||
}; | ||
|
||
let initDone = false; | ||
ws.onmessage = event => { | ||
}, | ||
onclose: event => { | ||
dataProcessor.connected = false; | ||
console.error(`Closed connection: code ${event.code || event}`); | ||
if (dataProcessor.retrying) { | ||
return | ||
} | ||
dataProcessor.retrying = true | ||
setTimeout(() => { | ||
connect() | ||
dataProcessor.retrying = false | ||
}, clamp(timeout += timeout, 250, 5000)); | ||
}, | ||
onerror: err => { | ||
console.error(`error, closing connection.`, err); | ||
dataProcessor.close(); | ||
}, | ||
onmessage: event => { | ||
let data = JSON.parse(event.data) | ||
|
||
if (!initDone) { | ||
if (!dataProcessor.initDone) { | ||
configurePlots(PlotsDef); | ||
stats.init(PlotsDef, dataRetentionSeconds); | ||
|
||
attachPlots(); | ||
|
||
$('#play_pause').change(() => { paused = !paused; }); | ||
$('#play_pause').change(() => { | ||
paused = !paused; | ||
}); | ||
$('#show_gc').change(() => { | ||
show_gc = !show_gc; | ||
updatePlots(); | ||
|
@@ -67,16 +65,27 @@ const connect = () => { | |
timerange = val; | ||
updatePlots(); | ||
}); | ||
initDone = true; | ||
return; | ||
dataProcessor.initDone = true; | ||
} | ||
|
||
dataProcessor.onData(data); | ||
}, | ||
onData: data => { | ||
stats.pushData(data); | ||
if (paused) { | ||
if (paused || !dataProcessor.connected) { | ||
return | ||
} | ||
updatePlots(PlotsDef.events); | ||
updatePlots() | ||
} | ||
} | ||
/* WebSocket connection handling */ | ||
const connect = () => { | ||
const url = window.location.pathname + "ws"; | ||
const eventSource = new EventSource(url); | ||
console.info(`Attempting sse connection to server at ${url}`); | ||
for (let event in dataProcessor) { | ||
eventSource[event] = dataProcessor[event]; | ||
} | ||
dataProcessor.close = eventSource.close | ||
} | ||
|
||
connect(); | ||
|
@@ -102,7 +111,31 @@ const attachPlots = () => { | |
} | ||
} | ||
|
||
const updatePlots = () => { | ||
function throttle(func, delay) { | ||
let initial = true; | ||
let last = null; | ||
let timer = null; | ||
return function () { | ||
const context = this; | ||
const args = arguments; | ||
if (initial) { | ||
func.apply(context, args); | ||
initial = false; | ||
last = Date.now(); | ||
} else { | ||
clearTimeout(timer); | ||
timer = setTimeout(function () { | ||
const now = Date.now(); | ||
if (now - last >= delay) { | ||
func.apply(context, args); | ||
last = now; | ||
} | ||
}, delay - (Date.now() - last)); | ||
} | ||
} | ||
} | ||
|
||
const updatePlots = throttle(() => { | ||
// Create shapes. | ||
let shapes = new Map(); | ||
|
||
|
@@ -123,7 +156,7 @@ const updatePlots = () => { | |
plot.update(xrange, data, shapes); | ||
} | ||
}); | ||
} | ||
}, PlotsDef.sendFrequency||1000) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This sets a minimum time for DOM redrawing, maybe I should set it smaller, such as 100 |
||
|
||
const updatePlotsLayout = () => { | ||
plots.forEach(plot => { | ||
|
@@ -139,7 +172,7 @@ theme.updateThemeMode(); | |
$('#color_theme_sw').change(() => { | ||
const themeMode = theme.getThemeMode(); | ||
const newTheme = themeMode === "dark" && "light" || "dark"; | ||
localStorage.setItem("theme-mode", newTheme); | ||
localStorage.setItem("theme-mode", newTheme); | ||
theme.updateThemeMode(); | ||
updatePlotsLayout(); | ||
}); |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,4 +92,4 @@ const slice = (n) => { | |
return sliced; | ||
} | ||
|
||
export { init, pushData, slice }; | ||
export { init, pushData, slice }; |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,15 +43,14 @@ import ( | |
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"os" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/gorilla/websocket" | ||
|
||
"github.com/arl/statsviz/internal/plot" | ||
"github.com/arl/statsviz/internal/static" | ||
) | ||
|
@@ -82,7 +81,7 @@ func Register(mux *http.ServeMux, opts ...Option) error { | |
// updates metrics data and provides two essential HTTP handlers: | ||
// - the Index handler serves Statsviz user interface, allowing you to | ||
// visualize runtime metrics on your browser. | ||
// - The Ws handler establishes a WebSocket connection allowing the connected | ||
// - The Ws handler establishes a data connection allowing the connected | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The So, we should keep the |
||
// browser to receive metrics updates from the server. | ||
// | ||
// The zero value is not a valid Server, use NewServer to create a valid one. | ||
|
@@ -161,20 +160,34 @@ func (s *Server) Register(mux *http.ServeMux) { | |
// intercept is a middleware that intercepts requests for plotsdef.js, which is | ||
// generated dynamically based on the plots configuration. Other requests are | ||
// forwarded as-is. | ||
func intercept(h http.Handler, cfg *plot.Config) http.HandlerFunc { | ||
buf := bytes.Buffer{} | ||
buf.WriteString("export default ") | ||
enc := json.NewEncoder(&buf) | ||
enc.SetIndent("", " ") | ||
if err := enc.Encode(cfg); err != nil { | ||
panic("unexpected failure to encode plot definitions: " + err.Error()) | ||
func intercept(h http.Handler, cfg *plot.Config, extraConfig map[string]any) http.HandlerFunc { | ||
var plotsdefjs []byte | ||
//Using parentheses helps gc | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I've never heard of that, can you point to an blog post or a benchmark or anything about that? |
||
{ | ||
buf := bytes.Buffer{} | ||
buf.WriteString("export default ") | ||
enc := json.NewEncoder(&buf) | ||
enc.SetIndent("", " ") | ||
var encodeValue any = cfg | ||
if len(extraConfig) > 0 { | ||
encodeValue1 := map[string]any{ | ||
"series": cfg.Series, | ||
"events": cfg.Events, | ||
} | ||
for k, v := range extraConfig { | ||
encodeValue1[k] = v | ||
} | ||
encodeValue = encodeValue1 | ||
} | ||
if err := enc.Encode(encodeValue); err != nil { | ||
panic("unexpected failure to encode plot definitions: " + err.Error()) | ||
} | ||
buf.WriteString(";") | ||
plotsdefjs = buf.Bytes() | ||
} | ||
buf.WriteString(";") | ||
plotsdefjs := buf.Bytes() | ||
|
||
return func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path == "js/plotsdef.js" { | ||
w.Header().Add("Content-Length", strconv.Itoa(buf.Len())) | ||
w.Header().Add("Content-Length", strconv.Itoa(len(plotsdefjs))) | ||
w.Header().Add("Content-Type", "text/javascript; charset=utf-8") | ||
w.Write(plotsdefjs) | ||
return | ||
|
@@ -228,7 +241,9 @@ func assetsFS() http.FileSystem { | |
func (s *Server) Index() http.HandlerFunc { | ||
prefix := strings.TrimSuffix(s.root, "/") + "/" | ||
assets := http.FileServer(assetsFS()) | ||
handler := intercept(assets, s.plots.Config()) | ||
handler := intercept(assets, s.plots.Config(), map[string]any{ | ||
"sendFrequency": s.intv.Milliseconds(), | ||
}) | ||
|
||
return http.StripPrefix(prefix, handler).ServeHTTP | ||
} | ||
|
@@ -238,46 +253,51 @@ func (s *Server) Index() http.HandlerFunc { | |
// connection to the WebSocket protocol. | ||
func (s *Server) Ws() http.HandlerFunc { | ||
return func(w http.ResponseWriter, r *http.Request) { | ||
var upgrader = websocket.Upgrader{ | ||
ReadBufferSize: 1024, | ||
WriteBufferSize: 1024, | ||
} | ||
|
||
ws, err := upgrader.Upgrade(w, r, nil) | ||
if err != nil { | ||
if strings.Contains(r.Header.Get("Accept"), "/event-stream") { | ||
// If the connection is initiated by an already open web UI | ||
// (started by a previous process, for example), then plotsdef.js won't be | ||
// requested. Call plots.Config() manually to ensure that s.plots internals | ||
// are correctly initialized. | ||
s.plots.Config() | ||
|
||
w.Header().Set("Content-Type", "text/event-stream") | ||
w.Header().Set("Cache-Control", "no-cache") | ||
w.Header().Set("Connection", "keep-alive") | ||
s.startTransfer(w) | ||
return | ||
} | ||
defer ws.Close() | ||
|
||
// Ignore this error. This happens when the other end connection closes, | ||
// for example. We can't handle it in any meaningful way anyways. | ||
_ = s.sendStats(ws, s.intv) | ||
w.WriteHeader(http.StatusBadRequest) | ||
w.Write([]byte("This endpoint only supports text/event-stream requests")) | ||
} | ||
} | ||
|
||
// sendStats sends runtime statistics over the WebSocket connection. | ||
func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error { | ||
tick := time.NewTicker(frequency) | ||
defer tick.Stop() | ||
|
||
// If the WebSocket connection is initiated by an already open web UI | ||
// (started by a previous process, for example), then plotsdef.js won't be | ||
// requested. Call plots.Config() manually to ensure that s.plots internals | ||
// are correctly initialized. | ||
s.plots.Config() | ||
|
||
for range tick.C { | ||
w, err := conn.NextWriter(websocket.TextMessage) | ||
if err != nil { | ||
func (s *Server) startTransfer(w io.Writer) { | ||
buffer := bytes.Buffer{} | ||
buffer.WriteString("data: ") | ||
callData := func() error { | ||
if err := s.plots.WriteValues(&buffer); err == nil { | ||
_, err = w.Write(buffer.Bytes()) | ||
if err != nil { | ||
return err | ||
} | ||
if f, ok := w.(http.Flusher); ok { | ||
f.Flush() | ||
} | ||
} else { | ||
return err | ||
} | ||
if err := s.plots.WriteValues(w); err != nil { | ||
return err | ||
} | ||
if err := w.Close(); err != nil { | ||
return err | ||
return nil | ||
} | ||
//the first time it was sent immediately | ||
err := callData() | ||
if err != nil { | ||
return | ||
} | ||
tick := time.NewTicker(s.intv) | ||
defer tick.Stop() | ||
for range tick.C { | ||
if callData() != nil { | ||
return | ||
} | ||
} | ||
|
||
panic("unreachable") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this necessary?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can avoid repeated redrawing of DOM caused by browser energy saving mode. This will not affect normal users.
Described in #120