From 3ee34932708e2f3f352ef29c9a7d31616fd21891 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Thu, 16 Jul 2020 21:03:57 -0400 Subject: [PATCH] If an entry is missing on the websocket, start checking after `wait` but continue checking until `max-wait` Refactored out a useful pruneList function --- cmd/loki-canary/main.go | 7 +- pkg/canary/comparator/comparator.go | 205 +++++++++++++---------- pkg/canary/comparator/comparator_test.go | 106 +++++++----- 3 files changed, 181 insertions(+), 137 deletions(-) diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go index 0937e93867f4..6c3d88c2da82 100644 --- a/cmd/loki-canary/main.go +++ b/cmd/loki-canary/main.go @@ -43,9 +43,10 @@ func main() { interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries") size := flag.Int("size", 100, "Size in bytes of each log line") - wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost") + wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries on websocket before querying loki for them") + maxWait := flag.Duration("max-wait", 5*time.Minute, "Duration to keep querying Loki for missing websocket entries before reporting them missing") pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, "+ - "also the frequency which queries for missing logs will be dispatched to loki, and the frequency spot check queries are run") + "also the frequency which queries for missing logs will be dispatched to loki") buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram") metricTestInterval := flag.Duration("metric-test-interval", 1*time.Hour, "The interval the metric test query should be run") @@ -83,7 +84,7 @@ func main() { c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size) c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval) - c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true) + c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true) } startCanary() diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 504a0eebb417..71e1a91d9c74 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -21,6 +21,7 @@ const ( ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" DebugWebsocketMissingEntry = "websocket missing entry: %v\n" DebugQueryResult = "confirmation query result: %v\n" + DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n" ) var ( @@ -37,7 +38,7 @@ var ( wsMissingEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", Name: "websocket_missing_entries_total", - Help: "counts log entries not received within the maxWait duration via the websocket connection", + Help: "counts log entries not received within the wait duration via the websocket connection", }) missingEntries = promauto.NewCounter(prometheus.CounterOpts{ Namespace: "loki_canary", @@ -72,21 +73,24 @@ var ( metricTestActual = promauto.NewGauge(prometheus.GaugeOpts{ Namespace: "loki_canary", Name: "metric_test_actual", - Help: "How many counts were actually recevied by the metric test query", + Help: "How many counts were actually received by the metric test query", }) responseLatency prometheus.Histogram ) type Comparator struct { - entMtx sync.Mutex - spotEntMtx sync.Mutex - spotMtx sync.Mutex - metTestMtx sync.Mutex - pruneMtx sync.Mutex + entMtx sync.Mutex // Locks access to []entries and []ackdEntries + missingMtx sync.Mutex // Locks access to []missingEntries + spotEntMtx sync.Mutex // Locks access to []spotCheck + spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck() + metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest() + pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries() w io.Writer entries []*time.Time + missingEntries []*time.Time spotCheck []*time.Time ackdEntries []*time.Time + wait time.Duration maxWait time.Duration pruneInterval time.Duration pruneEntriesRunning bool @@ -108,6 +112,7 @@ type Comparator struct { } func NewComparator(writer io.Writer, + wait time.Duration, maxWait time.Duration, pruneInterval time.Duration, spotCheckInterval, spotCheckMax, spotCheckQueryRate time.Duration, @@ -123,6 +128,7 @@ func NewComparator(writer io.Writer, w: writer, entries: []*time.Time{}, spotCheck: []*time.Time{}, + wait: wait, maxWait: maxWait, pruneInterval: pruneInterval, pruneEntriesRunning: false, @@ -184,29 +190,23 @@ func (c *Comparator) entryReceived(ts time.Time) { c.entMtx.Lock() defer c.entMtx.Unlock() - // Output index - k := 0 matched := false - for i, e := range c.entries { - if ts.Equal(*e) { + c.entries = pruneList(c.entries, + func(_ int, t *time.Time) bool { + return ts.Equal(*t) + }, + func(i int, t *time.Time) { matched = true // If this isn't the first item in the list we received it out of order if i != 0 { outOfOrderEntries.Inc() - fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) + fmt.Fprintf(c.w, ErrOutOfOrderEntry, t, c.entries[:i]) } responseLatency.Observe(time.Since(ts).Seconds()) // Put this element in the acknowledged entries list so we can use it to check for duplicates c.ackdEntries = append(c.ackdEntries, c.entries[i]) - // Do not increment output index, effectively causing this element to be dropped - } else { - // If the current index doesn't match the output index, update the array with the correct position - if i != k { - c.entries[k] = c.entries[i] - } - k++ - } - } + }) + if !matched { duplicate := false for _, e := range c.ackdEntries { @@ -222,11 +222,6 @@ func (c *Comparator) entryReceived(ts time.Time) { unexpectedEntries.Inc() } } - // Nil out the pointers to any trailing elements which were removed from the slice - for i := k; i < len(c.entries); i++ { - c.entries[i] = nil // or the zero value of T - } - c.entries = c.entries[:k] } func (c *Comparator) Size() int { @@ -257,7 +252,7 @@ func (c *Comparator) run() { c.pruneMtx.Lock() if !c.pruneEntriesRunning { c.pruneEntriesRunning = true - go c.pruneEntries() + go c.pruneEntries(time.Now()) } c.pruneMtx.Unlock() case <-sc.C: @@ -315,24 +310,18 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) { c.spotMtx.Unlock() }() c.spotEntMtx.Lock() - k := 0 - for i, e := range c.spotCheck { - if e.Before(currTime.Add(-c.spotCheckMax)) { - // Do nothing, if we don't increment the output index k, this will be dropped - } else { - if i != k { - c.spotCheck[k] = c.spotCheck[i] - } - k++ - } - } - // Nil out the pointers to any trailing elements which were removed from the slice - for i := k; i < len(c.spotCheck); i++ { - c.spotCheck[i] = nil // or the zero value of T - } - c.spotCheck = c.spotCheck[:k] + + // Remove any entries from the spotcheck list which are too old + c.spotCheck = pruneList(c.spotCheck, + func(_ int, t *time.Time) bool { + return t.Before(currTime.Add(-c.spotCheckMax)) + }, + func(_ int, t *time.Time) { + + }) + + // Make a copy so we don't have to hold the lock to verify entries cpy := make([]*time.Time, len(c.spotCheck)) - //Make a copy so we don't have to hold the lock to verify entries copy(cpy, c.spotCheck) c.spotEntMtx.Unlock() @@ -367,7 +356,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) { } -func (c *Comparator) pruneEntries() { +func (c *Comparator) pruneEntries(currentTime time.Time) { // Always make sure to set the running state back to false defer func() { c.pruneMtx.Lock() @@ -378,59 +367,46 @@ func (c *Comparator) pruneEntries() { defer c.entMtx.Unlock() missing := []*time.Time{} - k := 0 - for i, e := range c.entries { - // If the time is outside our range, assume the entry has been lost report and remove it - if e.Before(time.Now().Add(-c.maxWait)) { - missing = append(missing, e) + // Prune entry list of anything older than c.wait and add it to missing list + c.entries = pruneList(c.entries, + func(_ int, t *time.Time) bool { + return t.Before(currentTime.Add(-c.wait)) || t.Equal(currentTime.Add(-c.wait)) + }, + func(_ int, t *time.Time) { + missing = append(missing, t) wsMissingEntries.Inc() - fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds()) - } else { - if i != k { - c.entries[k] = c.entries[i] - } - k++ - } - } - // Nil out the pointers to any trailing elements which were removed from the slice - for i := k; i < len(c.entries); i++ { - c.entries[i] = nil // or the zero value of T - } - c.entries = c.entries[:k] - if len(missing) > 0 { + fmt.Fprintf(c.w, ErrEntryNotReceivedWs, t.UnixNano(), c.wait.Seconds()) + }) + + // Add the list of missing entries to the list for which we will attempt to query Loki for + c.missingMtx.Lock() + c.missingEntries = append(c.missingEntries, missing...) + c.missingMtx.Unlock() + if len(c.missingEntries) > 0 { if c.confirmAsync { - go c.confirmMissing(missing) + go c.confirmMissing(currentTime) } else { - c.confirmMissing(missing) + c.confirmMissing(currentTime) } } - // Prune the acknowledged list, remove anything older than our maxwait - k = 0 - for i, e := range c.ackdEntries { - if e.Before(time.Now().Add(-c.maxWait)) { - // Do nothing, if we don't increment the output index k, this will be dropped - } else { - if i != k { - c.ackdEntries[k] = c.ackdEntries[i] - } - k++ - } - } - // Nil out the pointers to any trailing elements which were removed from the slice - for i := k; i < len(c.ackdEntries); i++ { - c.ackdEntries[i] = nil // or the zero value of T - } - c.ackdEntries = c.ackdEntries[:k] + // Prune c.ackdEntries list of old acknowledged entries which we were using to find duplicates + c.ackdEntries = pruneList(c.ackdEntries, + func(_ int, t *time.Time) bool { + return t.Before(currentTime.Add(-c.wait)) + }, + func(_ int, t *time.Time) { + + }) } -func (c *Comparator) confirmMissing(missing []*time.Time) { +func (c *Comparator) confirmMissing(currentTime time.Time) { // Because we are querying loki timestamps vs the timestamp in the log, // make the range +/- 10 seconds to allow for clock inaccuracies - start := *missing[0] + start := *c.missingEntries[0] start = start.Add(-10 * time.Second) - end := *missing[len(missing)-1] + end := *c.missingEntries[len(c.missingEntries)-1] end = end.Add(10 * time.Second) recvd, err := c.rdr.Query(start, end) if err != nil { @@ -439,38 +415,81 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { } // This is to help debug some missing log entries when queried, // let's print exactly what we are missing and what Loki sent back - for _, r := range missing { + for _, r := range c.missingEntries { fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano()) } for _, r := range recvd { fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) } + // Now that query has returned, take out the lock on the missingEntries list so we can modify it + // It's possible more entries were added to this list but that's ok, if they match something in the + // query result we will remove them, if they don't they won't be old enough yet to remove. + c.missingMtx.Lock() + defer c.missingMtx.Unlock() k := 0 - for i, m := range missing { + for i, m := range c.missingEntries { found := false for _, r := range recvd { if (*m).Equal(r) { // Entry was found in loki, this can be dropped from the list of missing // which is done by NOT incrementing the output index k + fmt.Fprintf(c.w, DebugEntryFound, (*m).UnixNano(), currentTime.Sub(*m).Seconds()) found = true } } if !found { // Item is still missing if i != k { - missing[k] = missing[i] + c.missingEntries[k] = c.missingEntries[i] } k++ } } // Nil out the pointers to any trailing elements which were removed from the slice - for i := k; i < len(missing); i++ { - missing[i] = nil // or the zero value of T + for i := k; i < len(c.missingEntries); i++ { + c.missingEntries[i] = nil // or the zero value of T } - missing = missing[:k] - for _, e := range missing { + c.missingEntries = c.missingEntries[:k] + + // Remove entries from missing list which are older than maxWait + removed := []*time.Time{} + c.missingEntries = pruneList(c.missingEntries, + func(_ int, t *time.Time) bool { + return t.Before(currentTime.Add(-c.maxWait)) + }, + func(_ int, t *time.Time) { + removed = append(removed, t) + }) + + // Record the entries which were removed and never received + for _, e := range removed { missingEntries.Inc() fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) } } + +func pruneList(list []*time.Time, shouldRemove func(int, *time.Time) bool, handleRemoved func(int, *time.Time)) []*time.Time { + // Prune the acknowledged list, remove anything older than our maxwait + k := 0 + for i, e := range list { + if shouldRemove(i, e) { + handleRemoved(i, e) + // Do not increment output index k, causing this entry to be dropped + } else { + // If items were skipped, assign the kth element to the current item which is not skipped + if i != k { + list[k] = list[i] + } + // Increment k for the next output item + k++ + } + } + // Nil out the pointers to any trailing elements which were removed from the slice + for i := k; i < len(list); i++ { + list[i] = nil // or the zero value of T + } + // Reslice the list to the new size k + return list[:k] + +} diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 8db08b4b310c..591bf53b32a2 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) t1 := time.Now() t2 := t1.Add(1 * time.Second) @@ -101,9 +101,9 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) { duplicateEntries = &mockCounter{} actual := &bytes.Buffer{} - c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) - t1 := time.Now() + t1 := time.Unix(0, 0) t2 := t1.Add(1 * time.Second) t3 := t2.Add(1 * time.Second) t4 := t3.Add(1 * time.Second) @@ -147,18 +147,19 @@ func TestEntryNeverReceived(t *testing.T) { actual := &bytes.Buffer{} - t1 := time.Now() - t2 := t1.Add(1 * time.Millisecond) - t3 := t2.Add(1 * time.Millisecond) - t4 := t3.Add(1 * time.Millisecond) - t5 := t4.Add(1 * time.Millisecond) + t1 := time.Unix(10, 0) + t2 := time.Unix(20, 0) + t3 := time.Unix(30, 0) + t4 := time.Unix(40, 0) + t5 := time.Unix(50, 0) found := []time.Time{t1, t3, t4, t5} mr := &mockReader{resp: found} - maxWait := 50 * time.Millisecond + wait := 60 * time.Second + maxWait := 300 * time.Second //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below - c := NewComparator(actual, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) c.entrySent(t1) c.entrySent(t2) @@ -174,27 +175,32 @@ func TestEntryNeverReceived(t *testing.T) { assert.Equal(t, 2, c.Size()) - //Wait a few maxWait intervals just to make sure all entries are expired - <-time.After(2 * maxWait) - - c.pruneEntries() - - expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // Out of order because we missed entries - ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // Complain about missed entries - DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // List entries we are missing - DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // List entries we got back from Loki - ErrEntryNotReceived, // List entry not received from Loki - t3, []time.Time{t2}, - t5, []time.Time{t2, t4}, - t2.UnixNano(), maxWait.Seconds(), - t4.UnixNano(), maxWait.Seconds(), - t2.UnixNano(), - t4.UnixNano(), - t1.UnixNano(), - t3.UnixNano(), - t4.UnixNano(), - t5.UnixNano(), - t2.UnixNano(), maxWait.Seconds()) + //Set the time to 120s, this would be more than one wait (60s) and enough to go looking for the missing entries + c1Time := time.Unix(120, 0) + c.pruneEntries(c1Time) + assert.Equal(t, 1, len(c.missingEntries)) // One of the entries was found so only one should be missing + + //Now set the time to 2x maxWait which should guarnatee we stopped looking for the other missing entry + c2Time := t1.Add(2 * maxWait) + c.pruneEntries(c2Time) + + expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // 1 Out of order because we missed entries + ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // 2 Log that entries weren't received over websocket + DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // 3 List entries we are missing + DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // 4 List entries we got back from Loki + DebugEntryFound+ // 5 We log when t4 was found on followup query + DebugWebsocketMissingEntry+ // 6 Log missing entries on second run of pruneEntries + DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // 7 Because we call pruneEntries twice we get the confirmation query results back twice + ErrEntryNotReceived, // 8 List entry we never received and is missing completely + + t3, []time.Time{t2}, t5, []time.Time{t2, t4}, // 1 Out of order entry params + t2.UnixNano(), wait.Seconds(), t4.UnixNano(), wait.Seconds(), // 2 Entry not received over websocket params + t2.UnixNano(), t4.UnixNano(), // 3 Missing entries + t1.UnixNano(), t3.UnixNano(), t4.UnixNano(), t5.UnixNano(), // 4 Confirmation query results first run + t4.UnixNano(), c1Time.Sub(t4).Seconds(), // 5 t4 Entry found in follow up query + t2.UnixNano(), // 6 Missing Entry + t1.UnixNano(), t3.UnixNano(), t4.UnixNano(), t5.UnixNano(), // 7 Confirmation query results second run + t2.UnixNano(), maxWait.Seconds()) // 8 Entry never found assert.Equal(t, expected, actual.String()) assert.Equal(t, 0, c.Size()) @@ -214,11 +220,12 @@ func TestEntryNeverReceived(t *testing.T) { func TestPruneAckdEntires(t *testing.T) { actual := &bytes.Buffer{} + wait := 30 * time.Millisecond maxWait := 30 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below - c := NewComparator(actual, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) + c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false) - t1 := time.Now() + t1 := time.Unix(0, 0) t2 := t1.Add(1 * time.Millisecond) t3 := t2.Add(1 * time.Millisecond) t4 := t3.Add(100 * time.Second) @@ -242,9 +249,8 @@ func TestPruneAckdEntires(t *testing.T) { assert.Equal(t, 4, len(c.ackdEntries)) // Wait a couple maxWaits to make sure the first 3 timestamps get pruned from the ackdEntries, - // the fourth should still remain because its much much newer and we only prune things older than maxWait - <-time.After(2 * maxWait) - c.pruneEntries() + // the fourth should still remain because its much much newer and we only prune things older than wait + c.pruneEntries(t1.Add(2 * maxWait)) assert.Equal(t, 1, len(c.ackdEntries)) assert.Equal(t, t4, *c.ackdEntries[0]) @@ -271,11 +277,10 @@ func TestSpotCheck(t *testing.T) { } mr := &mockReader{resp: found} - maxWait := 50 * time.Millisecond spotCheck := 10 * time.Millisecond spotCheckMax := 10 * time.Millisecond //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below - c := NewComparator(actual, maxWait, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false) // Send all the entries for i := range entries { @@ -315,10 +320,9 @@ func TestMetricTest(t *testing.T) { writeInterval := 500 * time.Millisecond mr := &mockReader{} - maxWait := 50 * time.Millisecond metricTestRange := 30 * time.Second //We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below - c := NewComparator(actual, maxWait, 50*time.Hour, 0, 0, 4*time.Hour, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false) + c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false) // Force the start time to a known value c.startTime = time.Unix(10, 0) @@ -352,6 +356,26 @@ func TestMetricTest(t *testing.T) { prometheus.Unregister(responseLatency) } +func Test_pruneList(t *testing.T) { + t1 := time.Unix(0, 0) + t2 := time.Unix(1, 0) + t3 := time.Unix(2, 0) + t4 := time.Unix(3, 0) + t5 := time.Unix(4, 0) + + list := []*time.Time{&t1, &t2, &t3, &t4, &t5} + + outList := pruneList(list, func(_ int, ts *time.Time) bool { + // Sorry t2, nobody likes you + return ts.Equal(t2) + }, func(i int, ts *time.Time) { + assert.Equal(t, 1, i) + assert.Equal(t, t2, *ts) + }) + + assert.Equal(t, []*time.Time{&t1, &t3, &t4, &t5}, outList) +} + type mockCounter struct { cLck sync.Mutex count int