Gcplog targetmanager (#3083)
* Add new target manager (PubsubTarget) for promtail

Lets you scrape log entries from a pubsub topic and
send them to Loki.

- Basic `Target` and `TargetManager`
- Minimum config to make it work

* make the receive loop on pubsub manager work

* Add mock pubsub server for testing

* Remove debugging logs

* Minor changes to messages pushing into loki

* Basic formatter to convert GCP log entry to loki log entry

* Reverting go.mod, go.sum and vendors to master branch

* only pubsub dependencies

* rebuild proto

* Fix protobuf diffs

* Remove debug printfs

* Add basic formatter for GCP log entry

- Create a unique label to create streams without out-of-order timestamps
- Extract instance ID and timestamp from the log entry

* - Small fixes to formatter
- Config changes on pubsub manager for testing

* - Formatting changes
- Rewrite timestamp to avoid out-of-order errors on loki
- Minor config changes to Pubsub Target manager

* Fix target formatter tests

* PR remarks

* Minor refactoring and following changes

- Rename Pubsub* entities to Gcplog*
- Rename pubsub package into gcplog
- Add flag `KeepIncomingTimestamp` to toggle between keeping incoming timestamps and rewriting them with the current timestamp
- Tests for gcplog target
- Docs and basic metrics to gcplog target

* Add docs for cloud configuration for gcplog.

* Add docs for cloud configuration for gcplog.

* Review feedback

* Checkin generated files

* Making linters happy

Co-authored-by: Edward Welch <edward.welch@grafana.com>
kavirajk and slim-bean authored Jan 19, 2021
1 parent 1ffd4f6 commit 6cc41f9
Showing 50 changed files with 14,324 additions and 63 deletions.
18 changes: 18 additions & 0 deletions cmd/promtail/promtail-local-pubsub-config.yaml
@@ -0,0 +1,18 @@
server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://localhost:3100/loki/api/v1/push

scrape_configs:
  - job_name: pubsub-test
    gcplog:
      project_id: "grafanalabs-dev"
      subscription: "dev-logs-pull"
      use_incoming_timestamp: false # default: rewrite timestamps at processing time.
      labels:
        job: pubsub-gcp
64 changes: 64 additions & 0 deletions docs/sources/clients/promtail/gcplog-cloud.md
@@ -0,0 +1,64 @@
---
title: Cloud setup for gcplog TargetManager
---
This document explains how to set up Google Cloud Platform to forward cloud resource logs from a particular GCP project into a Google Pubsub topic, so that they are available for Loki's promtail to consume.

This document assumes that the reader has `gcloud` installed and has the required permissions (as mentioned in the [Roles and Permission](#roles-and-permission) section).

# Roles and Permission
The user should have the following roles to complete the setup (a sketch for granting them follows the list):
- "roles/pubsub.editor"
- "roles/logging.configWriter"

# Setup Pubsub Topic
The Google Pubsub topic acts as the queue that persists log messages, which `promtail` then reads:

```bash
$ gcloud pubsub topics create $TOPIC_ID
```

e.g.:
```bash
$ gcloud pubsub topics create cloud-logs
```

# Setup Log Router
We create a log sink to forward cloud logs into the pubsub topic created above:

```bash
$ gcloud beta logging sinks create $SINK_NAME $SINK_LOCATION $OPTIONAL_FLAGS
```

e.g.:
```bash
$ gcloud beta logging sinks create cloud-logs pubsub.googleapis.com/projects/my-project/topics/cloud-logs \
--log-filter='resource.type=("gcs_bucket")' \
--description="Cloud logs"
```

The above command also adds the `log-filter` option, which determines which logs are routed to the destination `pubsub` topic.
For more information on writing a `log-filter`, refer to this [document](https://cloud.google.com/logging/docs/export/configure_export_v2#creating_sink).
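
For instance, a hedged variant of the sink command above that forwards only error-level GCS logs (filter syntax per the Cloud Logging query language; names are illustrative):

```bash
$ gcloud beta logging sinks create cloud-logs pubsub.googleapis.com/projects/my-project/topics/cloud-logs \
    --log-filter='resource.type="gcs_bucket" AND severity>=ERROR' \
    --description="GCS error logs only"
```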

# Create Pubsub subscription for Loki
We create a subscription for the pubsub topic created above; `promtail` uses this subscription to consume log messages.

```bash
$ gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID \
    --ack-deadline=$ACK_DEADLINE \
    --message-retention-duration=$RETENTION_DURATION
```

e.g.:
```bash
$ gcloud pubsub subscriptions create cloud-logs --topic=projects/my-project/topics/cloud-logs \
    --ack-deadline=10 \
    --message-retention-duration=7d
```

For more fine-grained options, refer to `gcloud pubsub subscriptions --help`.

# ServiceAccount for Promtail
We need a service account with the following permissions:
- pubsub.subscriber

This enables promtail to read log entries from the pubsub subscription created above. A sketch for creating such an account follows.
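
A minimal sketch, assuming a project `my-project` and a hypothetical service-account name `promtail-gcplog` (adjust both to your environment):

```bash
# Create the service account (name is illustrative).
$ gcloud iam service-accounts create promtail-gcplog

# Grant it the subscriber role on the project.
$ gcloud projects add-iam-policy-binding my-project \
    --member="serviceAccount:promtail-gcplog@my-project.iam.gserviceaccount.com" \
    --role="roles/pubsub.subscriber"

# Create a key and point promtail at it via the standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable.
$ gcloud iam service-accounts keys create promtail-sa.json \
    --iam-account="promtail-gcplog@my-project.iam.gserviceaccount.com"
$ export GOOGLE_APPLICATION_CREDENTIALS=promtail-sa.json
```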
22 changes: 22 additions & 0 deletions docs/sources/clients/promtail/scraping.md
@@ -154,6 +154,28 @@ Keep in mind that labels prefixed with `__` will be dropped, so relabeling is re
target_label: syslog_identifier
```

## Gcplog scraping
Promtail supports scraping cloud resource logs (such as GCS bucket logs, load balancer logs, and Kubernetes cluster logs) from GCP.
Configuration for this target is set in the `gcplog` section of `scrape_config`:

```yaml
- job_name: gcplog
  gcplog:
    project_id: "my-gcp-project"
    subscription: "my-pubsub-subscription"
    use_incoming_timestamp: false # default: rewrite timestamps at processing time.
    labels:
      job: "gcplog"
```
Here `project_id` and `subscription` are the only required fields.

- `project_id` is the GCP project id.
- `subscription` is the GCP pubsub subscription from which promtail consumes log entries.

Before using the `gcplog` target, GCP should be [configured](../gcplog-cloud) with a pubsub subscription to receive logs from.

It also supports `relabeling` and `pipeline` stages just like other targets; see the sketch below.
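
A hedged sketch of combining `gcplog` with pipeline stages (`json` and `labels` are standard promtail stages; the `severity` expression assumes the log line is the raw GCP LogEntry JSON, which this target emits when `textPayload` is absent):

```yaml
- job_name: gcplog
  gcplog:
    project_id: "my-gcp-project"
    subscription: "my-pubsub-subscription"
    labels:
      job: "gcplog"
  pipeline_stages:
    # Parse the GCP LogEntry JSON payload and promote its severity field to a label.
    - json:
        expressions:
          severity: severity
    - labels:
        severity:
```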

## Syslog Receiver

Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424)
2 changes: 2 additions & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/grafana/loki
go 1.15

require (
cloud.google.com/go/pubsub v1.3.1
github.com/NYTimes/gziphandler v1.1.1
github.com/aws/aws-lambda-go v1.17.0
github.com/blang/semver v3.5.1+incompatible // indirect
@@ -61,6 +62,7 @@ require (
go.uber.org/atomic v1.7.0
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
google.golang.org/api v0.35.0
google.golang.org/grpc v1.33.1
gopkg.in/alecthomas/kingpin.v2 v2.2.6
gopkg.in/fsnotify.v1 v1.4.7
1 change: 1 addition & 0 deletions go.sum
@@ -39,6 +39,7 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0 h1:Lpy6hKgdcl7a3WGSfJIFmxmcdjSpP6OmBEfcOv1Y680=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1 h1:ukjixP1wl0LpnZ6LWtZJ0mX5tBmjp1f8Sqer8Z2OMUU=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.3.0/go.mod h1:9IAwXhoyBJ7z9LcAwkj0/7NnPzYaPeZxxVp3zm+5IqA=
68 changes: 6 additions & 62 deletions pkg/distributor/http.go
@@ -4,41 +4,23 @@ import (
"math"
"net/http"

"github.com/dustin/go-humanize"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/util"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/unmarshal"
unmarshal_legacy "github.com/grafana/loki/pkg/logql/unmarshal/legacy"
lokiutil "github.com/grafana/loki/pkg/util"
)

var (
contentType = http.CanonicalHeaderKey("Content-Type")

bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_bytes_received_total",
Help: "The total number of uncompressed bytes received per tenant",
}, []string{"tenant"})
linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "distributor_lines_received_total",
Help: "The total number of lines received per tenant",
}, []string{"tenant"})
)
var contentType = http.CanonicalHeaderKey("Content-Type")

const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
@@ -60,62 +42,24 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
}

func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util.WithContext(r.Context(), util.Logger)
body := lokiutil.NewSizeReader(r.Body)
contentType := r.Header.Get(contentType)
var req logproto.PushRequest

defer func() {
var (
entriesSize int64
streamLabelsSize int64
totalEntries int64
)

for _, s := range req.Streams {
streamLabelsSize += int64(len(s.Labels))
for _, e := range s.Entries {
totalEntries++
entriesSize += int64(len(e.Line))
}
}

// incrementing tenant metrics if we have a tenant.
if totalEntries != 0 && userID != "" {
bytesIngested.WithLabelValues(userID).Add(float64(entriesSize))
linesIngested.WithLabelValues(userID).Add(float64(totalEntries))
}

level.Debug(logger).Log(
"msg", "push request parsed",
"path", r.URL.Path,
"contentType", contentType,
"bodySize", humanize.Bytes(uint64(body.Size())),
"streams", len(req.Streams),
"entries", totalEntries,
"streamLabelsSize", humanize.Bytes(uint64(streamLabelsSize)),
"entriesSize", humanize.Bytes(uint64(entriesSize)),
"totalSize", humanize.Bytes(uint64(entriesSize+streamLabelsSize)),
)
}()

switch contentType {
switch r.Header.Get(contentType) {
case applicationJSON:
var err error

if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(body, &req)
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
err = unmarshal_legacy.DecodePushRequest(body, &req)
err = unmarshal_legacy.DecodePushRequest(r.Body, &req)
}

if err != nil {
return nil, err
}

default:
if err := util.ParseProtoReader(r.Context(), body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
if err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
return nil, err
}
}
18 changes: 18 additions & 0 deletions pkg/promtail/scrapeconfig/scrapeconfig.go
@@ -31,6 +31,7 @@ type Config struct {
JobName string `yaml:"job_name,omitempty"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
GcplogConfig *GcplogTargetConfig `yaml:"gcplog,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
PushConfig *PushTargetConfig `yaml:"loki_push_api,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
@@ -164,6 +165,23 @@ type SyslogTargetConfig struct {
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

// GcplogTargetConfig describes a scrape config to pull logs from any pubsub topic.
type GcplogTargetConfig struct {
// ProjectID is the Cloud project id
ProjectID string `yaml:"project_id"`

	// Subscription is the subscription name we use to pull logs from a pubsub topic.
Subscription string `yaml:"subscription"`

	// Labels are additional labels added to each log entry before it is pushed to the Loki server.
Labels model.LabelSet `yaml:"labels"`

	// UseIncomingTimestamp determines whether to keep the incoming log entry's own timestamp or to
	// replace it with the current timestamp at processing time.
	// Its default value (`false`) means the timestamp is replaced at processing time.
UseIncomingTimestamp bool `yaml:"use_incoming_timestamp"`
}

// PushTargetConfig describes a scrape config that listens for Loki push messages.
type PushTargetConfig struct {
// Server is the weaveworks server config for listening connections
11 changes: 10 additions & 1 deletion pkg/promtail/targets/file/filetarget.go
@@ -65,7 +65,16 @@ type FileTarget struct {
}

// NewFileTarget create a new FileTarget.
func NewFileTarget(metrics *Metrics, logger log.Logger, handler api.EntryHandler, positions positions.Positions, path string, labels model.LabelSet, discoveredLabels model.LabelSet, targetConfig *Config) (*FileTarget, error) {
func NewFileTarget(
metrics *Metrics,
logger log.Logger,
handler api.EntryHandler,
positions positions.Positions,
path string,
labels model.LabelSet,
discoveredLabels model.LabelSet,
targetConfig *Config,
) (*FileTarget, error) {

watcher, err := fsnotify.NewWatcher()
if err != nil {
90 changes: 90 additions & 0 deletions pkg/promtail/targets/gcplog/formatter.go
@@ -0,0 +1,90 @@
package gcplog

import (
"fmt"
"strings"
"time"

"cloud.google.com/go/pubsub"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
)

// GCPLogEntry is a log entry as written to the pubsub topic,
// according to the following spec:
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
// nolint: golint
type GCPLogEntry struct {
LogName string `json:"logName"`
Resource struct {
Type string `json:"type"`
Labels map[string]string `json:"labels"`
} `json:"resource"`
Timestamp string `json:"timestamp"`

// The time the log entry was received by Logging.
	// It's important to note that `Timestamp` is optional in the GCP log entry.
ReceiveTimestamp string `json:"receiveTimestamp"`

TextPayload string `json:"textPayload"`

	// NOTE(kavi): There are other fields in GCPLogEntry, but we only need the above fields for now;
	// in any case, we send the entire entry to Loki.
}

func format(m *pubsub.Message, other model.LabelSet, useIncomingTimestamp bool) (api.Entry, error) {
var ge GCPLogEntry

if err := json.Unmarshal(m.Data, &ge); err != nil {
return api.Entry{}, err
}

labels := model.LabelSet{
"logName": model.LabelValue(ge.LogName),
"resourceType": model.LabelValue(ge.Resource.Type),
}
for k, v := range ge.Resource.Labels {
		if !model.LabelName(k).IsValid() || !model.LabelValue(v).IsValid() {
continue
}
labels[model.LabelName(k)] = model.LabelValue(v)
}

// add labels from config as well.
labels = labels.Merge(other)

ts := time.Now()
line := string(m.Data)

if useIncomingTimestamp {
tt := ge.Timestamp
if tt == "" {
tt = ge.ReceiveTimestamp
}
var err error
ts, err = time.Parse(time.RFC3339, tt)
if err != nil {
return api.Entry{}, fmt.Errorf("invalid timestamp format: %w", err)
}

if ts.IsZero() {
return api.Entry{}, fmt.Errorf("no timestamp found in the log entry")
}
}

	// Send only `ge.TextPayload` as the log line if it's present.
if strings.TrimSpace(ge.TextPayload) != "" {
line = ge.TextPayload
}

return api.Entry{
Labels: labels,
Entry: logproto.Entry{
Timestamp: ts,
Line: line,
},
}, nil
}
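
A hypothetical test-style sketch of the formatter's behavior (it assumes the unexported `format` function above; the sample payload and assertions are illustrative, not from this commit):

```go
package gcplog

import (
	"testing"

	"cloud.google.com/go/pubsub"
	"github.com/prometheus/common/model"
)

func TestFormatTextPayload(t *testing.T) {
	// A fake Pub/Sub message carrying a minimal GCP LogEntry payload.
	msg := &pubsub.Message{Data: []byte(`{
		"logName": "projects/my-project/logs/gcs",
		"resource": {"type": "gcs_bucket", "labels": {"bucket_name": "my-bucket"}},
		"timestamp": "2021-01-19T10:00:00Z",
		"textPayload": "object created"
	}`)}

	entry, err := format(msg, model.LabelSet{"job": "gcplog"}, true)
	if err != nil {
		t.Fatal(err)
	}

	// textPayload, when present, wins over the raw JSON as the log line.
	if entry.Entry.Line != "object created" {
		t.Fatalf("unexpected line: %q", entry.Entry.Line)
	}
	// Labels merge the extracted resource labels with the config labels.
	if entry.Labels["resourceType"] != "gcs_bucket" || entry.Labels["job"] != "gcplog" {
		t.Fatalf("unexpected labels: %v", entry.Labels)
	}
}
```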