From 75a5c511d01836a0857b99163e9048eb137fcedb Mon Sep 17 00:00:00 2001 From: Menno Finlay-Smits Date: Tue, 24 Apr 2018 16:54:01 +1200 Subject: [PATCH] filter: Use nats_pending_max_mb As for the writer, the filter now uses the `nats_pending_max_mb` configuration option to set the maximum pending buffer size for the incoming NATS subject. Previously it was using the default size of 65MB. Using a higher number may help to prevent dropped messages in the filter. The default is 200MB. --- README.md | 5 +++++ filter/filter.go | 3 +++ filter/filter_medium_test.go | 1 + writer/writer.go | 4 ++-- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 49d2ad6..3e4e6c7 100644 --- a/README.md +++ b/README.md @@ -206,6 +206,11 @@ nats_subject_junkyard = "influx-spout-junk" # by the monitor). nats_subject_monitor = "influx-spout-monitor" +# The maximum size that the pending buffer for the NATS subject that the filter +# is reading from may become (in megabytes). Measurements will be dropped if +# this limit is reached. +nats_pending_max_mb = 200 + # The number of filter workers to spawn. workers = 8 diff --git a/filter/filter.go b/filter/filter.go index 805bf19..c0ca75f 100644 --- a/filter/filter.go +++ b/filter/filter.go @@ -91,6 +91,9 @@ func StartFilter(conf *config.Config) (_ *Filter, err error) { if err != nil { return nil, fmt.Errorf("NATS: failed to subscribe: %v", err) } + if err := f.sub.SetPendingLimits(-1, conf.NATSPendingMaxMB*1024*1024); err != nil { + return nil, fmt.Errorf("NATS: failed to set pending limits: %v", err) + } f.wg.Add(1) go f.startStatistician(stats, rules) diff --git a/filter/filter_medium_test.go b/filter/filter_medium_test.go index d338297..2489cb5 100644 --- a/filter/filter_medium_test.go +++ b/filter/filter_medium_test.go @@ -38,6 +38,7 @@ func testConfig() *config.Config { NATSSubject: []string{"filter-test"}, NATSSubjectMonitor: "filter-test-monitor", NATSSubjectJunkyard: "filter-junkyard", + NATSPendingMaxMB: 32, Workers: 1, MaxTimeDeltaSecs: 600, Rule: []config.Rule{{ diff --git a/writer/writer.go b/writer/writer.go index 208914f..66a014d 100644 --- a/writer/writer.go +++ b/writer/writer.go @@ -104,10 +104,10 @@ func StartWriter(c *config.Config) (_ *Writer, err error) { jobs <- msg }) if err != nil { - return nil, fmt.Errorf("subscription for %q failed: %v", subject, err) + return nil, fmt.Errorf("NATS: subscription for %q failed: %v", subject, err) } if err := sub.SetPendingLimits(-1, maxPendingBytes); err != nil { - return nil, fmt.Errorf("failed to set pending limits: %v", err) + return nil, fmt.Errorf("NATS: failed to set pending limits: %v", err) } w.wg.Add(1)