Skip to content

Commit

Permalink
Merge branch 'main' into sample-count-and-bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Jun 3, 2024
2 parents 1822b88 + 47f0236 commit 35585db
Show file tree
Hide file tree
Showing 45 changed files with 18,301 additions and 33 deletions.
1 change: 1 addition & 0 deletions clients/cmd/promtail/promtail-local-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ scrape_configs:
labels:
job: varlogs
__path__: /var/log/*log
stream: stdout
6 changes: 5 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func main() {
metricTestQueryRange := flag.Duration("metric-test-range", 24*time.Hour, "The range value [24h] used in the metric test instant-query."+
" Note: this value is truncated to the running time of the canary until this value is reached")

cacheTestInterval := flag.Duration("cache-test-interval", 15*time.Minute, "The interval the cache test query should be run")
cacheTestQueryRange := flag.Duration("cache-test-range", 24*time.Hour, "The range value [24h] used in the cache test instant-query.")
cacheTestQueryNow := flag.Duration("cache-test-now", 1*time.Hour, "duration how far back from current time the execution time (--now) should be set for running this query in the cache test instant-query.")

spotCheckInterval := flag.Duration("spot-check-interval", 15*time.Minute, "Interval that a single result will be kept from sent entries and spot-checked against Loki, "+
"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
Expand Down Expand Up @@ -189,7 +193,7 @@ func main() {
_, _ = fmt.Fprintf(os.Stderr, "Unable to create reader for Loki querier, check config: %s", err)
os.Exit(1)
}
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *cacheTestInterval, *cacheTestQueryRange, *cacheTestQueryNow, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
4 changes: 3 additions & 1 deletion cmd/lokitool/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
)

var (
ruleCommand commands.RuleCommand
ruleCommand commands.RuleCommand
auditCommand commands.AuditCommand
)

func main() {
app := kingpin.New("lokitool", "A command-line tool to manage Loki.")
ruleCommand.Register(app)
auditCommand.Register(app)

app.Command("version", "Get the version of the lokitool CLI").Action(func(k *kingpin.ParseContext) error {
fmt.Println(version.Print("loki"))
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ require (
github.com/prometheus/alertmanager v0.27.0
github.com/prometheus/common/sigv4 v0.1.0
github.com/richardartoul/molecule v1.0.0
github.com/schollz/progressbar/v3 v3.14.2
github.com/shirou/gopsutil/v4 v4.24.0-alpha.1
github.com/thanos-io/objstore v0.0.0-20230829152104-1b257a36f9a3
github.com/willf/bloom v2.0.3+incompatible
Expand All @@ -153,6 +154,7 @@ require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
Expand Down Expand Up @@ -286,7 +288,7 @@ require (
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/miekg/dns v1.1.58 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
Expand Down
11 changes: 9 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1259,6 +1259,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V
github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw=
github.com/kardianos/service v1.0.0/go.mod h1:8CzDhVuCuugtsHyZoTvsOBuvonN/UDBvl0kH+BUxvbo=
github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4=
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
Expand Down Expand Up @@ -1355,8 +1356,8 @@ github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOA
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA=
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
Expand Down Expand Up @@ -1625,6 +1626,8 @@ github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qq
github.com/renier/xmlrpc v0.0.0-20170708154548-ce4a1a486c03/go.mod h1:gRAiPF5C5Nd0eyyRdqIu9qTiFSoZzpTq727b5B8fkkU=
github.com/richardartoul/molecule v1.0.0 h1:+LFA9cT7fn8KF39zy4dhOnwcOwRoqKiBkPqKqya+8+U=
github.com/richardartoul/molecule v1.0.0/go.mod h1:uvX/8buq8uVeiZiFht+0lqSLBHF+uGV8BrTv8W/SIwk=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
Expand All @@ -1650,6 +1653,8 @@ github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdh
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.25 h1:/8rfZAdFfafRXOgz+ZpMZZWZ5pYggCY9t7e/BvjaBHM=
github.com/scaleway/scaleway-sdk-go v1.0.0-beta.25/go.mod h1:fCa7OJZ/9DRTnOKmxvT6pn+LPWUptQAmHF/SBJUGEcg=
github.com/schollz/progressbar/v3 v3.14.2 h1:EducH6uNLIWsr560zSV1KrTeUb/wZGAHqyMFIEa99ks=
github.com/schollz/progressbar/v3 v3.14.2/go.mod h1:aQAZQnhF4JGFtRJiw/eobaXpsqpVQAftEQ+hLGXaRc4=
github.com/sean-/conswriter v0.0.0-20180208195008-f5ae3917a627/go.mod h1:7zjs06qF79/FKAJpBvFx3P8Ww4UTIMAe+lpNXDHziac=
github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6qbIiqJ6/Bqeq25bCLbL7YFmpaFfJDuM=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
Expand Down Expand Up @@ -2257,6 +2262,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
Expand All @@ -2266,6 +2272,7 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc=
golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down
99 changes: 90 additions & 9 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package comparator
import (
"fmt"
"io"
"math"
"math/rand"
"sync"
"time"
Expand All @@ -24,6 +25,8 @@ const (
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
DebugEntryFound = "missing websocket entry %v was found %v seconds after it was originally sent\n"

floatDiffTolerance = 1e-6
)

var (
Expand Down Expand Up @@ -90,6 +93,16 @@ var (
Help: "how long the spot check test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
queryResultsDiff = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_diff_total",
Help: "counts number of times the query results was different with and without cache ",
})
queryResultsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "cache_test_query_results_total",
Help: "counts number of times the query results test requests are done ",
}, []string{"status"}) // status=success/failure
)

type Comparator struct {
Expand All @@ -98,6 +111,7 @@ type Comparator struct {
spotEntMtx sync.Mutex // Locks access to []spotCheck
spotMtx sync.Mutex // Locks spotcheckRunning for single threaded but async spotCheck()
metTestMtx sync.Mutex // Locks metricTestRunning for single threaded but async metricTest()
cacheTestMtx sync.Mutex // Locks cacheTestRunning for single threaded but async cacheTest()
pruneMtx sync.Mutex // Locks pruneEntriesRunning for single threaded but async pruneEntries()
w io.Writer
entries []*time.Time
Expand All @@ -116,14 +130,19 @@ type Comparator struct {
metricTestInterval time.Duration
metricTestRange time.Duration
metricTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
cacheTestInterval time.Duration
cacheTestRange time.Duration
// how far back from current time the execution time (--now) should be set for running this query.
cacheTestNow time.Duration
cacheTestRunning bool
writeInterval time.Duration
confirmAsync bool
startTime time.Time
sent chan time.Time
recv chan time.Time
rdr reader.LokiReader
quit chan struct{}
done chan struct{}
}

func NewComparator(writer io.Writer,
Expand All @@ -133,6 +152,9 @@ func NewComparator(writer io.Writer,
spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration,
metricTestInterval time.Duration,
metricTestRange time.Duration,
cacheTestInterval time.Duration,
cacheTestRange time.Duration,
cacheTestNow time.Duration,
writeInterval time.Duration,
buckets int,
sentChan chan time.Time,
Expand All @@ -155,6 +177,10 @@ func NewComparator(writer io.Writer,
metricTestInterval: metricTestInterval,
metricTestRange: metricTestRange,
metricTestRunning: false,
cacheTestInterval: cacheTestInterval,
cacheTestRange: cacheTestRange,
cacheTestNow: cacheTestNow,
cacheTestRunning: false,
writeInterval: writeInterval,
confirmAsync: confirmAsync,
startTime: time.Now(),
Expand Down Expand Up @@ -252,10 +278,12 @@ func (c *Comparator) run() {
randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))
mt := time.NewTicker(time.Duration(randomGenerator.Int63n(c.metricTestInterval.Nanoseconds())))
sc := time.NewTicker(c.spotCheckQueryRate)
ct := time.NewTicker(c.cacheTestInterval)
defer func() {
t.Stop()
mt.Stop()
sc.Stop()
ct.Stop()
close(c.done)
}()

Expand Down Expand Up @@ -294,12 +322,65 @@ func (c *Comparator) run() {
firstMt = false
mt.Reset(c.metricTestInterval)
}
case <-ct.C:
// Only run one instance of cache tests at a time.
c.cacheTestMtx.Lock()
if !c.cacheTestRunning {
c.cacheTestRunning = true
go c.cacheTest(time.Now())
}
c.cacheTestMtx.Unlock()

case <-c.quit:
return
}
}
}

func (c *Comparator) cacheTest(currTime time.Time) {
defer func() {
c.cacheTestMtx.Lock()
c.cacheTestRunning = false
c.cacheTestMtx.Unlock()
}()

// cacheTest is currently run using `reader.CountOverTime()` which is an instant query.
// We make the query with and without cache over the data that is not changing (e.g: --now="1hr ago") instead of on latest data that is a moving target.
queryStartTime := currTime.Add(-c.cacheTestNow)

// We cannot query for range before the pod even started.
if queryStartTime.Before(c.startTime) {
// we wait.
fmt.Fprintf(c.w, "cacheTest not run. still waiting for query start range(%s) to past the process start time(%s).\n", queryStartTime, c.startTime)
return
}

rangeDuration := c.cacheTestRange
rng := fmt.Sprintf("%.0fs", rangeDuration.Seconds())

// with cache
countCache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, true)
if err != nil {
fmt.Fprintf(c.w, "error running cache query test with cache: %s\n", err.Error())
queryResultsTotal.WithLabelValues("failure").Inc()
return
}

// without cache
countNocache, err := c.rdr.QueryCountOverTime(rng, queryStartTime, false)
if err != nil {
fmt.Fprintf(c.w, "error running cache query test without cache: %s\n", err.Error())
queryResultsTotal.WithLabelValues("failure").Inc()
return
}

queryResultsTotal.WithLabelValues("success").Inc()
if math.Abs(countNocache-countCache) > floatDiffTolerance {
queryResultsDiff.Inc()
fmt.Fprintf(c.w, "found a diff in instant query results time: %s, result_with_cache: %v, result_without_cache: %v\n", queryStartTime, countCache, countNocache)
}
}

// check that the expected # of log lines have been written to Loki
func (c *Comparator) metricTest(currTime time.Time) {
// Always make sure to set the running state back to false
Expand All @@ -317,7 +398,7 @@ func (c *Comparator) metricTest(currTime time.Time) {
adjustedRange = currTime.Sub(c.startTime)
}
begin := time.Now()
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()))
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()), begin, true)
metricTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error())
Expand Down
Loading

0 comments on commit 35585db

Please sign in to comment.