diff --git a/dot/node.go b/dot/node.go index efb4860b43..0d71c2fad7 100644 --- a/dot/node.go +++ b/dot/node.go @@ -346,9 +346,7 @@ func NewNode(cfg *Config, ks *keystore.GlobalKeystore, stopFunc func()) (*Node, return nil, err } - if cfg.Global.NoTelemetry { - return node, nil - } + telemetry.GetInstance().Initialise(!cfg.Global.NoTelemetry) telemetry.GetInstance().AddConnections(gd.TelemetryEndpoints) genesisHash := stateSrvc.Block.GenesisHash() diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 2499415f2b..873ed4273a 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -45,15 +45,26 @@ type Handler struct { sendMessageTimeout time.Duration } +// Instance interface that telemetry handler instance needs to implement +type Instance interface { + AddConnections(conns []*genesis.TelemetryEndpoint) + SendMessage(msg Message) error + startListening() + Initialise(enabled bool) +} + var ( once sync.Once - handlerInstance *Handler + handlerInstance Instance + + enabled = true // enabled by default + initilised sync.Once ) const defaultMessageTimeout = time.Second // GetInstance singleton pattern to for accessing TelemetryHandler -func GetInstance() *Handler { //nolint +func GetInstance() Instance { if handlerInstance == nil { once.Do( func() { @@ -65,9 +76,21 @@ func GetInstance() *Handler { //nolint go handlerInstance.startListening() }) } + if !enabled { + return &NoopHandler{} + } + return handlerInstance } +// Initialise function to set if telemetry is enabled +func (h *Handler) Initialise(e bool) { + initilised.Do( + func() { + enabled = e + }) +} + // AddConnections adds the given telemetry endpoint as listeners that will receive telemetry data func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) { for _, v := range conns { @@ -290,3 +313,20 @@ func NewNetworkStateTM(host libp2phost.Host, peerInfos []common.PeerInfo) *Netwo func (tm *NetworkStateTM) messageType() string { return tm.Msg } + +// NoopHandler struct no op handling (ignoring) telemetry messages +type NoopHandler struct { +} + +// Initialise function to set if telemetry is enabled +func (h *NoopHandler) Initialise(enabled bool) {} + +func (h *NoopHandler) startListening() {} + +// SendMessage no op for telemetry send message function +func (h *NoopHandler) SendMessage(msg Message) error { + return nil +} + +// AddConnections no op for telemetry add connections function +func (h *NoopHandler) AddConnections(conns []*genesis.TelemetryEndpoint) {} diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index f9e606b9f8..7e89665149 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -128,6 +128,39 @@ func TestListenerConcurrency(t *testing.T) { } } +func TestDisableInstance(t *testing.T) { + const qty = 1000 + var wg sync.WaitGroup + wg.Add(qty) + + resultCh = make(chan []byte) + for i := 0; i < qty; i++ { + if i == qty/2 { + GetInstance().Initialise(false) + } + go func() { + bh := common.MustHexToHash("0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6") + GetInstance().SendMessage(NewBlockImportTM(&bh, big.NewInt(2), "NetworkInitialSync")) + wg.Done() + }() + } + wg.Wait() + counter := 0 + tk := time.NewTicker(time.Second * 2) +main: + for { + select { + case <-tk.C: + break main + case <-resultCh: + counter++ + } + } + tk.Stop() + + require.LessOrEqual(t, counter, qty/2) +} + // TestInfiniteListener starts loop that print out data received on websocket ws://localhost:8001/ // this can be useful to see what data is sent to telemetry server func TestInfiniteListener(t *testing.T) {