Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Logcli: automatically batch requests #2482

Merged
merged 17 commits into from
Aug 10, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cmd/logcli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func main() {
ColoredOutput: rangeQuery.ColoredOutput,
}

out, err := output.NewLogOutput(*outputMode, outputOptions)
out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions)
if err != nil {
log.Fatalf("Unable to create log output: %s", err)
}
Expand All @@ -145,7 +145,7 @@ func main() {
ColoredOutput: instantQuery.ColoredOutput,
}

out, err := output.NewLogOutput(*outputMode, outputOptions)
out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions)
if err != nil {
log.Fatalf("Unable to create log output: %s", err)
}
Expand All @@ -158,8 +158,9 @@ func main() {
}
}

func newQueryClient(app *kingpin.Application) *client.Client {
client := &client.Client{
func newQueryClient(app *kingpin.Application) client.Client {

client := &client.DefaultClient{
TLSConfig: config.TLSConfig{},
}

Expand Down Expand Up @@ -273,6 +274,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)
cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step)
cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval)
cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize)
}

cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)
Expand Down
55 changes: 35 additions & 20 deletions pkg/logcli/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,19 @@ var (
userAgent = fmt.Sprintf("loki-logcli/%s", build.Version)
)

// Client contains all the methods to query a Loki instance, it's an interface to allow multiple implementations.
type Client interface {
Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error)
QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error)
ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error)
ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error)
Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error)
LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error)
GetOrgID() string
}

// Client contains fields necessary to query a Loki instance
type Client struct {
type DefaultClient struct {
TLSConfig config.TLSConfig
Username string
Password string
Expand All @@ -46,7 +57,7 @@ type Client struct {
// Query uses the /api/v1/query endpoint to execute an instant query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
func (c *DefaultClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
qsb := util.NewQueryStringBuilder()
qsb.SetString("query", queryStr)
qsb.SetInt("limit", int64(limit))
Expand All @@ -59,7 +70,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log
// QueryRange uses the /api/v1/query_range endpoint to execute a range query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
func (c *DefaultClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
params := util.NewQueryStringBuilder()
params.SetString("query", queryStr)
params.SetInt32("limit", limit)
Expand All @@ -81,7 +92,7 @@ func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time,
}

// ListLabelNames uses the /api/v1/label endpoint to list label names
func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
func (c *DefaultClient) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
var labelResponse loghttp.LabelResponse
params := util.NewQueryStringBuilder()
params.SetInt("start", from.UnixNano())
Expand All @@ -94,7 +105,7 @@ func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.L
}

// ListLabelValues uses the /api/v1/label endpoint to list label values
func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
func (c *DefaultClient) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
path := fmt.Sprintf(labelValuesPath, url.PathEscape(name))
var labelResponse loghttp.LabelResponse
params := util.NewQueryStringBuilder()
Expand All @@ -106,7 +117,7 @@ func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Tim
return &labelResponse, nil
}

func (c *Client) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
func (c *DefaultClient) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
params := util.NewQueryStringBuilder()
params.SetInt("start", from.UnixNano())
params.SetInt("end", through.UnixNano())
Expand All @@ -119,7 +130,22 @@ func (c *Client) Series(matchers []string, from, through time.Time, quiet bool)
return &seriesResponse, nil
}

func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) {
// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it
func (c *DefaultClient) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I only moved this to keep it in the same order as the interface and put it before private methods

qsb := util.NewQueryStringBuilder()
qsb.SetString("query", queryStr)
qsb.SetInt("delay_for", int64(delayFor))
qsb.SetInt("limit", int64(limit))
qsb.SetInt("from", from)

return c.wsConnect(tailPath, qsb.Encode(), quiet)
}

func (c *DefaultClient) GetOrgID() string {
return c.OrgID
}

func (c *DefaultClient) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) {
var err error
var r loghttp.QueryResponse

Expand All @@ -130,7 +156,7 @@ func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryR
return &r, nil
}

func (c *Client) doRequest(path, query string, quiet bool, out interface{}) error {
func (c *DefaultClient) doRequest(path, query string, quiet bool, out interface{}) error {

us, err := buildURL(c.Address, path, query)
if err != nil {
Expand Down Expand Up @@ -180,18 +206,7 @@ func (c *Client) doRequest(path, query string, quiet bool, out interface{}) erro
return json.NewDecoder(resp.Body).Decode(out)
}

// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it
func (c *Client) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) {
qsb := util.NewQueryStringBuilder()
qsb.SetString("query", queryStr)
qsb.SetInt("delay_for", int64(delayFor))
qsb.SetInt("limit", int64(limit))
qsb.SetInt("from", from)

return c.wsConnect(tailPath, qsb.Encode(), quiet)
}

func (c *Client) wsConnect(path, query string, quiet bool) (*websocket.Conn, error) {
func (c *DefaultClient) wsConnect(path, query string, quiet bool) (*websocket.Conn, error) {
us, err := buildURL(c.Address, path, query)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/logcli/labelquery/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type LabelQuery struct {
}

// DoLabels prints out label results
func (q *LabelQuery) DoLabels(c *client.Client) {
func (q *LabelQuery) DoLabels(c client.Client) {
values := q.ListLabels(c)

for _, value := range values {
Expand All @@ -27,7 +27,7 @@ func (q *LabelQuery) DoLabels(c *client.Client) {
}

// ListLabels returns an array of label strings
func (q *LabelQuery) ListLabels(c *client.Client) []string {
func (q *LabelQuery) ListLabels(c client.Client) []string {
var labelResponse *loghttp.LabelResponse
var err error
if len(q.LabelName) > 0 {
Expand Down
13 changes: 8 additions & 5 deletions pkg/logcli/output/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package output

import (
"fmt"
"io"
"strings"
"time"

Expand All @@ -12,24 +13,26 @@ import (

// DefaultOutput provides logs and metadata in human readable format
type DefaultOutput struct {
w io.Writer
options *LogOutputOptions
}

// Format a log entry in a human readable format
func (o *DefaultOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string {
func (o *DefaultOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) {
timestamp := ts.In(o.options.Timezone).Format(time.RFC3339)
line = strings.TrimSpace(line)

if o.options.NoLabels {
return fmt.Sprintf("%s %s", color.BlueString(timestamp), line)
fmt.Fprintf(o.w, "%s %s\n", color.BlueString(timestamp), line)
return
}

if o.options.ColoredOutput {
labelsColor := getColor(lbls.String()).SprintFunc()
return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line)
fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line)
} else {
fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line)
}

return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line)
}

// add some padding after labels
Expand Down
29 changes: 17 additions & 12 deletions pkg/logcli/output/default_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package output

import (
"bytes"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -33,47 +34,47 @@ func TestDefaultOutput_Format(t *testing.T) {
emptyLabels,
0,
"",
"2006-01-02T08:04:05Z {} ",
"2006-01-02T08:04:05Z {} \n",
},
"empty line with labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
timestamp,
someLabels,
len(someLabels.String()),
"",
"2006-01-02T08:04:05Z {type=\"test\"} ",
"2006-01-02T08:04:05Z {type=\"test\"} \n",
},
"max labels length shorter than input labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
timestamp,
someLabels,
0,
"Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello\n",
},
"max labels length longer than input labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
timestamp,
someLabels,
20,
"Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello\n",
},
"timezone option set to a Local one": {
&LogOutputOptions{Timezone: time.FixedZone("test", 2*60*60), NoLabels: false},
timestamp,
someLabels,
0,
"Hello",
"2006-01-02T10:04:05+02:00 {type=\"test\"} Hello",
"2006-01-02T10:04:05+02:00 {type=\"test\"} Hello\n",
},
"labels output disabled": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: true},
timestamp,
someLabels,
0,
"Hello",
"2006-01-02T08:04:05Z Hello",
"2006-01-02T08:04:05Z Hello\n",
},
}

Expand All @@ -83,10 +84,11 @@ func TestDefaultOutput_Format(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()

out := &DefaultOutput{testData.options}
actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
writer := &bytes.Buffer{}
out := &DefaultOutput{writer,testData.options}
out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)

assert.Equal(t, testData.expected, actual)
assert.Equal(t, testData.expected, writer.String())
})
}
}
Expand All @@ -111,16 +113,19 @@ func TestDefaultOutput_FormatLabelsPadding(t *testing.T) {
timestamp, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00")
maxLabelsLen := findMaxLabelsLength(labelsList)
options := &LogOutputOptions{Timezone: time.UTC, NoLabels: false}
out := &DefaultOutput{options}
writer := &bytes.Buffer{}
out := &DefaultOutput{writer,options}

// Format the same log line with different labels
formattedEntries := make([]string, 0, len(labelsList))
for _, lbls := range labelsList {
formattedEntries = append(formattedEntries, out.Format(timestamp, lbls, maxLabelsLen, "XXX"))
out.FormatAndPrintln(timestamp, lbls, maxLabelsLen, "XXX")
formattedEntries = append(formattedEntries, writer.String())
writer.Reset()
}

// Ensure the log line starts at the same position in each formatted output
assert.Equal(t, len(formattedEntries), len(labelsList))
assert.Equal(t, len(labelsList), len(formattedEntries))

expectedIndex := strings.Index(formattedEntries[0], "XXX")
if expectedIndex <= 0 {
Expand Down
7 changes: 5 additions & 2 deletions pkg/logcli/output/jsonl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package output

import (
"encoding/json"
"fmt"
"io"
"log"
"time"

Expand All @@ -10,11 +12,12 @@ import (

// JSONLOutput prints logs and metadata as JSON Lines, suitable for scripts
type JSONLOutput struct {
w io.Writer
options *LogOutputOptions
}

// Format a log entry as json line
func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string {
func (o *JSONLOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) {
entry := map[string]interface{}{
"timestamp": ts.In(o.options.Timezone),
"line": line,
Expand All @@ -30,5 +33,5 @@ func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen i
log.Fatalf("error marshalling entry: %s", err)
}

return string(out)
fmt.Fprintln(o.w, string(out))
}
Loading