Skip to content

Commit

Permalink
add multi-client for promtail
Browse files Browse the repository at this point in the history
  • Loading branch information
3Xpl0it3r committed Apr 25, 2021
1 parent 6c9e53f commit 712575e
Show file tree
Hide file tree
Showing 429 changed files with 92,564 additions and 164 deletions.
18 changes: 9 additions & 9 deletions clients/cmd/docker-driver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bytes"
"fmt"
config2 "github.com/grafana/loki/clients/pkg/promtail/client/config"
"io/ioutil"
"net/url"
"os"
Expand All @@ -21,7 +22,6 @@ import (
"gopkg.in/yaml.v2"

"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/targets/file"

"github.com/grafana/loki/pkg/util"
Expand Down Expand Up @@ -68,21 +68,21 @@ const (
)

var (
defaultClientConfig = client.Config{
BatchWait: client.BatchWait,
BatchSize: client.BatchSize,
defaultClientConfig = config2.Config{
BatchWait: config2.BatchWait,
BatchSize: config2.BatchSize,
BackoffConfig: cortex_util.BackoffConfig{
MinBackoff: client.MinBackoff,
MaxBackoff: client.MaxBackoff,
MaxRetries: client.MaxRetries,
MinBackoff: config2.MinBackoff,
MaxBackoff: config2.MaxBackoff,
MaxRetries: config2.MaxRetries,
},
Timeout: client.Timeout,
Timeout: config2.Timeout,
}
)

type config struct {
labels model.LabelSet
clientConfig client.Config
clientConfig config2.Config
pipeline PipelineConfig
}

Expand Down
7 changes: 3 additions & 4 deletions clients/cmd/docker-driver/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"bytes"
loki2 "github.com/grafana/loki/clients/pkg/promtail/client/loki"

"github.com/docker/docker/daemon/logger"
"github.com/go-kit/kit/log"
Expand All @@ -11,15 +12,13 @@ import (

"github.com/grafana/loki/clients/pkg/logentry/stages"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client"

"github.com/grafana/loki/pkg/logproto"
)

var jobName = "docker"

type loki struct {
client client.Client
client loki2.Client
handler api.EntryHandler
labels model.LabelSet
logger log.Logger
Expand All @@ -34,7 +33,7 @@ func New(logCtx logger.Info, logger log.Logger) (logger.Logger, error) {
if err != nil {
return nil, err
}
c, err := client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
c, err := loki2.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions clients/cmd/fluent-bit/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package main

import (
"fmt"
loki2 "github.com/grafana/loki/clients/pkg/promtail/client/loki"

"github.com/go-kit/kit/log"

"github.com/grafana/loki/clients/pkg/promtail/client"
)

type bufferConfig struct {
Expand All @@ -21,7 +20,7 @@ var defaultBufferConfig = bufferConfig{
}

// NewBuffer makes a new buffered Client.
func NewBuffer(cfg *config, logger log.Logger) (client.Client, error) {
func NewBuffer(cfg *config, logger log.Logger) (loki2.Client, error) {
switch cfg.bufferConfig.bufferType {
case "dque":
return newDque(cfg, logger)
Expand Down
7 changes: 3 additions & 4 deletions clients/cmd/fluent-bit/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package main

import (
"github.com/go-kit/kit/log"
loki2 "github.com/grafana/loki/clients/pkg/promtail/client/loki"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/loki/clients/pkg/promtail/client"
)

// NewClient creates a new client based on the fluentbit configuration.
func NewClient(cfg *config, logger log.Logger) (client.Client, error) {
func NewClient(cfg *config, logger log.Logger) (loki2.Client, error) {
if cfg.bufferConfig.buffer {
return NewBuffer(cfg, logger)
}
return client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
return loki2.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
}
7 changes: 3 additions & 4 deletions clients/cmd/fluent-bit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
config2 "github.com/grafana/loki/clients/pkg/promtail/client/config"
"io/ioutil"
"strconv"
"strings"
Expand All @@ -14,12 +15,10 @@ import (
"github.com/weaveworks/common/logging"

"github.com/grafana/loki/clients/pkg/logentry/logql"
"github.com/grafana/loki/clients/pkg/promtail/client"

lokiflag "github.com/grafana/loki/pkg/util/flagext"
)

var defaultClientCfg = client.Config{}
var defaultClientCfg = config2.Config{}

func init() {
// Init everything with default values.
Expand All @@ -43,7 +42,7 @@ const (
)

type config struct {
clientConfig client.Config
clientConfig config2.Config
bufferConfig bufferConfig
logLevel logging.Level
autoKubernetesLabels bool
Expand Down
9 changes: 4 additions & 5 deletions clients/cmd/fluent-bit/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
config2 "github.com/grafana/loki/clients/pkg/promtail/client/config"
"io/ioutil"
"net/url"
"os"
Expand All @@ -14,8 +15,6 @@ import (
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/weaveworks/common/logging"

"github.com/grafana/loki/clients/pkg/promtail/client"

lokiflag "github.com/grafana/loki/pkg/util/flagext"
)

Expand All @@ -39,7 +38,7 @@ func Test_parseConfig(t *testing.T) {
map[string]string{},
&config{
lineFormat: jsonFormat,
clientConfig: client.Config{
clientConfig: config2.Config{
URL: mustParseURL("http://localhost:3100/loki/api/v1/push"),
BatchSize: defaultClientCfg.BatchSize,
BatchWait: defaultClientCfg.BatchWait,
Expand Down Expand Up @@ -70,7 +69,7 @@ func Test_parseConfig(t *testing.T) {
},
&config{
lineFormat: kvPairFormat,
clientConfig: client.Config{
clientConfig: config2.Config{
URL: mustParseURL("http://somewhere.com:3100/loki/api/v1/push"),
TenantID: "my-tenant-id",
BatchSize: 100,
Expand Down Expand Up @@ -104,7 +103,7 @@ func Test_parseConfig(t *testing.T) {
},
&config{
lineFormat: kvPairFormat,
clientConfig: client.Config{
clientConfig: config2.Config{
URL: mustParseURL("http://somewhere.com:3100/loki/api/v1/push"),
TenantID: "", // empty as not set in fluent-bit plugin config map
BatchSize: 100,
Expand Down
9 changes: 4 additions & 5 deletions clients/cmd/fluent-bit/dque.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
loki2 "github.com/grafana/loki/clients/pkg/promtail/client/loki"
"os"
"sync"
"time"
Expand All @@ -13,8 +14,6 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client"

"github.com/grafana/loki/pkg/logproto"
)

Expand Down Expand Up @@ -45,14 +44,14 @@ func dqueEntryBuilder() interface{} {
type dqueClient struct {
logger log.Logger
queue *dque.DQue
loki client.Client
loki loki2.Client
once sync.Once
wg sync.WaitGroup
entries chan api.Entry
}

// New makes a new dque loki client
func newDque(cfg *config, logger log.Logger) (client.Client, error) {
func newDque(cfg *config, logger log.Logger) (loki2.Client, error) {
var err error

q := &dqueClient{
Expand All @@ -73,7 +72,7 @@ func newDque(cfg *config, logger log.Logger) (client.Client, error) {
_ = q.queue.TurboOn()
}

q.loki, err = client.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
q.loki, err = loki2.New(prometheus.DefaultRegisterer, cfg.clientConfig, logger)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions clients/cmd/fluent-bit/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
loki2 "github.com/grafana/loki/clients/pkg/promtail/client/loki"
"os"
"sort"
"strings"
Expand All @@ -17,8 +18,6 @@ import (
"github.com/weaveworks/common/logging"

"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/client"

"github.com/grafana/loki/pkg/logproto"
)

Expand All @@ -29,7 +28,7 @@ var (

type loki struct {
cfg *config
client client.Client
client loki2.Client
logger log.Logger
}

Expand Down
5 changes: 2 additions & 3 deletions clients/pkg/logentry/stages/tenant.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stages

import (
"github.com/grafana/loki/clients/pkg/promtail/client/loki"
"reflect"
"time"

Expand All @@ -9,8 +10,6 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/clients/pkg/promtail/client"
)

const (
Expand Down Expand Up @@ -76,7 +75,7 @@ func (s *tenantStage) Process(labels model.LabelSet, extracted map[string]interf
return
}

labels[client.ReservedLabelTenantID] = model.LabelValue(tenantID)
labels[loki.ReservedLabelTenantID] = model.LabelValue(tenantID)
}

// Name implements Stage
Expand Down
11 changes: 5 additions & 6 deletions clients/pkg/logentry/stages/tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package stages

import (
"bytes"
"github.com/grafana/loki/clients/pkg/promtail/client/loki"
"strings"
"testing"
"time"
Expand All @@ -13,8 +14,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/clients/pkg/promtail/client"

lokiutil "github.com/grafana/loki/pkg/util"
)

Expand Down Expand Up @@ -131,7 +130,7 @@ func TestTenantStage_Process(t *testing.T) {
},
"should not override the tenant if the source field is not defined in the extracted map": {
config: &TenantConfig{Source: "tenant_id"},
inputLabels: model.LabelSet{client.ReservedLabelTenantID: "foo"},
inputLabels: model.LabelSet{loki.ReservedLabelTenantID: "foo"},
inputExtracted: map[string]interface{}{},
expectedTenant: lokiutil.StringRef("foo"),
},
Expand All @@ -143,7 +142,7 @@ func TestTenantStage_Process(t *testing.T) {
},
"should override the tenant if the source field is defined in the extracted map": {
config: &TenantConfig{Source: "tenant_id"},
inputLabels: model.LabelSet{client.ReservedLabelTenantID: "foo"},
inputLabels: model.LabelSet{loki.ReservedLabelTenantID: "foo"},
inputExtracted: map[string]interface{}{"tenant_id": "bar"},
expectedTenant: lokiutil.StringRef("bar"),
},
Expand All @@ -161,7 +160,7 @@ func TestTenantStage_Process(t *testing.T) {
},
"should override the tenant with the configured static value": {
config: &TenantConfig{Value: "bar"},
inputLabels: model.LabelSet{client.ReservedLabelTenantID: "foo"},
inputLabels: model.LabelSet{loki.ReservedLabelTenantID: "foo"},
inputExtracted: map[string]interface{}{},
expectedTenant: lokiutil.StringRef("bar"),
},
Expand All @@ -182,7 +181,7 @@ func TestTenantStage_Process(t *testing.T) {
assert.Equal(t, time.Unix(1, 1), out.Timestamp)
assert.Equal(t, "hello world", out.Line)

actualTenant, ok := out.Labels[client.ReservedLabelTenantID]
actualTenant, ok := out.Labels[loki.ReservedLabelTenantID]
if testData.expectedTenant == nil {
assert.False(t, ok)
} else {
Expand Down
Loading

0 comments on commit 712575e

Please sign in to comment.