Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
Merge pull request #44 from mjs/remove-writer-state-metrics
Browse files Browse the repository at this point in the history
writer: Remove unused state metrics
  • Loading branch information
oplehto authored Mar 13, 2018
2 parents a67dc47 + dd8a516 commit cbc51ed
Showing 1 changed file with 7 additions and 24 deletions.
31 changes: 7 additions & 24 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"io/ioutil"
"log"
"os"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -83,7 +82,7 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {

w.nc, err = nats.Connect(c.NATSAddress)
if err != nil {
return nil, fmt.Errorf("NATS Error: can't connect: %v\n", err)
return nil, fmt.Errorf("NATS Error: can't connect: %v", err)
}

// if we disconnect, we want to try reconnecting as many times as
Expand All @@ -92,7 +91,6 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {

http.DefaultTransport.(*http.Transport).MaxIdleConnsPerHost = 100

w.notifyState("boot") // notify the monitor that we have finished booting and soon are ready
jobs := make(chan *nats.Msg, 1024)
w.wg.Add(w.c.Workers)
for wk := 0; wk < w.c.Workers; wk++ {
Expand All @@ -116,16 +114,14 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
go w.monitorSub(sub)
}

w.wg.Add(1)
go w.startStatistician()

if err = w.nc.LastError(); err != nil {
w.notifyState("crashed")
return nil, err
// Subscriptions don't always seem to be reliable without flushing
// after subscribing.
if err := w.nc.Flush(); err != nil {
return nil, fmt.Errorf("NATS flush error: %v", err)
}

// notify the monitor that we are ready to receive messages and transmit to influxdb
w.notifyState("ready")
w.wg.Add(1)
go w.startStatistician()

log.Printf("writer subscribed to [%v] at %s with %d workers",
c.NATSSubject, c.NATSAddress, c.Workers)
Expand Down Expand Up @@ -327,16 +323,3 @@ func (w *Writer) startStatistician() {
}
}
}

var notifyLine = lineformatter.New("spout_mon", nil, "type", "state", "pid")

func (w *Writer) notifyState(state string) {
line := notifyLine.Format(nil, "writer", state, os.Getpid())
if err := w.nc.Publish(w.c.NATSSubjectMonitor, line); err != nil {
log.Printf("NATS Error: %v\n", err)
return
}
if err := w.nc.Flush(); err != nil {
log.Printf("NATS Error: %v\n", err)
}
}

0 comments on commit cbc51ed

Please sign in to comment.