Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki Canary: One more round of improvements to query for missing websocket entries up to max-wait #2369

Merged
merged 1 commit into from
Jul 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ func main() {

interval := flag.Duration("interval", 1000*time.Millisecond, "Duration between log entries")
size := flag.Int("size", 100, "Size in bytes of each log line")
wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries before reporting them lost")
wait := flag.Duration("wait", 60*time.Second, "Duration to wait for log entries on websocket before querying loki for them")
maxWait := flag.Duration("max-wait", 5*time.Minute, "Duration to keep querying Loki for missing websocket entries before reporting them missing")
pruneInterval := flag.Duration("pruneinterval", 60*time.Second, "Frequency to check sent vs received logs, "+
"also the frequency which queries for missing logs will be dispatched to loki, and the frequency spot check queries are run")
"also the frequency which queries for missing logs will be dispatched to loki")
buckets := flag.Int("buckets", 10, "Number of buckets in the response_latency histogram")

metricTestInterval := flag.Duration("metric-test-interval", 1*time.Hour, "The interval the metric test query should be run")
Expand Down Expand Up @@ -83,7 +84,7 @@ func main() {

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
205 changes: 112 additions & 93 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n"
)

var (
Expand All @@ -37,7 +38,7 @@ var (
wsMissingEntries = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "websocket_missing_entries_total",
Help: "counts log entries not received within the maxWait duration via the websocket connection",
Help: "counts log entries not received within the wait duration via the websocket connection",
})
missingEntries = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Expand Down Expand Up @@ -72,21 +73,24 @@ var (
metricTestActual = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki_canary",
Name: "metric_test_actual",
Help: "How many counts were actually recevied by the metric test query",
Help: "How many counts were actually received by the metric test query",
})
responseLatency prometheus.Histogram
)

type Comparator struct {
entMtx sync.Mutex
spotEntMtx sync.Mutex
spotMtx sync.Mutex
metTestMtx sync.Mutex
pruneMtx sync.Mutex
entMtx sync.Mutex // Locks access to []entries and []ackdEntries
missingMtx sync.Mutex // Locks access to []missingEntries
spotEntMtx sync.Mutex // Locks access to []spotCheck
spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck()
metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest()
pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries()
w io.Writer
entries []*time.Time
missingEntries []*time.Time
spotCheck []*time.Time
ackdEntries []*time.Time
wait time.Duration
maxWait time.Duration
pruneInterval time.Duration
pruneEntriesRunning bool
Expand All @@ -108,6 +112,7 @@ type Comparator struct {
}

func NewComparator(writer io.Writer,
wait time.Duration,
maxWait time.Duration,
pruneInterval time.Duration,
spotCheckInterval, spotCheckMax, spotCheckQueryRate time.Duration,
Expand All @@ -123,6 +128,7 @@ func NewComparator(writer io.Writer,
w: writer,
entries: []*time.Time{},
spotCheck: []*time.Time{},
wait: wait,
maxWait: maxWait,
pruneInterval: pruneInterval,
pruneEntriesRunning: false,
Expand Down Expand Up @@ -184,29 +190,23 @@ func (c *Comparator) entryReceived(ts time.Time) {
c.entMtx.Lock()
defer c.entMtx.Unlock()

// Output index
k := 0
matched := false
for i, e := range c.entries {
if ts.Equal(*e) {
c.entries = pruneList(c.entries,
func(_ int, t *time.Time) bool {
return ts.Equal(*t)
},
func(i int, t *time.Time) {
matched = true
// If this isn't the first item in the list we received it out of order
if i != 0 {
outOfOrderEntries.Inc()
fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
fmt.Fprintf(c.w, ErrOutOfOrderEntry, t, c.entries[:i])
}
responseLatency.Observe(time.Since(ts).Seconds())
// Put this element in the acknowledged entries list so we can use it to check for duplicates
c.ackdEntries = append(c.ackdEntries, c.entries[i])
// Do not increment output index, effectively causing this element to be dropped
} else {
// If the current index doesn't match the output index, update the array with the correct position
if i != k {
c.entries[k] = c.entries[i]
}
k++
}
}
})

if !matched {
duplicate := false
for _, e := range c.ackdEntries {
Expand All @@ -222,11 +222,6 @@ func (c *Comparator) entryReceived(ts time.Time) {
unexpectedEntries.Inc()
}
}
// Nil out the pointers to any trailing elements which were removed from the slice
for i := k; i < len(c.entries); i++ {
c.entries[i] = nil // or the zero value of T
}
c.entries = c.entries[:k]
}

func (c *Comparator) Size() int {
Expand Down Expand Up @@ -257,7 +252,7 @@ func (c *Comparator) run() {
c.pruneMtx.Lock()
if !c.pruneEntriesRunning {
c.pruneEntriesRunning = true
go c.pruneEntries()
go c.pruneEntries(time.Now())
}
c.pruneMtx.Unlock()
case <-sc.C:
Expand Down Expand Up @@ -315,24 +310,18 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
c.spotMtx.Unlock()
}()
c.spotEntMtx.Lock()
k := 0
for i, e := range c.spotCheck {
if e.Before(currTime.Add(-c.spotCheckMax)) {
// Do nothing, if we don't increment the output index k, this will be dropped
} else {
if i != k {
c.spotCheck[k] = c.spotCheck[i]
}
k++
}
}
// Nil out the pointers to any trailing elements which were removed from the slice
for i := k; i < len(c.spotCheck); i++ {
c.spotCheck[i] = nil // or the zero value of T
}
c.spotCheck = c.spotCheck[:k]

// Remove any entries from the spotcheck list which are too old
c.spotCheck = pruneList(c.spotCheck,
func(_ int, t *time.Time) bool {
return t.Before(currTime.Add(-c.spotCheckMax))
},
func(_ int, t *time.Time) {

})

// Make a copy so we don't have to hold the lock to verify entries
cpy := make([]*time.Time, len(c.spotCheck))
//Make a copy so we don't have to hold the lock to verify entries
copy(cpy, c.spotCheck)
c.spotEntMtx.Unlock()

Expand Down Expand Up @@ -367,7 +356,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {

}

func (c *Comparator) pruneEntries() {
func (c *Comparator) pruneEntries(currentTime time.Time) {
// Always make sure to set the running state back to false
defer func() {
c.pruneMtx.Lock()
Expand All @@ -378,59 +367,46 @@ func (c *Comparator) pruneEntries() {
defer c.entMtx.Unlock()

missing := []*time.Time{}
k := 0
for i, e := range c.entries {
// If the time is outside our range, assume the entry has been lost report and remove it
if e.Before(time.Now().Add(-c.maxWait)) {
missing = append(missing, e)
// Prune entry list of anything older than c.wait and add it to missing list
c.entries = pruneList(c.entries,
func(_ int, t *time.Time) bool {
return t.Before(currentTime.Add(-c.wait)) || t.Equal(currentTime.Add(-c.wait))
},
func(_ int, t *time.Time) {
missing = append(missing, t)
wsMissingEntries.Inc()
fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds())
} else {
if i != k {
c.entries[k] = c.entries[i]
}
k++
}
}
// Nil out the pointers to any trailing elements which were removed from the slice
for i := k; i < len(c.entries); i++ {
c.entries[i] = nil // or the zero value of T
}
c.entries = c.entries[:k]
if len(missing) > 0 {
fmt.Fprintf(c.w, ErrEntryNotReceivedWs, t.UnixNano(), c.wait.Seconds())
})

// Add the list of missing entries to the list for which we will attempt to query Loki for
c.missingMtx.Lock()
c.missingEntries = append(c.missingEntries, missing...)
c.missingMtx.Unlock()
if len(c.missingEntries) > 0 {
if c.confirmAsync {
go c.confirmMissing(missing)
go c.confirmMissing(currentTime)
} else {
c.confirmMissing(missing)
c.confirmMissing(currentTime)
}

}

// Prune the acknowledged list, remove anything older than our maxwait
k = 0
for i, e := range c.ackdEntries {
if e.Before(time.Now().Add(-c.maxWait)) {
// Do nothing, if we don't increment the output index k, this will be dropped
} else {
if i != k {
c.ackdEntries[k] = c.ackdEntries[i]
}
k++
}
}
// Nil out the pointers to any trailing elements which were removed from the slice
for i := k; i < len(c.ackdEntries); i++ {
c.ackdEntries[i] = nil // or the zero value of T
}
c.ackdEntries = c.ackdEntries[:k]
// Prune c.ackdEntries list of old acknowledged entries which we were using to find duplicates
c.ackdEntries = pruneList(c.ackdEntries,
func(_ int, t *time.Time) bool {
return t.Before(currentTime.Add(-c.wait))
},
func(_ int, t *time.Time) {

})
}

func (c *Comparator) confirmMissing(missing []*time.Time) {
func (c *Comparator) confirmMissing(currentTime time.Time) {
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
start := *missing[0]
start := *c.missingEntries[0]
start = start.Add(-10 * time.Second)
end := *missing[len(missing)-1]
end := *c.missingEntries[len(c.missingEntries)-1]
end = end.Add(10 * time.Second)
recvd, err := c.rdr.Query(start, end)
if err != nil {
Expand All @@ -439,38 +415,81 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
}
// This is to help debug some missing log entries when queried,
// let's print exactly what we are missing and what Loki sent back
for _, r := range missing {
for _, r := range c.missingEntries {
fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano())
}
for _, r := range recvd {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}

// Now that query has returned, take out the lock on the missingEntries list so we can modify it
// It's possible more entries were added to this list but that's ok, if they match something in the
// query result we will remove them, if they don't they won't be old enough yet to remove.
c.missingMtx.Lock()
defer c.missingMtx.Unlock()
k := 0
for i, m := range missing {
for i, m := range c.missingEntries {
found := false
for _, r := range recvd {
if (*m).Equal(r) {
// Entry was found in loki, this can be dropped from the list of missing
// which is done by NOT incrementing the output index k
fmt.Fprintf(c.w, DebugEntryFound, (*m).UnixNano(), currentTime.Sub(*m).Seconds())
found = true
}
}
if !found {
// Item is still missing
if i != k {
missing[k] = missing[i]
c.missingEntries[k] = c.missingEntries[i]
}
k++
}
}
// Nil out the pointers to any trailing elements which were removed from the slice
for i := k; i < len(missing); i++ {
missing[i] = nil // or the zero value of T
for i := k; i < len(c.missingEntries); i++ {
c.missingEntries[i] = nil // or the zero value of T
}
missing = missing[:k]
for _, e := range missing {
c.missingEntries = c.missingEntries[:k]

// Remove entries from missing list which are older than maxWait
removed := []*time.Time{}
c.missingEntries = pruneList(c.missingEntries,
func(_ int, t *time.Time) bool {
return t.Before(currentTime.Add(-c.maxWait))
},
func(_ int, t *time.Time) {
removed = append(removed, t)
})

// Record the entries which were removed and never received
for _, e := range removed {
missingEntries.Inc()
fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds())
}
}

// pruneList filters list in place, dropping every entry for which shouldRemove
// returns true, and returns the shortened slice.
//
// Entries are visited in order. shouldRemove receives the entry's index in the
// original ordering together with the entry itself. For each dropped entry,
// handleRemoved (if non-nil) is invoked with the same arguments; pass nil when
// no per-entry action is required.
//
// The returned slice shares list's backing array: surviving entries are
// compacted to the front with their relative order preserved, and the vacated
// tail slots are set to nil so the dropped timestamps are no longer referenced
// by the array. Callers must use the returned slice in place of list. While the
// callbacks run, slots before the current index may already hold compacted
// survivors rather than their original contents.
func pruneList(list []*time.Time, shouldRemove func(int, *time.Time) bool, handleRemoved func(int, *time.Time)) []*time.Time {
	// k is the output index: the next slot a surviving entry is written to.
	k := 0
	for i, e := range list {
		if shouldRemove(i, e) {
			// A nil handler means the caller has nothing to do for removed entries.
			if handleRemoved != nil {
				handleRemoved(i, e)
			}
			// k is deliberately not advanced, which drops this entry.
			continue
		}
		// Only copy once at least one earlier entry has been dropped.
		if i != k {
			list[k] = list[i]
		}
		k++
	}
	// Nil out the pointers left in the tail so they do not linger in the backing array.
	for i := k; i < len(list); i++ {
		list[i] = nil
	}
	// Reslice to the number of surviving entries.
	return list[:k]
}
Loading