diff --git a/cmd/run.go b/cmd/run.go
index 966bca8..9e3efbd 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -21,6 +21,7 @@ import (
 	"github.com/jumptrading/influx-spout/config"
 	"github.com/jumptrading/influx-spout/filter"
 	"github.com/jumptrading/influx-spout/listener"
+	"github.com/jumptrading/influx-spout/monitor"
 	"github.com/jumptrading/influx-spout/writer"
 )
 
@@ -51,6 +52,8 @@ func Run(configFile string) (out Stoppable, err error) {
 			c.Workers = runtime.GOMAXPROCS(-1) * 2
 		}
 		out, err = writer.StartWriter(c)
+	case "monitor":
+		out, err = monitor.Start(c)
 	default:
 		return nil, fmt.Errorf("unknown mode of operation: [%s]", c.Mode)
 	}
diff --git a/monitor/monitor.go b/monitor/monitor.go
new file mode 100644
index 0000000..ff9683f
--- /dev/null
+++ b/monitor/monitor.go
@@ -0,0 +1,177 @@
+// Copyright 2018 Jump Trading
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package monitor defines the influx-spout monitor component.
+package monitor
+
+import (
+	"fmt"
+	"log"
+	"net"
+	"net/http"
+	"sync"
+
+	"github.com/jumptrading/influx-spout/config"
+	"github.com/jumptrading/influx-spout/prometheus"
+	"github.com/nats-io/go-nats"
+)
+
+// Start initialises, starts and returns a new Monitor instance based
+// on the configuration supplied. If any step of the startup fails,
+// whatever was already set up is torn down again before the error is
+// returned.
+func Start(conf *config.Config) (_ *Monitor, err error) {
+	m := &Monitor{
+		c:       conf,
+		ready:   make(chan struct{}),
+		stop:    make(chan struct{}),
+		metrics: prometheus.NewMetricSet(),
+	}
+	// err is a named result so that this cleanup sees the error from
+	// whichever return statement fired.
+	defer func() {
+		if err != nil {
+			m.Stop()
+		}
+	}()
+
+	m.nc, err = m.natsConnect()
+	if err != nil {
+		return nil, err
+	}
+
+	m.sub, err = m.nc.Subscribe(m.c.NATSSubjectMonitor, m.receiveMetrics)
+	if err != nil {
+		return nil, fmt.Errorf("NATS: failed to subscribe: %v", err)
+	}
+
+	m.wg.Add(1)
+	go m.serveHTTP()
+
+	log.Printf("monitor subscribed to [%s] at %s - serving HTTP on port %d",
+		m.c.NATSSubjectMonitor, m.c.NATSAddress, m.c.Port)
+	return m, nil
+}
+
+// Monitor defines an influx-spout component which accumulates
+// runtime statistics from the other influx-spout components and
+// makes them available via a HTTP endpoint in Prometheus format.
+type Monitor struct {
+	c     *config.Config
+	nc    *nats.Conn
+	sub   *nats.Subscription
+	wg    sync.WaitGroup
+	ready chan struct{}
+	stop  chan struct{}
+
+	mu      sync.Mutex // guards metrics
+	metrics *prometheus.MetricSet
+}
+
+// Ready returns a channel which is closed once the monitor is
+// actually listening for HTTP metrics requests.
+func (m *Monitor) Ready() <-chan struct{} {
+	return m.ready
+}
+
+// Stop shuts down goroutines and closes resources related to the
+// monitor. It copes with a partially started Monitor (Start calls it
+// on failure) but must not be called twice: the stop channel can only
+// be closed once.
+func (m *Monitor) Stop() {
+	// Stop receiving metrics from NATS. There is no subscription yet
+	// if Start failed before subscribing.
+	if m.sub != nil {
+		m.sub.Unsubscribe()
+	}
+
+	// Shut down goroutines.
+	close(m.stop)
+	m.wg.Wait()
+
+	// Close the connection to NATS.
+	if m.nc != nil {
+		m.nc.Close()
+	}
+}
+
+// natsConnect opens a connection to the NATS server named in the
+// configuration.
+func (m *Monitor) natsConnect() (*nats.Conn, error) {
+	nc, err := nats.Connect(m.c.NATSAddress)
+	if err != nil {
+		return nil, fmt.Errorf("NATS: failed to connect: %v", err)
+	}
+	return nc, nil
+}
+
+// serveHTTP serves the accumulated metrics at /metrics on the
+// configured port until the stop channel is closed. Failing to bind
+// or serve is fatal to the process.
+func (m *Monitor) serveHTTP() {
+	defer m.wg.Done()
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
+		r.Body.Close()
+		w.Header().Set("Content-Type", "text/plain")
+
+		// Render while holding the lock but write after releasing it,
+		// so that a slow client cannot stall receiveMetrics.
+		m.mu.Lock()
+		out := m.metrics.ToBytes()
+		m.mu.Unlock()
+		w.Write(out)
+	})
+
+	server := &http.Server{
+		Addr:    fmt.Sprintf(":%d", m.c.Port),
+		Handler: mux,
+	}
+
+	// Bind the socket before signalling readiness so that Ready only
+	// fires once incoming connections will actually be queued.
+	ln, err := net.Listen("tcp", server.Addr)
+	if err != nil {
+		log.Fatal(err)
+	}
+	close(m.ready)
+
+	go func() {
+		err := server.Serve(ln)
+		if err == nil || err == http.ErrServerClosed {
+			return
+		}
+		log.Fatal(err)
+	}()
+
+	// Close the server if the stop channel is closed.
+	<-m.stop
+	server.Close()
+}
+
+// receiveMetrics is the NATS subscription callback. It parses the
+// metrics carried by msg and merges them into the accumulated set;
+// unparseable payloads are logged and dropped.
+func (m *Monitor) receiveMetrics(msg *nats.Msg) {
+	newMetrics, err := prometheus.ParseMetrics(msg.Data)
+	if err != nil {
+		log.Printf("invalid metrics received: %v", err)
+		return
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.metrics.UpdateFromSet(newMetrics)
+}
diff --git a/monitor/monitor_medium_test.go b/monitor/monitor_medium_test.go
new file mode 100644
index 0000000..1eeff64
--- /dev/null
+++ b/monitor/monitor_medium_test.go
@@ -0,0 +1,170 @@
+// Copyright 2018 Jump Trading
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// +build medium
+
+package monitor_test
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"testing"
+	"time"
+
+	"github.com/nats-io/go-nats"
+	"github.com/stretchr/testify/require"
+
+	"github.com/jumptrading/influx-spout/config"
+	"github.com/jumptrading/influx-spout/monitor"
+	"github.com/jumptrading/influx-spout/prometheus"
+	"github.com/jumptrading/influx-spout/spouttest"
+)
+
+const natsPort = 44447
+const httpPort = 44448
+
+var natsAddress = fmt.Sprintf("nats://127.0.0.1:%d", natsPort)
+
+func testConfig() *config.Config {
+	return &config.Config{
+		Name:               "nats.server",
+		NATSAddress:        natsAddress,
+		NATSSubjectMonitor: "monitor-test-monitor",
+		Port:               httpPort,
+	}
+}
+
+func TestMonitor(t *testing.T) {
+	nc, stopNats := runGnatsd(t)
+	defer stopNats()
+
+	conf := testConfig()
+
+	mon, err := monitor.Start(conf)
+	require.NoError(t, err)
+	defer mon.Stop()
+
+	select {
+	case <-mon.Ready():
+	case <-time.After(spouttest.LongWait):
+		t.Fatal("timed out waiting for monitor to be ready")
+	}
+
+	publish := func(data []byte) {
+		err := nc.Publish(conf.NATSSubjectMonitor, data)
+		require.NoError(t, err)
+	}
+
+	expected := prometheus.NewMetricSet()
+
+	// Send a metric to the monitor and see it included at the
+	// monitor's metric endpoint.
+	m0 := &prometheus.Metric{
+		Name: []byte("foo"),
+		Labels: prometheus.LabelPairs{
+			{
+				Name:  []byte("host"),
+				Value: []byte("nyc01"),
+			},
+			{
+				Name:  []byte("land"),
+				Value: []byte("ho"),
+			},
+		},
+		Value:        42,
+		Milliseconds: 11111111,
+	}
+	publish(m0.ToBytes())
+	expected.Update(m0)
+
+	assertMetrics(t, expected)
+
+	// Send another update with 2 metrics to the monitor and see them
+	// included.
+	nextUpdate := prometheus.NewMetricSet()
+	nextUpdate.Update(&prometheus.Metric{
+		Name: []byte("foo"),
+		Labels: prometheus.LabelPairs{
+			{
+				Name:  []byte("host"),
+				Value: []byte("nyc01"),
+			},
+			{
+				Name:  []byte("land"),
+				Value: []byte("ho"),
+			},
+		},
+		Value:        99,
+		Milliseconds: 22222222,
+	})
+	nextUpdate.Update(&prometheus.Metric{
+		Name: []byte("bar"),
+		Labels: prometheus.LabelPairs{
+			{
+				Name:  []byte("host"),
+				Value: []byte("nyc02"),
+			},
+		},
+		Value:        1024,
+		Milliseconds: 33333333,
+	})
+	publish(nextUpdate.ToBytes())
+	expected.UpdateFromSet(nextUpdate)
+
+	assertMetrics(t, expected)
+}
+
+func runGnatsd(t *testing.T) (*nats.Conn, func()) {
+	gnatsd := spouttest.RunGnatsd(natsPort)
+
+	nc, err := nats.Connect(natsAddress)
+	if err != nil {
+		gnatsd.Shutdown() // don't leave the server running when the test aborts
+		t.Fatalf("NATS connect failed: %v", err)
+	}
+
+	return nc, func() {
+		nc.Close()
+		gnatsd.Shutdown()
+	}
+}
+
+func assertMetrics(t *testing.T, expected *prometheus.MetricSet) {
+	var actual *prometheus.MetricSet
+
+	for try := 0; try < 10; try++ {
+		resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", httpPort))
+		require.NoError(t, err)
+		// Close per attempt; a defer inside the loop would only run at return.
+		body, err := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		require.NoError(t, err)
+
+		require.Equal(t, http.StatusOK, resp.StatusCode)
+		require.Equal(t, "text/plain", resp.Header.Get("Content-Type"))
+
+		actual, err = prometheus.ParseMetrics(body)
+		require.NoError(t, err)
+
+		if string(expected.ToBytes()) == string(actual.ToBytes()) {
+			return // Success
+		}
+
+		// Metrics may not have been processed yet - sleep and try again.
+		time.Sleep(250 * time.Millisecond)
+	}
+
+	t.Fatalf("Failed to see expected metrics. Wanted:\n%s\nLast saw:\n%s", expected.ToBytes(), actual.ToBytes())
+}
diff --git a/prometheus/metric.go b/prometheus/metric.go
index d81bf25..2e0ceb2 100644
--- a/prometheus/metric.go
+++ b/prometheus/metric.go
@@ -29,6 +29,12 @@ type Metric struct {
 	Milliseconds int64
 }
 
+// String renders the metric for display by wrapping its wire format
+// in a descriptive tag.
+func (m *Metric) String() string {
+	return fmt.Sprintf("<Metric: %s>", m.ToBytes())
+}
+
 // ToBytes renders the metric to wire format.
 func (m *Metric) ToBytes() []byte {
 	out := bytes.NewBuffer(m.Name)
diff --git a/prometheus/metricset.go b/prometheus/metricset.go
index de3e870..6f076d9 100644
--- a/prometheus/metricset.go
+++ b/prometheus/metricset.go
@@ -15,7 +15,9 @@
 package prometheus
 
 import (
+	"bytes"
 	"fmt"
+	"sort"
 )
 
 // NewMetricSet returns an empty MetricSet.
@@ -47,6 +49,30 @@ func (set *MetricSet) Update(m *Metric) {
 	set.metrics[metricKey(m)] = m
 }
 
+// UpdateFromSet updates the values in the set from another MetricSet.
+func (set *MetricSet) UpdateFromSet(other *MetricSet) {
+	for _, m := range other.All() {
+		set.Update(m)
+	}
+}
+
+// ToBytes serialises the metrics contained in the MetricSet to the
+// Prometheus exposition format.
+func (set *MetricSet) ToBytes() []byte {
+	keys := make([]string, 0, len(set.metrics))
+	for key := range set.metrics {
+		keys = append(keys, key)
+	}
+	sort.Strings(keys) // map iteration order is random; sort for stable output
+
+	out := new(bytes.Buffer)
+	for _, key := range keys {
+		out.Write(set.metrics[key].ToBytes())
+		out.WriteByte('\n') // one metric per line
+	}
+	return out.Bytes()
+}
+
 func metricKey(m *Metric) string {
 	return fmt.Sprintf("%s%s", m.Name, m.Labels.ToBytes())
 }
diff --git a/prometheus/metricset_small_test.go b/prometheus/metricset_small_test.go
index 0cdbd1a..873eee6 100644
--- a/prometheus/metricset_small_test.go
+++ b/prometheus/metricset_small_test.go
@@ -154,3 +154,99 @@ func TestUpdateLabelOrdering(t *testing.T) {
 
 	assert.Equal(t, set.All(), []*prometheus.Metric{m1})
 }
+
+func TestUpdateFromSet(t *testing.T) {
+	// set0 has 2 metrics
+	set0 := prometheus.NewMetricSet()
+	m0 := &prometheus.Metric{
+		Name: []byte("uptime"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc01")},
+		},
+		Value:        222,
+		Milliseconds: 99,
+	}
+	set0.Update(m0)
+	set0.Update(&prometheus.Metric{
+		Name: []byte("temp"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+			{Name: []byte("core"), Value: []byte("0")},
+		},
+		Value:        55,
+		Milliseconds: 100,
+	})
+
+	// set1 overwrites one item in set0 and introduces a new one.
+	set1 := prometheus.NewMetricSet()
+	m1 := &prometheus.Metric{
+		Name: []byte("temp"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+			{Name: []byte("core"), Value: []byte("0")},
+		},
+		Value:        66,
+		Milliseconds: 111,
+	}
+	set1.Update(m1)
+	m2 := &prometheus.Metric{
+		Name: []byte("uptime"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+		},
+		Value:        1234,
+		Milliseconds: 222,
+	}
+	set1.Update(m2)
+
+	set0.UpdateFromSet(set1)
+
+	assert.ElementsMatch(t, set0.All(), []*prometheus.Metric{m0, m1, m2})
+}
+
+func TestToBytes(t *testing.T) {
+	set := prometheus.NewMetricSet()
+
+	set.Update(&prometheus.Metric{
+		Name: []byte("uptime"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc01")},
+		},
+		Value:        1234,
+		Milliseconds: 101,
+	})
+	set.Update(&prometheus.Metric{
+		Name: []byte("temp"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+			{Name: []byte("core"), Value: []byte("0")},
+		},
+		Value:        55,
+		Milliseconds: 100,
+	})
+	set.Update(&prometheus.Metric{
+		Name: []byte("temp"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+			{Name: []byte("core"), Value: []byte("1")},
+		},
+		Value:        56,
+		Milliseconds: 100,
+	})
+	set.Update(&prometheus.Metric{
+		Name: []byte("uptime"),
+		Labels: prometheus.LabelPairs{
+			{Name: []byte("host"), Value: []byte("nyc02")},
+		},
+		Value:        4444,
+		Milliseconds: 102,
+	})
+
+	expected := []byte(`
+temp{core="0",host="nyc02"} 55 100
+temp{core="1",host="nyc02"} 56 100
+uptime{host="nyc01"} 1234 101
+uptime{host="nyc02"} 4444 102
+`[1:])
+	assert.Equal(t, expected, set.ToBytes())
+}
diff --git a/prometheus/parse.go b/prometheus/parse.go
index db086a0..025e953 100644
--- a/prometheus/parse.go
+++ b/prometheus/parse.go
@@ -17,10 +17,27 @@ package prometheus
 import (
 	"bytes"
 	"errors"
+	"fmt"
 
 	"github.com/jumptrading/influx-spout/convert"
 )
 
+// ParseMetrics parses multiple Prometheus metric lines, returning a MetricSet.
+func ParseMetrics(s []byte) (*MetricSet, error) {
+	set := NewMetricSet()
+	for i, line := range bytes.Split(s, []byte("\n")) {
+		if len(line) == 0 {
+			continue // skip blank lines
+		}
+		m, err := ParseMetric(line)
+		if err != nil {
+			return nil, fmt.Errorf("line %d: %v", i+1, err) // 1-based; blank lines are counted
+		}
+		set.Update(m)
+	}
+	return set, nil
+}
+
 // ParseMetric parses a single Promethesus metric line.
 //
 // Note: The implementation currently only supports integer values and
diff --git a/prometheus/parse_small_test.go b/prometheus/parse_small_test.go
index 0b2f47e..96084cd 100644
--- a/prometheus/parse_small_test.go
+++ b/prometheus/parse_small_test.go
@@ -180,3 +180,106 @@ func TestParseLabelsAndTimestamp(t *testing.T) {
 		Milliseconds: 123456789,
 	}, m)
 }
+
+func TestParseMetrics(t *testing.T) {
+	input := []byte(`
+foo{host="nyc01",bar="definitely",thing="forgot"} 42 11111111
+foo{host="nyc02",bar="maybe"} 999 22222222
+bar 1234`[1:])
+
+	expected := prometheus.NewMetricSet()
+	expected.Update(&prometheus.Metric{
+		Name: []byte("foo"),
+		Labels: prometheus.LabelPairs{
+			{
+				Name:  []byte("host"),
+				Value: []byte("nyc01"),
+			},
+			{
+				Name:  []byte("bar"),
+				Value: []byte("definitely"),
+			},
+			{
+				Name:  []byte("thing"),
+				Value: []byte("forgot"),
+			},
+		},
+		Value:        42,
+		Milliseconds: 11111111,
+	})
+	expected.Update(&prometheus.Metric{
+		Name: []byte("foo"),
+		Labels: prometheus.LabelPairs{
+			{
+				Name:  []byte("host"),
+				Value: []byte("nyc02"),
+			},
+			{
+				Name:  []byte("bar"),
+				Value: []byte("maybe"),
+			},
+		},
+		Value:        999,
+		Milliseconds: 22222222,
+	})
+	expected.Update(&prometheus.Metric{
+		Name:  []byte("bar"),
+		Value: 1234,
+	})
+
+	actual, err := prometheus.ParseMetrics(input)
+	require.NoError(t, err)
+	assert.ElementsMatch(t, expected.All(), actual.All())
+}
+
+func TestParseMetricsBlankLines(t *testing.T) {
+	input := []byte(`
+foo 111
+
+bar 222
+
+`)
+
+	expected := prometheus.NewMetricSet()
+	expected.Update(&prometheus.Metric{
+		Name:  []byte("foo"),
+		Value: 111,
+	})
+	expected.Update(&prometheus.Metric{
+		Name:  []byte("bar"),
+		Value: 222,
+	})
+
+	actual, err := prometheus.ParseMetrics(input)
+	require.NoError(t, err)
+	assert.ElementsMatch(t, expected.All(), actual.All())
+}
+
+func TestParseMetricsLastWins(t *testing.T) {
+	input := []byte(`
+foo 1
+foo 2
+foo 3
+foo 4
+foo 5
+foo 6
+`[1:])
+
+	expected := prometheus.NewMetricSet()
+	expected.Update(&prometheus.Metric{
+		Name:  []byte("foo"),
+		Value: 6, // only the final "foo" line is expected to survive
+	})
+
+	actual, err := prometheus.ParseMetrics(input)
+	require.NoError(t, err)
+	assert.ElementsMatch(t, expected.All(), actual.All())
+}
+
+func TestParseMetricsEmpty(t *testing.T) {
+	expected := prometheus.NewMetricSet()
+
+	actual, err := prometheus.ParseMetrics([]byte{})
+	require.NoError(t, err)
+	assert.ElementsMatch(t, expected.All(), actual.All())
+}