Skip to content
This repository has been archived by the owner on Mar 9, 2022. It is now read-only.

Commit

Permalink
writer: Clean up subscription dropped reporting
Browse files Browse the repository at this point in the history
- Always report drop counts for each subscription, instead of only
  when a drop is detected.
- Move the responsibility for reporting NATS drops to the
  statistician. This avoids a goroutine per subscription (which was
  probably overkill) and simplifies the code.
- Remove a lot of logging that was of dubious value (the metric
  provides the same information and scales better).
- Rename the "dropped" metric to "nats_dropped" to make its meaning
  clearer.
  • Loading branch information
mjs committed Apr 24, 2018
1 parent b2eb25e commit d2cc5c0
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 69 deletions.
1 change: 1 addition & 0 deletions spouttest/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ failed_writes{component="writer",influxdb_address="localhost",influxdb_dbname="t
invalid_time{component="filter",name="filter"} 0
max_pending{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer"} \d+
nats_dropped{component="filter",name="filter"} 0
nats_dropped{component="writer",influxdb_address="localhost",influxdb_dbname="test",influxdb_port="44501",name="writer",subject="system"} 0
passed{component="filter",name="filter"} 10
processed{component="filter",name="filter"} 20
read_errors{component="listener",name="listener"} 0
Expand Down
102 changes: 33 additions & 69 deletions writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
statWriteRequests = "write_requests"
statFailedWrites = "failed_writes"
statMaxPending = "max_pending"
statNATSDropped = "nats_dropped"
)

type Writer struct {
Expand Down Expand Up @@ -93,31 +94,30 @@ func StartWriter(c *config.Config) (_ *Writer, err error) {
go w.worker(jobs)
}

// subscribe this writer to the NATS subject.
// Subscribe the writer to the configured NATS subjects.
subs := make([]*nats.Subscription, 0, len(c.NATSSubject))
maxPendingBytes := c.NATSPendingMaxMB * 1024 * 1024
for _, subject := range c.NATSSubject {
sub, err := w.nc.Subscribe(subject, func(msg *nats.Msg) {
jobs <- msg
})
if err != nil {
return nil, fmt.Errorf("subscription for %q failed: %v", subject, err)
return nil, fmt.Errorf("NATS: subscription for %q failed: %v", subject, err)
}
if err := sub.SetPendingLimits(-1, maxPendingBytes); err != nil {
return nil, fmt.Errorf("failed to set pending limits: %v", err)
return nil, fmt.Errorf("NATS: failed to set pending limits: %v", err)
}

w.wg.Add(1)
go w.monitorSub(sub)
subs = append(subs, sub)
}

// Subscriptions don't always seem to be reliable without flushing
// after subscribing.
// Subscriptions don't seem to be reliable without flushing after
// subscribing.
if err := w.nc.Flush(); err != nil {
return nil, fmt.Errorf("NATS flush error: %v", err)
}

w.wg.Add(1)
go w.startStatistician()
go w.startStatistician(subs)

log.Printf("writer subscribed to [%v] at %s with %d workers",
c.NATSSubject, c.NATSAddress, c.Workers)
Expand Down Expand Up @@ -237,64 +237,38 @@ func (w *Writer) sendBatch(batch *batchBuffer, client *http.Client) error {
return nil
}

// signalDrop reports dropped messages for a single subscription: it logs a
// warning and publishes the running total as a "dropped" counter on the
// monitor subject.
//
// subject is the subject the drops were observed on, drop is the current
// total drop count and last is the previously observed total. The
// difference drop-last is logged as the number of newly dropped messages.
func (w *Writer) signalDrop(subject string, drop, last int) {
// The writer is overloaded and had to drop one or more messages.
log.Printf("Warning: dropped %d for subject %q (total dropped: %d)", drop-last, subject, drop)

// Start from the writer's common labels and tag the metric with the
// subject so that drops can be told apart per subscription.
labels := w.metricsLabels()
labels["subject"] = subject

// Publish the total (not the delta) as a counter line, then flush the
// connection.
// NOTE(review): errors from Flush (and presumably Publish) are ignored here.
line := stats.CounterToPrometheus("dropped", drop, time.Now(), labels)
w.nc.Publish(w.c.NATSSubjectMonitor, line)
w.nc.Flush()
}

func (w *Writer) monitorSub(sub *nats.Subscription) {
// This goroutine is responsible for periodically publishing the
// writer's statistics, including the per-subscription NATS drop
// counts, to the monitoring backend.
func (w *Writer) startStatistician(subs []*nats.Subscription) {
defer w.wg.Done()

last, err := sub.Dropped()
if err != nil {
log.Printf("NATS Warning: Failed to get the number of dropped message from NATS: %v\n", err)
}
drop := last

for {
_, maxBytes, err := sub.MaxPending()
if err != nil {
log.Printf("NATS warning: failed to get the max pending stats from NATS: %v\n", err)
continue
}
w.stats.Max(statMaxPending, maxBytes)

drop, err = sub.Dropped()
if err != nil {
log.Printf("NATS warning: failed to get the number of dropped message from NATS: %v\n", err)
continue
labels := map[string]string{
"component": "writer",
"name": w.c.Name,
"influxdb_address": w.c.InfluxDBAddress,
"influxdb_port": strconv.Itoa(w.c.InfluxDBPort),
"influxdb_dbname": w.c.DBName,
}
now := time.Now()

if drop != last {
w.signalDrop(sub.Subject, drop, last)
}
last = drop
// Publish general stats.
lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), now, labels)
w.nc.Publish(w.c.NATSSubjectMonitor, lines)

select {
case <-time.After(time.Second):
case <-w.stop:
sub.Unsubscribe()
return
// Publish per-subscription NATS drop counters.
for _, sub := range subs {
dropped, err := sub.Dropped()
if err != nil {
log.Printf("NATS: failed to get dropped count: %v", err)
continue
}
labels["subject"] = sub.Subject
line := stats.CounterToPrometheus(statNATSDropped, dropped, now, labels)
w.nc.Publish(w.c.NATSSubjectMonitor, line)
}
}
}

// This goroutine is responsible for monitoring the statistics and
// sending it to the monitoring backend.
func (w *Writer) startStatistician() {
defer w.wg.Done()

labels := w.metricsLabels()
for {
lines := stats.SnapshotToPrometheus(w.stats.Snapshot(), time.Now(), labels)
w.nc.Publish(w.c.NATSSubjectMonitor, lines)
w.nc.Flush()

select {
case <-time.After(3 * time.Second):
Expand All @@ -303,13 +277,3 @@ func (w *Writer) startStatistician() {
}
}
}

// metricsLabels returns the labels attached to the metrics this writer
// publishes: the fixed component name "writer" plus the writer's name and
// its InfluxDB target (address, port and database name) read from w.c.
//
// A new map is built on every call, so a caller may add its own keys
// (for example "subject") without affecting other callers.
func (w *Writer) metricsLabels() map[string]string {
return map[string]string{
"component": "writer",
"name": w.c.Name,
"influxdb_address": w.c.InfluxDBAddress,
// Label values are strings, so the port is formatted as decimal text.
"influxdb_port": strconv.Itoa(w.c.InfluxDBPort),
"influxdb_dbname": w.c.DBName,
}
}

0 comments on commit d2cc5c0

Please sign in to comment.