From d26d9eb9c958c526767ad6ca2d5cf566e9906da2 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sat, 8 Aug 2020 12:24:17 -0400 Subject: [PATCH 01/17] first stab at batching logcli requests --- cmd/logcli/main.go | 1 + pkg/logcli/query/query.go | 59 +++++++++++++++++++++++++++++---------- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 165e30e2da92..cc40a13d50d1 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -273,6 +273,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query { cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to) cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step) cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval) + cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize) } cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 969996fcfc1e..bcdf2338a8cc 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -43,6 +43,7 @@ type Query struct { Start time.Time End time.Time Limit int + BatchSize int Forward bool Step time.Duration Interval time.Duration @@ -72,26 +73,49 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) if q.isInstant() { resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet) + if err != nil { + log.Fatalf("Query failed: %+v", err) + } + if statistics { + q.printStats(resp.Data.Statistics) + } + _, _ = q.printResult(resp.Data.Result, out) } else { - resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Interval, q.Quiet) - } - - if err != nil { - log.Fatalf("Query failed: %+v", err) - } - - if statistics { - q.printStats(resp.Data.Statistics) + if q.Limit < q.BatchSize { + q.BatchSize = q.Limit + } + resultLength := q.BatchSize + total := 0 + start := q.Start + var lastEntry *loghttp.Entry + // Make the assumption if the result size == batch size there will be more rows to query + for resultLength == q.BatchSize && total < q.Limit { + resp, err = c.QueryRange(q.QueryString, q.BatchSize, start, q.End, d, q.Step, q.Interval, q.Quiet) + if err != nil { + log.Fatalf("Query failed: %+v", err) + } + + if statistics { + q.printStats(resp.Data.Statistics) + } + + resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry) + // Was not a log stream query, there is no batching. 
+ if resultLength == -1 { + break + } + total += resultLength + } } - q.printResult(resp.Data.Result, out) - } -func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput) { +func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) { + length := -1 + var entry loghttp.Entry switch value.Type() { case logql.ValueTypeStreams: - q.printStream(value.(loghttp.Streams), out) + length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry) case parser.ValueTypeScalar: q.printScalar(value.(loghttp.Scalar)) case parser.ValueTypeMatrix: @@ -101,6 +125,7 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput) { default: log.Fatalf("Unable to print unsupported type: %v", value.Type()) } + return length, &entry } // DoLocalQuery executes the query against the local store using a Loki configuration file. @@ -186,7 +211,7 @@ func (q *Query) isInstant() bool { return q.Start == q.End } -func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput) { +func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, loghttp.Entry) { common := commonLabels(streams) // Remove the labels we want to show from common @@ -243,8 +268,14 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput) { } for _, e := range allEntries { + // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch + if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line { + continue + } fmt.Println(out.Format(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)) } + + return len(allEntries), allEntries[len(allEntries)-1].entry } func (q *Query) printMatrix(matrix loghttp.Matrix) { From 152ec3ba0a6969e95c89a95deed3b576f3231311 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sat, 8 Aug 2020 12:26:26 -0400 Subject: [PATCH 02/17] fix other calls to printResult --- pkg/logcli/query/query.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index bcdf2338a8cc..7c7e1800ede8 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -79,7 +79,7 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) if statistics { q.printStats(resp.Data.Statistics) } - _, _ = q.printResult(resp.Data.Result, out) + _, _ = q.printResult(resp.Data.Result, out, nil) } else { if q.Limit < q.BatchSize { q.BatchSize = q.Limit @@ -197,7 +197,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string return err } - q.printResult(value, out) + q.printResult(value, out, nil) return nil } From fb16394ab8b9ebc54de2f74f2a3148ea8ad424be Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sat, 8 Aug 2020 14:39:27 -0400 Subject: [PATCH 03/17] working batching code --- pkg/logcli/query/query.go | 37 +++++++++++++++++++++++++++++----- pkg/logcli/query/query_test.go | 5 +++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 7c7e1800ede8..b8fb868f0787 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -87,10 +87,17 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) resultLength := q.BatchSize total := 0 start := q.Start + end := q.End var lastEntry *loghttp.Entry // Make the assumption if the result size == batch size there will 
be more rows to query - for resultLength == q.BatchSize && total < q.Limit { - resp, err = c.QueryRange(q.QueryString, q.BatchSize, start, q.End, d, q.Step, q.Interval, q.Quiet) + for resultLength == q.BatchSize || total < q.Limit { + bs := q.BatchSize + if q.Limit - total < q.BatchSize { + // Have to add one because of the timestamp overlap described below, the first result + // will always be the last result of the last batch. + bs = q.Limit - total + 1 + } + resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet) if err != nil { log.Fatalf("Query failed: %+v", err) } @@ -100,11 +107,29 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) } resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry) - // Was not a log stream query, there is no batching. - if resultLength == -1 { + // Was not a log stream query, or no results, no more batching + if resultLength <= 0 { + break + } + // Happens when there were no logs returned for the query. + if lastEntry == nil { break } + // Batching works by taking the timestamp of the last query and using it in the next query, + // because Loki supports multiple entries with the same timestamp it's possible for a batch to have + // fallen in the middle of a list of entries for the same time, so to make sure we get all entries + // we start the query on the same time as the last entry from the last batch, and then we keep this last + // entry and remove the duplicate when printing the results. + // Because of this duplicate entry, we have to subtract it here from the total for each batch + // to get the desired limit. total += resultLength + // Based on the query direction we either set the start or end for the next query. + if q.Forward{ + start = lastEntry.Timestamp + } else { + end = lastEntry.Timestamp + } + } } @@ -267,15 +292,17 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) }) } + printed := 0 for _, e := range allEntries { // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line { continue } fmt.Println(out.Format(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)) + printed++ } - return len(allEntries), allEntries[len(allEntries)-1].entry + return printed, allEntries[len(allEntries)-1].entry } func (q *Query) printMatrix(matrix loghttp.Matrix) { diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index e197a8b044b5..6d1d021ee283 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -140,6 +140,11 @@ func Test_subtract(t *testing.T) { } } + +func Test_batch(t *testing.T) { + +} + func mustParseLabels(s string) loghttp.LabelSet { l, err := marshal.NewLabelSet(s) From 3b6641e752e087e4305898a939315ec2cb83eae0 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sat, 8 Aug 2020 16:42:35 -0400 Subject: [PATCH 04/17] refactor the logcli Client into an interface so that it can be mocked for testing. 
--- cmd/logcli/main.go | 5 +-- pkg/logcli/client/client.go | 55 ++++++++++++++++++++------------ pkg/logcli/labelquery/labels.go | 4 +-- pkg/logcli/query/query.go | 16 ++++++---- pkg/logcli/query/tail.go | 2 +- pkg/logcli/seriesquery/series.go | 4 +-- 6 files changed, 53 insertions(+), 33 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index cc40a13d50d1..60314a461b48 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -158,8 +158,9 @@ func main() { } } -func newQueryClient(app *kingpin.Application) *client.Client { - client := &client.Client{ +func newQueryClient(app *kingpin.Application) client.Client { + + client := &client.DefaultClient{ TLSConfig: config.TLSConfig{}, } diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 5dfd9a03ad5c..a0f6f3defe53 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -34,8 +34,19 @@ var ( userAgent = fmt.Sprintf("loki-logcli/%s", build.Version) ) +// Client contains all the methods to query a Loki instance, it's an interface to allow multiple implementations. +type Client interface { + Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) + QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) + ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) + ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) + Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) + LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) + GetOrgID() string +} + // Client contains fields necessary to query a Loki instance -type Client struct { +type DefaultClient struct { TLSConfig config.TLSConfig Username string Password string @@ -46,7 +57,7 @@ type Client struct { // Query uses the /api/v1/query endpoint to execute an instant query // excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method // nolint:interfacer -func (c *Client) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { +func (c *DefaultClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { qsb := util.NewQueryStringBuilder() qsb.SetString("query", queryStr) qsb.SetInt("limit", int64(limit)) @@ -59,7 +70,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log // QueryRange uses the /api/v1/query_range endpoint to execute a range query // excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method // nolint:interfacer -func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { +func (c *DefaultClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { params := util.NewQueryStringBuilder() params.SetString("query", queryStr) params.SetInt32("limit", limit) @@ -81,7 +92,7 @@ func (c *Client) 
QueryRange(queryStr string, limit int, from, through time.Time, } // ListLabelNames uses the /api/v1/label endpoint to list label names -func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { +func (c *DefaultClient) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { var labelResponse loghttp.LabelResponse params := util.NewQueryStringBuilder() params.SetInt("start", from.UnixNano()) @@ -94,7 +105,7 @@ func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.L } // ListLabelValues uses the /api/v1/label endpoint to list label values -func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { +func (c *DefaultClient) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { path := fmt.Sprintf(labelValuesPath, url.PathEscape(name)) var labelResponse loghttp.LabelResponse params := util.NewQueryStringBuilder() @@ -106,7 +117,7 @@ func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Tim return &labelResponse, nil } -func (c *Client) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) { +func (c *DefaultClient) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) { params := util.NewQueryStringBuilder() params.SetInt("start", from.UnixNano()) params.SetInt("end", through.UnixNano()) @@ -119,7 +130,22 @@ func (c *Client) Series(matchers []string, from, through time.Time, quiet bool) return &seriesResponse, nil } -func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) { +// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it +func (c *DefaultClient) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) { + qsb := util.NewQueryStringBuilder() + qsb.SetString("query", queryStr) + qsb.SetInt("delay_for", int64(delayFor)) + qsb.SetInt("limit", int64(limit)) + qsb.SetInt("from", from) + + return c.wsConnect(tailPath, qsb.Encode(), quiet) +} + +func (c *DefaultClient) GetOrgID() string { + return c.OrgID +} + +func (c *DefaultClient) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) { var err error var r loghttp.QueryResponse @@ -130,7 +156,7 @@ func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryR return &r, nil } -func (c *Client) doRequest(path, query string, quiet bool, out interface{}) error { +func (c *DefaultClient) doRequest(path, query string, quiet bool, out interface{}) error { us, err := buildURL(c.Address, path, query) if err != nil { @@ -180,18 +206,7 @@ func (c *Client) doRequest(path, query string, quiet bool, out interface{}) erro return json.NewDecoder(resp.Body).Decode(out) } -// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it -func (c *Client) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) { - qsb := util.NewQueryStringBuilder() - qsb.SetString("query", queryStr) - qsb.SetInt("delay_for", int64(delayFor)) - qsb.SetInt("limit", int64(limit)) - qsb.SetInt("from", from) - - return c.wsConnect(tailPath, qsb.Encode(), quiet) -} - -func (c *Client) wsConnect(path, query string, quiet bool) (*websocket.Conn, error) { +func (c *DefaultClient) wsConnect(path, query string, quiet bool) 
(*websocket.Conn, error) { us, err := buildURL(c.Address, path, query) if err != nil { return nil, err diff --git a/pkg/logcli/labelquery/labels.go b/pkg/logcli/labelquery/labels.go index 6be033907985..38e0a500df86 100644 --- a/pkg/logcli/labelquery/labels.go +++ b/pkg/logcli/labelquery/labels.go @@ -18,7 +18,7 @@ type LabelQuery struct { } // DoLabels prints out label results -func (q *LabelQuery) DoLabels(c *client.Client) { +func (q *LabelQuery) DoLabels(c client.Client) { values := q.ListLabels(c) for _, value := range values { @@ -27,7 +27,7 @@ func (q *LabelQuery) DoLabels(c *client.Client) { } // ListLabels returns an array of label strings -func (q *LabelQuery) ListLabels(c *client.Client) []string { +func (q *LabelQuery) ListLabels(c client.Client) []string { var labelResponse *loghttp.LabelResponse var err error if len(q.LabelName) > 0 { diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index b8fb868f0787..d1c50c4f3525 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -57,10 +57,10 @@ type Query struct { } // DoQuery executes the query and prints out the results -func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) { +func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) { if q.LocalConfig != "" { - if err := q.DoLocalQuery(out, statistics, c.OrgID); err != nil { + if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil { log.Fatalf("Query failed: %+v", err) } return @@ -137,7 +137,7 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) { length := -1 - var entry loghttp.Entry + var entry *loghttp.Entry switch value.Type() { case logql.ValueTypeStreams: length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry) @@ -150,7 +150,7 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, las default: log.Fatalf("Unable to print unsupported type: %v", value.Type()) } - return length, &entry + return length, entry } // DoLocalQuery executes the query against the local store using a Loki configuration file. 
@@ -236,7 +236,7 @@ func (q *Query) isInstant() bool { return q.Start == q.End } -func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, loghttp.Entry) { +func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) { common := commonLabels(streams) // Remove the labels we want to show from common @@ -286,6 +286,10 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE } } + if len(allEntries) == 0 { + return 0, nil + } + if q.Forward { sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) }) } else { @@ -302,7 +306,7 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE printed++ } - return printed, allEntries[len(allEntries)-1].entry + return printed, &allEntries[len(allEntries)-1].entry } func (q *Query) printMatrix(matrix loghttp.Matrix) { diff --git a/pkg/logcli/query/tail.go b/pkg/logcli/query/tail.go index 6137bd2e68d6..2187c2ef8b2f 100644 --- a/pkg/logcli/query/tail.go +++ b/pkg/logcli/query/tail.go @@ -17,7 +17,7 @@ import ( ) // TailQuery connects to the Loki websocket endpoint and tails logs -func (q *Query) TailQuery(delayFor int, c *client.Client, out output.LogOutput) { +func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) { conn, err := c.LiveTailQueryConn(q.QueryString, delayFor, q.Limit, q.Start.UnixNano(), q.Quiet) if err != nil { log.Fatalf("Tailing logs failed: %+v", err) diff --git a/pkg/logcli/seriesquery/series.go b/pkg/logcli/seriesquery/series.go index 7bbdc790965e..3acda03c741a 100644 --- a/pkg/logcli/seriesquery/series.go +++ b/pkg/logcli/seriesquery/series.go @@ -18,7 +18,7 @@ type SeriesQuery struct { } // DoSeries prints out series results -func (q *SeriesQuery) DoSeries(c *client.Client) { +func (q *SeriesQuery) DoSeries(c client.Client) { values := q.GetSeries(c) for _, value := range values { @@ -27,7 +27,7 @@ func (q *SeriesQuery) DoSeries(c *client.Client) { } // GetSeries returns an array of label sets -func (q *SeriesQuery) GetSeries(c *client.Client) []loghttp.LabelSet { +func (q *SeriesQuery) GetSeries(c client.Client) []loghttp.LabelSet { seriesResponse, err := c.Series(q.Matchers, q.Start, q.End, q.Quiet) if err != nil { log.Fatalf("Error doing request: %+v", err) From 3380262a48e24d317484e09c846839878d9da8c8 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sat, 8 Aug 2020 21:12:42 -0400 Subject: [PATCH 05/17] fixing some bugs --- pkg/logcli/query/query.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index d1c50c4f3525..b299bf25d8f2 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -89,10 +89,9 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) start := q.Start end := q.End var lastEntry *loghttp.Entry - // Make the assumption if the result size == batch size there will be more rows to query - for resultLength == q.BatchSize || total < q.Limit { + for total < q.Limit { bs := q.BatchSize - if q.Limit - total < q.BatchSize { + if q.Limit-total < q.BatchSize { // Have to add one because of the timestamp overlap described below, the first result // will always be the last result of the last batch. 
bs = q.Limit - total + 1 @@ -111,10 +110,14 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) if resultLength <= 0 { break } - // Happens when there were no logs returned for the query. + // Also no result, wouldn't expect to hit this. if lastEntry == nil { break } + // Can only happen if all the results return in one request + if resultLength == q.Limit { + break + } // Batching works by taking the timestamp of the last query and using it in the next query, // because Loki supports multiple entries with the same timestamp it's possible for a batch to have // fallen in the middle of a list of entries for the same time, so to make sure we get all entries @@ -124,10 +127,12 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) // to get the desired limit. total += resultLength // Based on the query direction we either set the start or end for the next query. - if q.Forward{ + if q.Forward { start = lastEntry.Timestamp } else { - end = lastEntry.Timestamp + // The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result + // fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query + end = lastEntry.Timestamp.Add(1 * time.Nanosecond) } } From 94aba587c31ee8732f78a315f8b373aedea4595f Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 9 Aug 2020 07:39:53 -0400 Subject: [PATCH 06/17] refactoring the output interface to make testing easier --- cmd/logcli/main.go | 4 ++-- pkg/logcli/output/default.go | 13 ++++++++----- pkg/logcli/output/default_test.go | 29 +++++++++++++++++------------ pkg/logcli/output/jsonl.go | 7 +++++-- pkg/logcli/output/jsonl_test.go | 16 +++++++++------- pkg/logcli/output/output.go | 8 ++++++-- pkg/logcli/output/output_test.go | 14 +++++++------- pkg/logcli/output/raw.go | 7 +++++-- pkg/logcli/output/raw_test.go | 16 +++++++++------- pkg/logcli/query/query.go | 2 +- pkg/logcli/query/tail.go | 3 +-- 11 files changed, 70 insertions(+), 49 deletions(-) diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 60314a461b48..32345fe652bd 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -123,7 +123,7 @@ func main() { ColoredOutput: rangeQuery.ColoredOutput, } - out, err := output.NewLogOutput(*outputMode, outputOptions) + out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions) if err != nil { log.Fatalf("Unable to create log output: %s", err) } @@ -145,7 +145,7 @@ func main() { ColoredOutput: instantQuery.ColoredOutput, } - out, err := output.NewLogOutput(*outputMode, outputOptions) + out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions) if err != nil { log.Fatalf("Unable to create log output: %s", err) } diff --git a/pkg/logcli/output/default.go b/pkg/logcli/output/default.go index c23554a9f707..d69fab470439 100644 --- a/pkg/logcli/output/default.go +++ b/pkg/logcli/output/default.go @@ -2,6 +2,7 @@ package output import ( "fmt" + "io" "strings" "time" @@ -12,24 +13,26 @@ import ( // DefaultOutput provides logs and metadata in human readable format type DefaultOutput struct { + w io.Writer options *LogOutputOptions } // Format a log entry in a human readable format -func (o *DefaultOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string { +func (o *DefaultOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) { timestamp := ts.In(o.options.Timezone).Format(time.RFC3339) line = strings.TrimSpace(line) if 
o.options.NoLabels { - return fmt.Sprintf("%s %s", color.BlueString(timestamp), line) + fmt.Fprintf(o.w, "%s %s\n", color.BlueString(timestamp), line) + return } - if o.options.ColoredOutput { labelsColor := getColor(lbls.String()).SprintFunc() - return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line) + fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line) + } else { + fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line) } - return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line) } // add some padding after labels diff --git a/pkg/logcli/output/default_test.go b/pkg/logcli/output/default_test.go index 5f6dbb1b9961..2252ff175093 100644 --- a/pkg/logcli/output/default_test.go +++ b/pkg/logcli/output/default_test.go @@ -1,6 +1,7 @@ package output import ( + "bytes" "strings" "testing" "time" @@ -33,7 +34,7 @@ func TestDefaultOutput_Format(t *testing.T) { emptyLabels, 0, "", - "2006-01-02T08:04:05Z {} ", + "2006-01-02T08:04:05Z {} \n", }, "empty line with labels": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -41,7 +42,7 @@ func TestDefaultOutput_Format(t *testing.T) { someLabels, len(someLabels.String()), "", - "2006-01-02T08:04:05Z {type=\"test\"} ", + "2006-01-02T08:04:05Z {type=\"test\"} \n", }, "max labels length shorter than input labels": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -49,7 +50,7 @@ func TestDefaultOutput_Format(t *testing.T) { someLabels, 0, "Hello", - "2006-01-02T08:04:05Z {type=\"test\"} Hello", + "2006-01-02T08:04:05Z {type=\"test\"} Hello\n", }, "max labels length longer than input labels": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -57,7 +58,7 @@ func TestDefaultOutput_Format(t *testing.T) { someLabels, 20, "Hello", - "2006-01-02T08:04:05Z {type=\"test\"} Hello", + "2006-01-02T08:04:05Z {type=\"test\"} Hello\n", }, "timezone option set to a Local one": { &LogOutputOptions{Timezone: time.FixedZone("test", 2*60*60), NoLabels: false}, @@ -65,7 +66,7 @@ func TestDefaultOutput_Format(t *testing.T) { someLabels, 0, "Hello", - "2006-01-02T10:04:05+02:00 {type=\"test\"} Hello", + "2006-01-02T10:04:05+02:00 {type=\"test\"} Hello\n", }, "labels output disabled": { &LogOutputOptions{Timezone: time.UTC, NoLabels: true}, @@ -73,7 +74,7 @@ func TestDefaultOutput_Format(t *testing.T) { someLabels, 0, "Hello", - "2006-01-02T08:04:05Z Hello", + "2006-01-02T08:04:05Z Hello\n", }, } @@ -83,10 +84,11 @@ func TestDefaultOutput_Format(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - out := &DefaultOutput{testData.options} - actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) + writer := &bytes.Buffer{} + out := &DefaultOutput{writer,testData.options} + out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) - assert.Equal(t, testData.expected, actual) + assert.Equal(t, testData.expected, writer.String()) }) } } @@ -111,16 +113,19 @@ func TestDefaultOutput_FormatLabelsPadding(t *testing.T) { timestamp, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00") maxLabelsLen := findMaxLabelsLength(labelsList) options := &LogOutputOptions{Timezone: time.UTC, NoLabels: false} - out := &DefaultOutput{options} + writer := &bytes.Buffer{} + out := &DefaultOutput{writer,options} // Format the same log line with different labels 
formattedEntries := make([]string, 0, len(labelsList)) for _, lbls := range labelsList { - formattedEntries = append(formattedEntries, out.Format(timestamp, lbls, maxLabelsLen, "XXX")) + out.FormatAndPrintln(timestamp, lbls, maxLabelsLen, "XXX") + formattedEntries = append(formattedEntries, writer.String()) + writer.Reset() } // Ensure the log line starts at the same position in each formatted output - assert.Equal(t, len(formattedEntries), len(labelsList)) + assert.Equal(t, len(labelsList), len(formattedEntries)) expectedIndex := strings.Index(formattedEntries[0], "XXX") if expectedIndex <= 0 { diff --git a/pkg/logcli/output/jsonl.go b/pkg/logcli/output/jsonl.go index 9e558d1780f5..b8cbe8615d7f 100644 --- a/pkg/logcli/output/jsonl.go +++ b/pkg/logcli/output/jsonl.go @@ -2,6 +2,8 @@ package output import ( "encoding/json" + "fmt" + "io" "log" "time" @@ -10,11 +12,12 @@ import ( // JSONLOutput prints logs and metadata as JSON Lines, suitable for scripts type JSONLOutput struct { + w io.Writer options *LogOutputOptions } // Format a log entry as json line -func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string { +func (o *JSONLOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) { entry := map[string]interface{}{ "timestamp": ts.In(o.options.Timezone), "line": line, @@ -30,5 +33,5 @@ func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen i log.Fatalf("error marshalling entry: %s", err) } - return string(out) + fmt.Fprintln(o.w, string(out)) } diff --git a/pkg/logcli/output/jsonl_test.go b/pkg/logcli/output/jsonl_test.go index 49de0432a1b1..26215a84458c 100644 --- a/pkg/logcli/output/jsonl_test.go +++ b/pkg/logcli/output/jsonl_test.go @@ -1,6 +1,7 @@ package output import ( + "bytes" "encoding/json" "testing" "time" @@ -33,7 +34,7 @@ func TestJSONLOutput_Format(t *testing.T) { emptyLabels, 0, "", - `{"labels":{},"line":"","timestamp":"2006-01-02T08:04:05Z"}`, + `{"labels":{},"line":"","timestamp":"2006-01-02T08:04:05Z"}` + "\n", }, "empty line with labels": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -41,7 +42,7 @@ func TestJSONLOutput_Format(t *testing.T) { someLabels, len(someLabels.String()), "", - `{"labels":{"type":"test"},"line":"","timestamp":"2006-01-02T08:04:05Z"}`, + `{"labels":{"type":"test"},"line":"","timestamp":"2006-01-02T08:04:05Z"}` + "\n", }, "timezone option set to a Local one": { &LogOutputOptions{Timezone: time.FixedZone("test", 2*60*60), NoLabels: false}, @@ -49,7 +50,7 @@ func TestJSONLOutput_Format(t *testing.T) { someLabels, 0, "Hello", - `{"labels":{"type":"test"},"line":"Hello","timestamp":"2006-01-02T10:04:05+02:00"}`, + `{"labels":{"type":"test"},"line":"Hello","timestamp":"2006-01-02T10:04:05+02:00"}` + "\n", }, "labels output disabled": { &LogOutputOptions{Timezone: time.UTC, NoLabels: true}, @@ -57,7 +58,7 @@ func TestJSONLOutput_Format(t *testing.T) { someLabels, 0, "Hello", - `{"line":"Hello","timestamp":"2006-01-02T08:04:05Z"}`, + `{"line":"Hello","timestamp":"2006-01-02T08:04:05Z"}` + "\n", }, } @@ -66,10 +67,11 @@ func TestJSONLOutput_Format(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() + writer := &bytes.Buffer{} + out := &JSONLOutput{writer,testData.options} + out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) - out := &JSONLOutput{testData.options} - actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) - + actual := 
writer.String() assert.Equal(t, testData.expected, actual) assert.NoError(t, isValidJSON(actual)) }) diff --git a/pkg/logcli/output/output.go b/pkg/logcli/output/output.go index fa181d8d5d1a..1cb6e54f58dd 100644 --- a/pkg/logcli/output/output.go +++ b/pkg/logcli/output/output.go @@ -3,6 +3,7 @@ package output import ( "fmt" "hash/fnv" + "io" "time" "github.com/fatih/color" @@ -27,7 +28,7 @@ var colorList = []*color.Color{ // LogOutput is the interface any output mode must implement type LogOutput interface { - Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string + FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) } // LogOutputOptions defines options supported by LogOutput @@ -38,7 +39,7 @@ type LogOutputOptions struct { } // NewLogOutput creates a log output based on the input mode and options -func NewLogOutput(mode string, options *LogOutputOptions) (LogOutput, error) { +func NewLogOutput(w io.Writer, mode string, options *LogOutputOptions) (LogOutput, error) { if options.Timezone == nil { options.Timezone = time.Local } @@ -46,14 +47,17 @@ func NewLogOutput(mode string, options *LogOutputOptions) (LogOutput, error) { switch mode { case "default": return &DefaultOutput{ + w: w, options: options, }, nil case "jsonl": return &JSONLOutput{ + w: w, options: options, }, nil case "raw": return &RawOutput{ + w: w, options: options, }, nil default: diff --git a/pkg/logcli/output/output_test.go b/pkg/logcli/output/output_test.go index 914b36e0cdde..ab60881d88b1 100644 --- a/pkg/logcli/output/output_test.go +++ b/pkg/logcli/output/output_test.go @@ -10,19 +10,19 @@ import ( func TestNewLogOutput(t *testing.T) { options := &LogOutputOptions{time.UTC, false, false} - out, err := NewLogOutput("default", options) + out, err := NewLogOutput(nil,"default", options) assert.NoError(t, err) - assert.IsType(t, &DefaultOutput{options}, out) + assert.IsType(t, &DefaultOutput{nil,options}, out) - out, err = NewLogOutput("jsonl", options) + out, err = NewLogOutput(nil,"jsonl", options) assert.NoError(t, err) - assert.IsType(t, &JSONLOutput{options}, out) + assert.IsType(t, &JSONLOutput{nil,options}, out) - out, err = NewLogOutput("raw", options) + out, err = NewLogOutput(nil,"raw", options) assert.NoError(t, err) - assert.IsType(t, &RawOutput{options}, out) + assert.IsType(t, &RawOutput{nil,options}, out) - out, err = NewLogOutput("unknown", options) + out, err = NewLogOutput(nil,"unknown", options) assert.Error(t, err) assert.Nil(t, out) } diff --git a/pkg/logcli/output/raw.go b/pkg/logcli/output/raw.go index 0934ddda97a1..32a26179ac4a 100644 --- a/pkg/logcli/output/raw.go +++ b/pkg/logcli/output/raw.go @@ -1,6 +1,8 @@ package output import ( + "fmt" + "io" "time" "github.com/grafana/loki/pkg/loghttp" @@ -8,13 +10,14 @@ import ( // RawOutput prints logs in their original form, without any metadata type RawOutput struct { + w io.Writer options *LogOutputOptions } // Format a log entry as is -func (o *RawOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string { +func (o *RawOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) { if len(line) > 0 && line[len(line)-1] == '\n' { line = line[:len(line)-1] } - return line + fmt.Fprintln(o.w, line) } diff --git a/pkg/logcli/output/raw_test.go b/pkg/logcli/output/raw_test.go index 4c280ac3c269..040a27ee7537 100644 --- a/pkg/logcli/output/raw_test.go +++ b/pkg/logcli/output/raw_test.go @@ -1,6 +1,7 @@ package output import ( + "bytes" 
"testing" "time" @@ -31,7 +32,7 @@ func TestRawOutput_Format(t *testing.T) { someLabels, 0, "", - "", + "\n", }, "non empty line": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -39,7 +40,7 @@ func TestRawOutput_Format(t *testing.T) { someLabels, 0, "Hello world", - "Hello world", + "Hello world\n", }, "line with single newline at the end": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -47,7 +48,7 @@ func TestRawOutput_Format(t *testing.T) { someLabels, 0, "Hello world\n", - "Hello world", + "Hello world\n", }, "line with multiple newlines at the end": { &LogOutputOptions{Timezone: time.UTC, NoLabels: false}, @@ -55,7 +56,7 @@ func TestRawOutput_Format(t *testing.T) { someLabels, 0, "Hello world\n\n\n", - "Hello world\n\n", + "Hello world\n\n\n", }, } @@ -65,10 +66,11 @@ func TestRawOutput_Format(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - out := &RawOutput{testData.options} - actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) + writer := &bytes.Buffer{} + out := &RawOutput{writer,testData.options} + out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line) - assert.Equal(t, testData.expected, actual) + assert.Equal(t, testData.expected, writer.String()) }) } } diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index b299bf25d8f2..8a2d5582c3dd 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -307,7 +307,7 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line { continue } - fmt.Println(out.Format(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)) + out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line) printed++ } diff --git a/pkg/logcli/query/tail.go b/pkg/logcli/query/tail.go index 2187c2ef8b2f..b1f9395e7bf7 100644 --- a/pkg/logcli/query/tail.go +++ b/pkg/logcli/query/tail.go @@ -1,7 +1,6 @@ package query import ( - "fmt" "log" "os" "os/signal" @@ -75,7 +74,7 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) { } for _, entry := range stream.Entries { - fmt.Println(out.Format(entry.Timestamp, labels, 0, entry.Line)) + out.FormatAndPrintln(entry.Timestamp, labels, 0, entry.Line) } } From f3208bca8709ad81733ac7af79438767ae6a6e11 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 9 Aug 2020 11:54:08 -0400 Subject: [PATCH 07/17] starting to add tests --- pkg/logcli/output/raw.go | 7 ++ pkg/logcli/query/query_test.go | 138 ++++++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 1 deletion(-) diff --git a/pkg/logcli/output/raw.go b/pkg/logcli/output/raw.go index 32a26179ac4a..794f9a4a6bdf 100644 --- a/pkg/logcli/output/raw.go +++ b/pkg/logcli/output/raw.go @@ -14,6 +14,13 @@ type RawOutput struct { options *LogOutputOptions } +func NewRaw (writer io.Writer, options *LogOutputOptions) LogOutput { + return &RawOutput{ + w: writer, + options: options, + } +} + // Format a log entry as is func (o *RawOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) { if len(line) > 0 && line[len(line)-1] == '\n' { diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index 6d1d021ee283..cab1325fcea4 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -1,11 +1,20 @@ package query import ( + "bytes" + "context" "log" "reflect" "testing" + "time" + 
"github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + + "github.com/grafana/loki/pkg/logcli/output" "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/marshal" ) @@ -140,9 +149,76 @@ func Test_subtract(t *testing.T) { } } - func Test_batch(t *testing.T) { + tests := []struct { + name string + streams []logproto.Stream + start, end time.Time + limit, batch int + labelMatcher string + forward bool + expected []string + }{ + { + name: "super simple forward", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"simple\"}", + Entries: []logproto.Entry{ + logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "line1", + }, + logproto.Entry{ + Timestamp: time.Unix(2, 0), + Line: "line2", + }, + logproto.Entry{ + Timestamp: time.Unix(3, 0), + Line: "line3", + }, + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(3, 0), + limit: 10, + batch: 10, + labelMatcher: "{test=\"simple\"}", + forward: true, + expected: []string{ + "line1", + "line2", + }, + + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tc := newTestQueryClient(tt.streams...) + writer := &bytes.Buffer{} + out := output.NewRaw(writer, nil) + q := Query{ + QueryString: tt.labelMatcher, + Start: tt.start, + End: tt.end, + Limit: tt.limit, + BatchSize: tt.batch, + Forward: tt.forward, + Step: 0, + Interval: 0, + Quiet: false, + NoLabels: false, + IgnoreLabelsKey: nil, + ShowLabelsKey: nil, + FixedLabelsLen: 0, + LocalConfig: "", + } + q.DoQuery(tc, out, false) + assert.Equal(t, tt.expected, writer.String()) + }) + } } func mustParseLabels(s string) loghttp.LabelSet { @@ -154,3 +230,63 @@ func mustParseLabels(s string) loghttp.LabelSet { return l } + +type testQueryClient struct { + engine *logql.Engine +} + +func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient { + q := logql.NewMockQuerier(0, testStreams) + e := logql.NewEngine(logql.EngineOpts{}, q) + return &testQueryClient{engine: e} +} + +func (t *testQueryClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { + panic("implement me") +} + +func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { + + params := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil) + + v, err := t.engine.Query(params).Exec(context.Background()) + if err != nil { + return nil, err + } + + value, err := marshal.NewResultValue(v.Data) + if err != nil { + return nil, err + } + + q := &loghttp.QueryResponse{ + Status: "success", + Data: loghttp.QueryResponseData{ + ResultType: value.Type(), + Result: value, + Statistics: v.Statistics, + }, + } + + return q, nil +} + +func (t *testQueryClient) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { + panic("implement me") +} + +func (t *testQueryClient) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) { + panic("implement me") +} + +func (t *testQueryClient) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) { + panic("implement me") +} + +func (t *testQueryClient) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) { + panic("implement me") +} + 
+func (t *testQueryClient) GetOrgID() string { + panic("implement me") +} From 0b5de2aecd601f8f36c5e38568095f2ca927eed0 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Sun, 9 Aug 2020 14:22:28 -0400 Subject: [PATCH 08/17] adding a bunch of tests fixing up how the MockQuerier works. --- pkg/logcli/query/query.go | 63 +++++-- pkg/logcli/query/query_test.go | 332 +++++++++++++++++++++++++++++---- pkg/logql/test_utils.go | 22 ++- 3 files changed, 367 insertions(+), 50 deletions(-) diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index 8a2d5582c3dd..d83287cc9973 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -88,13 +88,16 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) total := 0 start := q.Start end := q.End - var lastEntry *loghttp.Entry + var lastEntry []*loghttp.Entry for total < q.Limit { bs := q.BatchSize + // We want to truncate the batch size if the remaining number + // of items needed to reach the limit is less than the batch size if q.Limit-total < q.BatchSize { - // Have to add one because of the timestamp overlap described below, the first result - // will always be the last result of the last batch. - bs = q.Limit - total + 1 + // Truncated batchsize is q.Limit - total, however we add to this + // the length of the overlap from the last query to make sure we get the + // correct amount of new logs knowing there will be some overlapping logs returned. + bs = q.Limit - total + len(lastEntry) } resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet) if err != nil { @@ -111,13 +114,21 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) break } // Also no result, wouldn't expect to hit this. - if lastEntry == nil { + if lastEntry == nil || len(lastEntry) == 0 { break } // Can only happen if all the results return in one request if resultLength == q.Limit { break } + if len(lastEntry) >= q.BatchSize { + log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+ + "(there will always be 1 overlapping entry but Loki allows multiple entries to have "+ + "the same timestamp, so when a batch ends in this scenario the next query will include "+ + "all the overlapping entries again). Please increase your batch size to at least %v to account "+ + "for overlapping entryes\n", q.BatchSize, len(lastEntry), len(lastEntry)+1) + } + // Batching works by taking the timestamp of the last query and using it in the next query, // because Loki supports multiple entries with the same timestamp it's possible for a batch to have // fallen in the middle of a list of entries for the same time, so to make sure we get all entries @@ -127,12 +138,13 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) // to get the desired limit. total += resultLength // Based on the query direction we either set the start or end for the next query. 
+ // If there are multiple entries in `lastEntry` they have to have the same timestamp so we can pick just the first if q.Forward { - start = lastEntry.Timestamp + start = lastEntry[0].Timestamp } else { // The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result // fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query - end = lastEntry.Timestamp.Add(1 * time.Nanosecond) + end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond) } } @@ -140,9 +152,9 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) } -func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) { +func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { length := -1 - var entry *loghttp.Entry + var entry []*loghttp.Entry switch value.Type() { case logql.ValueTypeStreams: length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry) @@ -241,7 +253,7 @@ func (q *Query) isInstant() bool { return q.Start == q.End } -func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry *loghttp.Entry) (int, *loghttp.Entry) { +func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { common := commonLabels(streams) // Remove the labels we want to show from common @@ -304,14 +316,39 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastE printed := 0 for _, e := range allEntries { // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch - if lastEntry != nil && e.entry.Timestamp == lastEntry.Timestamp && e.entry.Line == lastEntry.Line { - continue + if lastEntry != nil && len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp { + skip := false + // Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp + // shared by multiple entries we have to check all that were stored to see if we've already + // printed them. + for _, le := range lastEntry { + if e.entry.Line == le.Line { + skip = true + } + } + if skip { + continue + } } out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line) printed++ } - return printed, &allEntries[len(allEntries)-1].entry + // Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends + // with an entry that shared multiple timestamps, so we need to keep a list of all these entries + // because the next query is going to contain them too and we want to not duplicate anything already + // printed. 
+ lel := []*loghttp.Entry{} + // Start with the timestamp of the last entry + le := allEntries[len(allEntries)-1].entry + for i, e := range allEntries { + // Save any entry which has this timestamp (most of the time this will only be the single last entry) + if e.entry.Timestamp.Equal(le.Timestamp) { + lel = append(lel, &allEntries[i].entry) + } + } + + return printed, lel } func (q *Query) printMatrix(matrix loghttp.Matrix) { diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index cab1325fcea4..49a4be61bab7 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -5,6 +5,7 @@ import ( "context" "log" "reflect" + "strings" "testing" "time" @@ -151,48 +152,304 @@ func Test_subtract(t *testing.T) { func Test_batch(t *testing.T) { tests := []struct { - name string - streams []logproto.Stream - start, end time.Time - limit, batch int - labelMatcher string - forward bool - expected []string + name string + streams []logproto.Stream + start, end time.Time + limit, batch int + labelMatcher string + forward bool + expectedCalls int + expected []string }{ { name: "super simple forward", streams: []logproto.Stream{ logproto.Stream{ - Labels: "{test=\"simple\"}", + Labels: "{test=\"simple\"}", Entries: []logproto.Entry{ - logproto.Entry{ - Timestamp: time.Unix(1, 0), - Line: "line1", - }, - logproto.Entry{ - Timestamp: time.Unix(2, 0), - Line: "line2", - }, - logproto.Entry{ - Timestamp: time.Unix(3, 0), - Line: "line3", - }, + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestmap is exclusive }, }, }, - start: time.Unix(1, 0), - end: time.Unix(3, 0), - limit: 10, - batch: 10, - labelMatcher: "{test=\"simple\"}", - forward: true, + start: time.Unix(1, 0), + end: time.Unix(3, 0), + limit: 10, + batch: 10, + labelMatcher: "{test=\"simple\"}", + forward: true, + expectedCalls: 2, // Client doesn't know if the server hit a limit or there were no results so we have to query until there is no results, in this case 2 calls expected: []string{ "line1", "line2", }, - }, - + { + name: "super simple backward", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"simple\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestmap is exclusive + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(3, 0), + limit: 10, + batch: 10, + labelMatcher: "{test=\"simple\"}", + forward: false, + expectedCalls: 2, + expected: []string{ + "line2", + "line1", + }, + }, + { + name: "single stream forward batch", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"simple\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, + logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"}, + logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"}, + logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"}, + logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"}, + logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"}, + logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"}, + }, + }, + }, + start: 
time.Unix(1, 0), + end: time.Unix(11, 0), + limit: 9, + batch: 2, + labelMatcher: "{test=\"simple\"}", + forward: true, + // Our batchsize is 2 but each query will also return the overlapping last element from the + // previous batch, as such we only get one item per call so we make a lot of calls + // Call one: line1 line2 + // Call two: line2 line3 + // Call three: line3 line4 + // Call four: line4 line5 + // Call five: line5 line6 + // Call six: line6 line7 + // Call seven: line7 line8 + // Call eight: line8 line9 + expectedCalls: 8, + expected: []string{ + "line1", "line2", "line3", "line4", "line5", "line6", "line7", "line8", "line9", + }, + }, + { + name: "single stream backward batch", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"simple\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, + logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"}, + logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"}, + logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"}, + logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"}, + logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"}, + logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"}, + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(11, 0), + limit: 9, + batch: 2, + labelMatcher: "{test=\"simple\"}", + forward: false, + expectedCalls: 8, + expected: []string{ + "line10", "line9", "line8", "line7", "line6", "line5", "line4", "line3", "line2", + }, + }, + { + name: "two streams forward batch", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"one\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, + logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"}, + logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"}, + logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"}, + logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"}, + logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"}, + logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"}, + }, + }, + logproto.Stream{ + Labels: "{test=\"two\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"}, + logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"}, + logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"}, + logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"}, + logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"}, + logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"}, + logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"}, + logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"}, + logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"}, + logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"}, + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(11, 0), + limit: 12, + batch: 3, + labelMatcher: "{test=~\"one|two\"}", + forward: true, + // Six calls + // 1 line1, s2line1, line2 + // 2 line2, s2line2, line3 + // 3 line3, s2line3, line4 + // 4 line4, s2line4, line5 + // 5 line5, s2line5, line6 + // 6 line6, s2line6 + 
expectedCalls: 6, + expected: []string{ + "line1", "s2line1", "line2", "s2line2", "line3", "s2line3", "line4", "s2line4", "line5", "s2line5", "line6", "s2line6", + }, + }, + { + name: "two streams backward batch", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"one\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, + logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"}, + logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"}, + logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"}, + logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"}, + logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"}, + logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"}, + }, + }, + logproto.Stream{ + Labels: "{test=\"two\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"}, + logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"}, + logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"}, + logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"}, + logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"}, + logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"}, + logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"}, + logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"}, + logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"}, + logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"}, + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(11, 0), + limit: 12, + batch: 3, + labelMatcher: "{test=~\"one|two\"}", + forward: false, + expectedCalls: 6, + expected: []string{ + "s2line10", "line10", "s2line9", "line9", "s2line8", "line8", "s2line7", "line7", "s2line6", "line6", "s2line5", "line5", + }, + }, + { + name: "single stream forward batch identical timestamps", + streams: []logproto.Stream{ + logproto.Stream{ + Labels: "{test=\"simple\"}", + Entries: []logproto.Entry{ + logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"}, + logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"}, + logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, + logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"}, + logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"}, + logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"}, + logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"}, + logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"}, + logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"}, + logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"}, + }, + }, + }, + start: time.Unix(1, 0), + end: time.Unix(11, 0), + limit: 9, + batch: 4, + labelMatcher: "{test=\"simple\"}", + forward: true, + // Our batchsize is 2 but each query will also return the overlapping last element from the + // previous batch, as such we only get one item per call so we make a lot of calls + // Call one: line1 line2 line3 line4 + // Call two: line4 line5 line6 line6a + // Call three: line6 line6a line7 line8 <- notice line 6 and 6a share the same timestamp so they get returned as overlap in the next query. 
+			expectedCalls: 3,
+			expected: []string{
+				"line1", "line2", "line3", "line4", "line5", "line6", "line6a", "line7", "line8",
+			},
+		},
+		{
+			name: "single stream backward batch identical timestamps",
+			streams: []logproto.Stream{
+				logproto.Stream{
+					Labels: "{test=\"simple\"}",
+					Entries: []logproto.Entry{
+						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
+						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
+						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
+						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
+						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"},
+						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6b"},
+						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
+						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
+						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
+						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
+					},
+				},
+			},
+			start: time.Unix(1, 0),
+			end: time.Unix(11, 0),
+			limit: 11,
+			batch: 4,
+			labelMatcher: "{test=\"simple\"}",
+			forward: false,
+			// Our batch size is 4, but each query also returns the entries that overlap the last
+			// timestamp of the previous batch, so each call after the first yields fewer new lines
+			// Call one: line10 line9 line8 line7
+			// Call two: line7 line6b line6a line6
+			// Call three: line6b line6a line6 line5
+			// Call four: line5 line4 line3 line2
+			expectedCalls: 4,
+			expected: []string{
+				"line10", "line9", "line8", "line7", "line6b", "line6a", "line6", "line5", "line4", "line3", "line2",
+			},
+		},
+	}

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -216,7 +473,14 @@ func Test_batch(t *testing.T) {
 				LocalConfig: "",
 			}
 			q.DoQuery(tc, out, false)
-			assert.Equal(t, tt.expected, writer.String())
+			split := strings.Split(writer.String(), "\n")
+			// Remove the last entry because there is always a newline after the last line, which
+			// leaves an empty element in the list of lines.
+			if len(split) > 0 {
+				split = split[:len(split)-1]
+			}
+			assert.Equal(t, tt.expected, split)
+			assert.Equal(t, tt.expectedCalls, tc.queryRangeCalls)
 		})
 	}
 }
@@ -232,13 +496,17 @@ func mustParseLabels(s string) loghttp.LabelSet {
 }
 
 type testQueryClient struct {
-	engine *logql.Engine
+	engine          *logql.Engine
+	queryRangeCalls int
 }
 
 func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
 	q := logql.NewMockQuerier(0, testStreams)
 	e := logql.NewEngine(logql.EngineOpts{}, q)
-	return &testQueryClient{engine: e}
+	return &testQueryClient{
+		engine:          e,
+		queryRangeCalls: 0,
+	}
 }
 
 func (t *testQueryClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
@@ -267,7 +535,7 @@ func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through t
 			Statistics: v.Statistics,
 		},
 	}
-
+	t.queryRangeCalls++
 	return q, nil
 }
 
diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go
index ecaa17bd1f69..35b2eeb33342 100644
--- a/pkg/logql/test_utils.go
+++ b/pkg/logql/test_utils.go
@@ -91,11 +91,23 @@ outer:
 
 	}
 
-	return iter.NewTimeRangedIterator(
-		iter.NewStreamsIterator(ctx, filtered, req.Direction),
-		req.Start,
-		req.End,
-	), nil
+	streamIters := make([]iter.EntryIterator, 0, len(filtered))
+	for i := range filtered {
+		// This is the same as how LazyChunk or MemChunk build their iterators,
+		// they return a TimeRangedIterator which is wrapped in an EntryReversedIter if the direction is BACKWARD
+		iterForward := iter.NewTimeRangedIterator(iter.NewStreamIterator(filtered[i]), req.Start, req.End)
+		if req.Direction == logproto.FORWARD {
+			streamIters = append(streamIters, iterForward)
+		} else {
+			reversed, err := iter.NewEntryReversedIter(iterForward)
+			if err != nil {
+				return nil, err
+			}
+			streamIters = append(streamIters, reversed)
+		}
+	}
+
+	return iter.NewHeapIterator(ctx, streamIters, req.Direction), nil
 }
 
 func (q MockQuerier) SelectSamples(ctx context.Context, req SelectSampleParams) (iter.SampleIterator, error) {

From 844f1bf1e7a71bb1c924ac53c4fc8c3d89f3ef9c Mon Sep 17 00:00:00 2001
From: Edward Welch
Date: Sun, 9 Aug 2020 14:57:31 -0400
Subject: [PATCH 09/17] updating docs

---
 docs/sources/getting-started/logcli.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 769898261f60..8439a63d2765 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -62,6 +62,21 @@ $ logcli series -q --match='{namespace="loki",container_name="loki"}'
 {app="loki", container_name="loki", controller_revision_hash="loki-57c9df47f4", filename="/var/log/pods/loki_loki-0_8ed03ded-bacb-4b13-a6fe-53a445a15887/loki/0.log", instance="loki-0", job="loki/loki", name="loki", namespace="loki", release="loki", statefulset_kubernetes_io_pod_name="loki-0", stream="stderr"}
 ```
 
+#### Batched Queries
+
+Starting with Loki 1.6.0, `logcli` will batch log queries to Loki.
+
+If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, logcli will automatically
+send this request to Loki in batches.
+
+The default batch size is `1000`
+
+Loki has a server side limit for the maximum lines returned in a query (default is 5000),
+
+Batching allows for making larger requests than the server side limit so long as the `--batch` size is less than the server limit.
+
+Please note that the query metadata is printed for each batch (it is printed on `stderr`), this can be removed with the `--quiet` flag.
+
 ### Configuration
 
 Configuration values are considered in the following order (lowest to highest):
@@ -188,6 +203,7 @@ Flags:
                            range.
   --interval=INTERVAL      Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is
                            experimental, please see Issue 1779**.
+  --batch=1000             Query batch size to use until 'limit' is reached
   --forward                Scan forwards through logs.
   --no-labels              Do not print any labels.
   --exclude-label=EXCLUDE-LABEL ...

From 7af9a6af420daf45f227f4dafec3c4b0b008283d Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:11:48 -0400
Subject: [PATCH 10/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 8439a63d2765..d2b02ed50c7f 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -64,7 +64,7 @@ $ logcli series -q --match='{namespace="loki",container_name="loki"}'
 
 #### Batched Queries
 
-Starting with Loki 1.6.0, `logcli` will batch log queries to Loki.
+Starting with Loki 1.6.0, `logcli` batches log queries to Loki.
 
 If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, logcli will automatically
 send this request to Loki in batches.

From 58c714f6836734294c619c1a3a3a3abda83ea49b Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:12:15 -0400
Subject: [PATCH 11/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index d2b02ed50c7f..fbb24db74901 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -66,7 +66,7 @@ $ logcli series -q --match='{namespace="loki",container_name="loki"}'
 
 Starting with Loki 1.6.0, `logcli` batches log queries to Loki.
 
-If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, logcli will automatically
+If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, then logcli automatically
 send this request to Loki in batches.
 
 The default batch size is `1000`

From 8fdb5c03b37dd6d69ad076f4d5f48a546d45fe8c Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:12:22 -0400
Subject: [PATCH 12/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index fbb24db74901..2699255658d5 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -67,7 +67,7 @@ $ logcli series -q --match='{namespace="loki",container_name="loki"}'
 
 Starting with Loki 1.6.0, `logcli` batches log queries to Loki.
 
 If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, then logcli automatically
-send this request to Loki in batches.
+sends this request to Loki in batches.
 
 The default batch size is `1000`

From 9cb43839434fa322aca72dff8982022e346a8e7b Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:12:28 -0400
Subject: [PATCH 13/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 2699255658d5..e379be45a44b 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -69,7 +69,7 @@ Starting with Loki 1.6.0, `logcli` batches log queries to Loki.
 If you set a `--limit` on a query (default is 30) to a large number, say `--limit=10000`, then logcli automatically
 sends this request to Loki in batches.
 
-The default batch size is `1000`
+The default batch size is `1000`.
 
 Loki has a server side limit for the maximum lines returned in a query (default is 5000),

From 6d64aa4b372e3494ba1a49d70d3005b00b4ff0ee Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:12:56 -0400
Subject: [PATCH 14/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index e379be45a44b..8ce09a41f35a 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -71,7 +71,7 @@ sends this request to Loki in batches.
 
 The default batch size is `1000`.
 
-Loki has a server side limit for the maximum lines returned in a query (default is 5000),
+Loki has a server-side limit for the maximum lines returned in a query (default is 5000).
 
 Batching allows for making larger requests than the server side limit so long as the `--batch` size is less than the server limit.

From 521d5edee6012bd4e1ebf52fc656452b41cab59f Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:13:15 -0400
Subject: [PATCH 15/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 8ce09a41f35a..8960b1fcf58c 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -73,7 +73,7 @@ The default batch size is `1000`.
 
 Loki has a server-side limit for the maximum lines returned in a query (default is 5000).
 
-Batching allows for making larger requests than the server side limit so long as the `--batch` size is less than the server limit.
+Batching allows you to make larger requests than the server-side limit as long as the `--batch` size is less than the server limit.
 
 Please note that the query metadata is printed for each batch (it is printed on `stderr`), this can be removed with the `--quiet` flag.
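The batching workflow documented in the patches above can be exercised from the command line. A minimal usage sketch, assuming a reachable Loki instance and the standard `logcli query` subcommand; the address, label selector, and limits below are illustrative values, not taken from this patch series:

```bash
# Request up to 10,000 lines using the default batch size of 1,000.
# Each batch is a separate range query against Loki, so query metadata would
# normally be printed to stderr once per batch; --quiet suppresses it.
logcli query \
  --addr=http://localhost:3100 \
  --limit=10000 \
  --batch=1000 \
  --quiet \
  '{namespace="loki"}'
```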
From a850c1d56fa7024830dd1e97131987905ce8f7f8 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:13:28 -0400
Subject: [PATCH 16/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 8960b1fcf58c..5a028dc57074 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -75,7 +75,7 @@ Loki has a server-side limit for the maximum lines returned in a query (default
 
 Batching allows you to make larger requests than the server-side limit as long as the `--batch` size is less than the server limit.
 
-Please note that the query metadata is printed for each batch (it is printed on `stderr`), this can be removed with the `--quiet` flag.
+Please note that the query metadata is printed for each batch on `stderr`. Set the `--quiet` flag to stop this behavior.
 
 ### Configuration

From f3181568c54218e4a93f3978b158e6f498fa004f Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Mon, 10 Aug 2020 16:13:41 -0400
Subject: [PATCH 17/17] Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
---
 docs/sources/getting-started/logcli.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/sources/getting-started/logcli.md b/docs/sources/getting-started/logcli.md
index 5a028dc57074..3caaa8b92aa0 100644
--- a/docs/sources/getting-started/logcli.md
+++ b/docs/sources/getting-started/logcli.md
@@ -203,7 +203,7 @@ Flags:
                            range.
   --interval=INTERVAL      Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is
                            experimental, please see Issue 1779**.
-  --batch=1000             Query batch size to use until 'limit' is reached
+  --batch=1000             Query batch size to use until 'limit' is reached.
   --forward                Scan forwards through logs.
   --no-labels              Do not print any labels.
   --exclude-label=EXCLUDE-LABEL ...
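As a closing illustration of how the new `--batch` flag interacts with the existing flags, a hedged sketch; the values and the label matcher are assumptions for the example, not part of the change. Because every batch is an ordinary range query, `--batch` should stay below the server's per-query line limit (5000 by default), while `--limit` may be much larger:

```bash
# Scan forward through a large result set: up to 50,000 lines in total,
# fetched 4,000 at a time so each individual request stays under the
# server's default 5,000-line cap.
logcli query \
  --addr=http://localhost:3100 \
  --limit=50000 \
  --batch=4000 \
  --forward \
  '{job="myapp"}'
```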