Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #57 from mjs/monitor-component
Browse files Browse the repository at this point in the history
Introduce monitor component
  • Loading branch information
oplehto authored Mar 29, 2018
2 parents a910bad + 02f71fe commit 9256be5
Show file tree
Hide file tree
Showing 8 changed files with 567 additions and 0 deletions.
3 changes: 3 additions & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/filter"
"github.com/jumptrading/influx-spout/listener"
"github.com/jumptrading/influx-spout/monitor"
"github.com/jumptrading/influx-spout/writer"
)

Expand Down Expand Up @@ -51,6 +52,8 @@ func Run(configFile string) (out Stoppable, err error) {
c.Workers = runtime.GOMAXPROCS(-1) * 2
}
out, err = writer.StartWriter(c)
case "monitor":
out, err = monitor.Start(c)
default:
return nil, fmt.Errorf("unknown mode of operation: [%s]", c.Mode)
}
Expand Down
148 changes: 148 additions & 0 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package monitor defines the influx-spount monitor component.
package monitor

import (
"fmt"
"log"
"net/http"
"sync"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/prometheus"
"github.com/nats-io/go-nats"
)

// Start initialises, starts and returns a new Monitor instance based
// on the configuration supplies.
func Start(conf *config.Config) (_ *Monitor, err error) {
m := &Monitor{
c: conf,
ready: make(chan struct{}),
stop: make(chan struct{}),
metrics: prometheus.NewMetricSet(),
}
defer func() {
if err != nil {
m.Stop()
}
}()

m.nc, err = m.natsConnect()
if err != nil {
return nil, err
}

m.sub, err = m.nc.Subscribe(m.c.NATSSubjectMonitor, m.receiveMetrics)
if err != nil {
return nil, fmt.Errorf("NATS: failed to subscribe: %v", err)
}

m.wg.Add(1)
go m.serveHTTP()

log.Printf("monitor subscribed to [%s] at %s - serving HTTP on port %d",
m.c.NATSSubjectMonitor, m.c.NATSAddress, m.c.Port)
return m, nil
}

// Monitor defines an influx-spout component which accumulates
// runtime statistics from the other influx-spout components and
// makes them available via a HTTP endpoint in Prometheus format.
type Monitor struct {
c *config.Config
nc *nats.Conn
sub *nats.Subscription
wg sync.WaitGroup
ready chan struct{}
stop chan struct{}

mu sync.Mutex
metrics *prometheus.MetricSet
}

// Ready returns a channel which is closed once the monitor is
// actually listening for HTTP metrics requests.
func (m *Monitor) Ready() <-chan struct{} {
return m.ready
}

// Stop shuts down goroutines and closes resources related to the filter.
func (m *Monitor) Stop() {
// Stop receiving lines from NATS.
m.sub.Unsubscribe()

// Shut down goroutines.
close(m.stop)
m.wg.Wait()

// Close the connection to NATS.
if m.nc != nil {
m.nc.Close()
}
}

func (m *Monitor) natsConnect() (*nats.Conn, error) {
nc, err := nats.Connect(m.c.NATSAddress)
if err != nil {
return nil, fmt.Errorf("NATS: failed to connect: %v", err)
}
return nc, nil
}

func (m *Monitor) serveHTTP() {
defer m.wg.Done()

mux := http.NewServeMux()
mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
r.Body.Close()
w.Header().Set("Content-Type", "text/plain")

defer m.mu.Unlock()
m.mu.Lock()
w.Write(m.metrics.ToBytes())
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", m.c.Port),
Handler: mux,
}

go func() {
close(m.ready)
err := server.ListenAndServe()
if err == nil || err == http.ErrServerClosed {
return
}
log.Fatal(err)
}()

// Close the server if the stop channel is closed.
<-m.stop
server.Close()
}

func (m *Monitor) receiveMetrics(msg *nats.Msg) {
newMetrics, err := prometheus.ParseMetrics(msg.Data)
if err != nil {
log.Printf("invalid metrics received: %v", err)
return
}

defer m.mu.Unlock()
m.mu.Lock()
m.metrics.UpdateFromSet(newMetrics)
}
170 changes: 170 additions & 0 deletions monitor/monitor_medium_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// Copyright 2018 Jump Trading
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build medium

package monitor_test

import (
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/nats-io/go-nats"
"github.com/stretchr/testify/require"

"github.com/jumptrading/influx-spout/config"
"github.com/jumptrading/influx-spout/monitor"
"github.com/jumptrading/influx-spout/prometheus"
"github.com/jumptrading/influx-spout/spouttest"
)

const natsPort = 44447
const httpPort = 44448

var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)

func testConfig() *config.Config {
return &config.Config{
Name: "nats.server",
NATSAddress: natsAddress,
NATSSubjectMonitor: "monitor-test-monitor",
Port: httpPort,
}
}

func TestMonitor(t *testing.T) {
nc, stopNats := runGnatsd(t)
defer stopNats()

conf := testConfig()

mon, err := monitor.Start(conf)
require.NoError(t, err)
defer mon.Stop()

select {
case <-mon.Ready():
case <-time.After(spouttest.LongWait):
t.Fatal("timed out waiting for monitor to be ready")
}

publish := func(data []byte) {
err := nc.Publish(conf.NATSSubjectMonitor, data)
require.NoError(t, err)
}

expected := prometheus.NewMetricSet()

// Send a metric to the monitor and see it included at the
// monitor's metric endpoint.
m0 := &prometheus.Metric{
Name: []byte("foo"),
Labels: prometheus.LabelPairs{
{
Name: []byte("host"),
Value: []byte("nyc01"),
},
{
Name: []byte("land"),
Value: []byte("ho"),
},
},
Value: 42,
Milliseconds: 11111111,
}
publish(m0.ToBytes())
expected.Update(m0)

assertMetrics(t, expected)

// Send another update with 2 metrics to the monitor and see them
// included.
nextUpdate := prometheus.NewMetricSet()
nextUpdate.Update(&prometheus.Metric{
Name: []byte("foo"),
Labels: prometheus.LabelPairs{
{
Name: []byte("host"),
Value: []byte("nyc01"),
},
{
Name: []byte("land"),
Value: []byte("ho"),
},
},
Value: 99,
Milliseconds: 22222222,
})
nextUpdate.Update(&prometheus.Metric{
Name: []byte("bar"),
Labels: prometheus.LabelPairs{
{
Name: []byte("host"),
Value: []byte("nyc02"),
},
},
Value: 1024,
Milliseconds: 33333333,
})
publish(nextUpdate.ToBytes())
expected.UpdateFromSet(nextUpdate)

assertMetrics(t, expected)
}

func runGnatsd(t *testing.T) (*nats.Conn, func()) {
gnatsd := spouttest.RunGnatsd(natsPort)

nc, err := nats.Connect(natsAddress)
if err != nil {
gnatsd.Shutdown()
t.Fatalf("NATS connect failed: %v", err)
}

return nc, func() {
nc.Close()
gnatsd.Shutdown()
}
}

func assertMetrics(t *testing.T, expected *prometheus.MetricSet) {
var actual *prometheus.MetricSet

for try := 0; try < 10; try++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", httpPort))
require.NoError(t, err)
defer resp.Body.Close()

require.Equal(t, resp.StatusCode, 200)
require.Equal(t, "text/plain", resp.Header.Get("Content-Type"))

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

actual, err = prometheus.ParseMetrics(body)
require.NoError(t, err)

if string(expected.ToBytes()) == string(actual.ToBytes()) {
return // Success
}

// Metrics may not have been processed yet - sleep and try again.
time.Sleep(250 * time.Millisecond)
}

t.Fatalf("Failed to see expected metrics. Wanted:\n%s\nLast saw:\n%s", expected.ToBytes(), actual.ToBytes())
}
4 changes: 4 additions & 0 deletions prometheus/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type Metric struct {
Milliseconds int64
}

func (m *Metric) String() string {
return fmt.Sprintf("<Metric: %s>", m.ToBytes())
}

// ToBytes renders the metric to wire format.
func (m *Metric) ToBytes() []byte {
out := bytes.NewBuffer(m.Name)
Expand Down
26 changes: 26 additions & 0 deletions prometheus/metricset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package prometheus

import (
"bytes"
"fmt"
"sort"
)

// NewMetricSet returns an empty MetricSet.
Expand Down Expand Up @@ -47,6 +49,30 @@ func (set *MetricSet) Update(m *Metric) {
set.metrics[metricKey(m)] = m
}

// UpdateFromSet updates the values in the set from another MetricSet.
func (set *MetricSet) UpdateFromSet(other *MetricSet) {
for _, m := range other.All() {
set.Update(m)
}
}

// ToBytes serialise the metrics contained in the MetricSet to the
// Prometheus exposition format.
func (set *MetricSet) ToBytes() []byte {
keys := make([]string, 0, len(set.metrics))
for key := range set.metrics {
keys = append(keys, key)
}
sort.Strings(keys)

out := new(bytes.Buffer)
for _, key := range keys {
out.Write(set.metrics[key].ToBytes())
out.WriteByte('\n')
}
return out.Bytes()
}

func metricKey(m *Metric) string {
return fmt.Sprintf("%s%s", m.Name, m.Labels.ToBytes())
}
Loading

0 comments on commit 9256be5

Please sign in to comment.