Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix (dot/telemetry): NoTelemetry flag stops telemetry #1660

Merged
merged 16 commits into from
Jul 7, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions dot/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node,
return nil, err
}

if cfg.Global.NoTelemetry {
return node, nil
}
telemetry.Enabled = !cfg.Global.NoTelemetry

telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints)

Expand Down
32 changes: 30 additions & 2 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Handler struct {
sendMessageTimeout time.Duration
}

// Instance functions that telemetry handler instance needs to implement
edwardmack marked this conversation as resolved.
Show resolved Hide resolved
type Instance interface {
AddConnections(conns []*genesis.TelemetryEndpoint)
SendMessage(msg *Message) error
startListening()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the private method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to control who can start the listener, this is called when the instance is created.

}

// KeyValue object to hold key value pairs used in telemetry messages
type KeyValue struct {
key string
Expand All @@ -54,13 +61,16 @@ type KeyValue struct {

var (
once sync.Once
handlerInstance *Handler
handlerInstance Instance

// Enabled flag to determine if telemetry is enabled
Enabled = true // enabled by default
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make Enabled private. Shouldn't be able to change this from outside the package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to disable telemetry from outside the package (i.e. in node.go line 343). I'm following suggestion by @noot above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I think this is fine, we could to create some Initialise function that accepts the enabled bool and is also a singleton and change this package var to internal. what do you think @timwu20

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup that works for me.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I believe I've implemented it correctly now.

)

const defaultMessageTimeout = time.Second

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler { //nolint
func GetInstance() Instance {
if handlerInstance == nil {
once.Do(
func() {
Expand All @@ -72,6 +82,10 @@ func GetInstance() *Handler { //nolint
go handlerInstance.startListening()
})
}
if !Enabled {
return &NoopHandler{}
}

return handlerInstance
}

Expand Down Expand Up @@ -158,3 +172,17 @@ func msgToBytes(message Message) []byte {
}
return resB
}

// NoopHandler struct no op handling (ignoring) telemetry messages
type NoopHandler struct {
}

func (h *NoopHandler) startListening() {}

// SendMessage no op for telemetry send message function
func (h *NoopHandler) SendMessage(msg *Message) error {
return nil
}

// AddConnections no op for telemetry add connections function
func (h *NoopHandler) AddConnections(conns []*genesis.TelemetryEndpoint) {}
36 changes: 36 additions & 0 deletions dot/telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,42 @@ func TestListenerConcurrency(t *testing.T) {
}
}

func TestKillInstance(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

const qty = 1000
var wg sync.WaitGroup
wg.Add(qty)

resultCh = make(chan []byte)
for i := 0; i < qty; i++ {
if i == qty/2 {
Enabled = false
}
go func() {
GetInstance().SendMessage(NewTelemetryMessage(
NewKeyValue("best", "hash"),
NewKeyValue("height", big.NewInt(2)),
NewKeyValue("msg", "block.import"),
NewKeyValue("origin", "NetworkInitialSync")))
wg.Done()
}()
}
wg.Wait()
counter := 0
tk := time.NewTicker(time.Second * 2)
main:
for {
select {
case <-tk.C:
break main
case <-resultCh:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this case will be reached? The resultsCh is created but none data is pass through channel

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestMain starts a web server that has a HandleFunc listen which reads websocket messages and passes them to resultCh at line 191. Added a fmt.Println to the select case confirms this.

counter++
}
}
tk.Stop()

require.LessOrEqual(t, counter, qty/2)
}

func listen(w http.ResponseWriter, r *http.Request) {
c, err := upgrader.Upgrade(w, r, nil)
if err != nil {
Expand Down