From df57dc6bf9d93bb98cc5674dc31accd79002f336 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juha=20Tiensyrj=C3=A4?= Date: Wed, 16 Mar 2022 09:29:20 +0200 Subject: [PATCH] lambda-promtail: Add support for Kinesis data stream events --- CHANGELOG.md | 1 + .../sources/clients/lambda-promtail/_index.md | 15 ++++--- .../lambda-promtail/kinesis.go | 45 +++++++++++++++++++ tools/lambda-promtail/lambda-promtail/main.go | 5 ++- 4 files changed, 60 insertions(+), 6 deletions(-) create mode 100644 tools/lambda-promtail/lambda-promtail/kinesis.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 63958e69c975..773f4e0358bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ #### Promtail ##### Enhancements +* [5977](https://github.com/grafana/loki/pull/5977) **juissi-t** lambda-promtail: Add support for Kinesis data stream events * [6395](https://github.com/grafana/loki/pull/6395) **DylanGuedes**: Add encoding support * [6828](https://github.com/grafana/loki/pull/6828) **alexandre1984rj** Add the BotScore and BotScoreSrc fields once the Cloudflare API returns those two fields on the list of all available log fields. * [6656](https://github.com/grafana/loki/pull/6656) **carlospeon**: Allow promtail to add matches to the journal reader diff --git a/docs/sources/clients/lambda-promtail/_index.md b/docs/sources/clients/lambda-promtail/_index.md index 419832af8a4a..13144dea70d0 100644 --- a/docs/sources/clients/lambda-promtail/_index.md +++ b/docs/sources/clients/lambda-promtail/_index.md @@ -34,7 +34,7 @@ terraform apply -var "lambda_promtail_image=" -var "write_address=http ``` The first few lines of `main.tf` define the AWS region to deploy to. -Modify as desired, or remove and deploy to +Modify as desired, or remove and deploy to ``` provider "aws" { region = "us-east-2" @@ -80,7 +80,7 @@ To modify an existing CloudFormation stack, use [update-stack](https://docs.aws. 
This workflow is intended to be an effective approach for monitoring ephemeral jobs such as those run on AWS Lambda which are otherwise hard/impossible to monitor via one of the other Loki [clients](../). -Ephemeral jobs can quite easily run afoul of cardinality best practices. During high request load, an AWS lambda function might balloon in concurrency, creating many log streams in Cloudwatch. For this reason lambda-promtail defaults to **not** keeping the log stream value as a label when propagating the logs to Loki. This is only possible because new versions of Loki no longer have an ingestion ordering constraint on logs within a single stream. +Ephemeral jobs can quite easily run afoul of cardinality best practices. During high request load, an AWS lambda function might balloon in concurrency, creating many log streams in Cloudwatch. For this reason lambda-promtail defaults to **not** keeping the log stream value as a label when propagating the logs to Loki. This is only possible because new versions of Loki no longer have an ingestion ordering constraint on logs within a single stream. ### Proof of concept Loki deployments @@ -92,14 +92,19 @@ Note: Propagating logs from Cloudwatch to Loki means you'll still need to _pay_ This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki. +### CloudFront real-time logs + +CloudFront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can then be configured as an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki. 
+ ## Propagated Labels -Incoming logs can have six special labels assigned to them which can be used in [relabeling](../promtail/configuration/#relabel_config) or later stages in a Promtail [pipeline](../promtail/pipelines/): +Incoming logs can have seven special labels assigned to them which can be used in [relabeling](../promtail/configuration/#relabel_config) or later stages in a Promtail [pipeline](../promtail/pipelines/): -- `__aws_log_type`: Where this log came from (Cloudwatch or S3). +- `__aws_log_type`: Where this log came from (Cloudwatch, Kinesis or S3). - `__aws_cloudwatch_log_group`: The associated Cloudwatch Log Group for this log. - `__aws_cloudwatch_log_stream`: The associated Cloudwatch Log Stream for this log (if `KEEP_STREAM=true`). - `__aws_cloudwatch_owner`: The AWS ID of the owner of this event. +- `__aws_kinesis_event_source_arn`: The Kinesis event source ARN. - `__aws_s3_log_lb`: The name of the loadbalancer. - `__aws_s3_log_lb_owner`: The Account ID of the loadbalancer owner. @@ -109,7 +114,7 @@ Incoming logs can have six special labels assigned to them which can be used in Note: This section is relevant if running Promtail between lambda-promtail and the end Loki deployment and was used to circumvent `out of order` problems prior to the v2.4 Loki release which removed the ordering constraint. -As stated earlier, this workflow moves the worst case stream cardinality from `number_of_log_streams` -> `number_of_log_groups` * `number_of_promtails`. For this reason, each Promtail must have a unique label attached to logs it processes (ideally via something like `--client.external-labels=promtail=${HOSTNAME}`) and it's advised to run a small number of Promtails behind a load balancer according to your throughput and redundancy needs. +As stated earlier, this workflow moves the worst case stream cardinality from `number_of_log_streams` -> `number_of_log_groups` * `number_of_promtails`. 
For this reason, each Promtail must have a unique label attached to logs it processes (ideally via something like `--client.external-labels=promtail=${HOSTNAME}`) and it's advised to run a small number of Promtails behind a load balancer according to your throughput and redundancy needs. This trade-off is very effective when you have a large number of log streams but want to aggregate them by the log group. This is very common in AWS Lambda, where log groups are the "application" and log streams are the individual application containers which are spun up and down at a whim, possibly just for a single function invocation.
diff --git a/tools/lambda-promtail/lambda-promtail/kinesis.go b/tools/lambda-promtail/lambda-promtail/kinesis.go
new file mode 100644
index 000000000000..80491fc265ab
--- /dev/null
+++ b/tools/lambda-promtail/lambda-promtail/kinesis.go
@@ -0,0 +1,61 @@
+package main
+
+import (
+	"context"
+	"time"
+
+	"github.com/aws/aws-lambda-go/events"
+	"github.com/grafana/loki/pkg/logproto"
+	"github.com/prometheus/common/model"
+)
+
+// parseKinesisEvent converts every record of a Kinesis data stream event into
+// a log entry and adds it to batch b.
+//
+// Each entry carries the labels __aws_log_type="kinesis" and
+// __aws_kinesis_event_source_arn (taken from the record's EventSourceArn),
+// passed through applyExtraLabels. The record payload becomes the log line and
+// the record's approximate arrival time becomes the entry timestamp.
+//
+// It currently always returns nil.
+func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error {
+	for _, record := range ev.Records {
+		// Unix() yields whole seconds since the epoch, so it must be paired
+		// with time.Unix; feeding it to time.UnixMilli would read the value as
+		// milliseconds and place every entry in January 1970. Sub-second
+		// precision of the arrival time is dropped by this conversion.
+		timestamp := time.Unix(record.Kinesis.ApproximateArrivalTimestamp.Unix(), 0)
+
+		labels := model.LabelSet{
+			model.LabelName("__aws_log_type"):                 model.LabelValue("kinesis"),
+			model.LabelName("__aws_kinesis_event_source_arn"): model.LabelValue(record.EventSourceArn),
+		}
+
+		labels = applyExtraLabels(labels)
+
+		// NOTE(review): if batch.add reports an error it should be checked
+		// and returned here — confirm against its signature.
+		b.add(ctx, entry{labels, logproto.Entry{
+			Line:      string(record.Kinesis.Data),
+			Timestamp: timestamp,
+		}})
+	}
+
+	return nil
+}
+
+// processKinesisEvent creates a new batch, fills it from the records in ev and
+// hands it to sendToPromtail, returning the first error encountered.
+func processKinesisEvent(ctx context.Context, ev *events.KinesisEvent) error {
+	// Named b rather than batch so the local does not shadow the batch type.
+	b, err := newBatch(ctx)
+	if err != nil {
+		return err
+	}
+
+	if err := parseKinesisEvent(ctx, b, ev); err != nil {
+		return err
+	}
+
+	return sendToPromtail(ctx, b)
+}
diff --git a/tools/lambda-promtail/lambda-promtail/main.go
b/tools/lambda-promtail/lambda-promtail/main.go index e6690ed954cf..bea917192b31 100644 --- a/tools/lambda-promtail/lambda-promtail/main.go +++ b/tools/lambda-promtail/lambda-promtail/main.go @@ -109,8 +109,9 @@ func applyExtraLabels(labels model.LabelSet) model.LabelSet { func checkEventType(ev map[string]interface{}) (interface{}, error) { var s3Event events.S3Event var cwEvent events.CloudwatchLogsEvent + var kinesisEvent events.KinesisEvent - types := [...]interface{}{&s3Event, &cwEvent} + types := [...]interface{}{&s3Event, &cwEvent, &kinesisEvent} j, _ := json.Marshal(ev) reader := strings.NewReader(string(j)) @@ -142,6 +143,8 @@ func handler(ctx context.Context, ev map[string]interface{}) error { return processS3Event(ctx, evt) case *events.CloudwatchLogsEvent: return processCWEvent(ctx, evt) + case *events.KinesisEvent: + return processKinesisEvent(ctx, evt) } return err