Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lambda-promtail: Add ability to ingest logs from S3 #5065

Merged
merged 26 commits into from
Feb 4, 2022

Conversation

AndreZiviani
Copy link
Contributor

@AndreZiviani AndreZiviani commented Jan 6, 2022

What this PR does / why we need it:
Allows lambda-promtail to ingest load balancer logs stored on S3, before it was only possible to ingest logs from CloudWatch

Which issue(s) this PR fixes:

Special notes for your reviewer:
I've made it somewhat modular allowing to plug new datasources (e.g. SNS, SQS...) on the same lambda function easier but it is not trivial to have the same lambda receive events from multiple sources in go (it's easy on other languages) that's why I changed the event parameter to a map of interface.
Since Loki now accepts out of order writes I'm sending the timestamp from the log itself.
I've also copied the batch feature from promtail itself because a S3 log can be quite big.
Most of the changes are either inspired or straight copied from promtail source code.

Checklist

  • Documentation added
  • Tests updated
  • Add an entry in the CHANGELOG.md about the changes.

@AndreZiviani AndreZiviani requested a review from a team as a code owner January 6, 2022 17:11
@AndreZiviani
Copy link
Contributor Author

am I supposed to manually edit the changelog? the PR template is a little ambiguous to me

@zswanson
Copy link

zswanson commented Jan 6, 2022

I can't speak to the cloudformation (not my jam) but we should add the variables (bucket name, prefixes) and lambda permissions and s3 event notification to the terraform to support this.

https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/s3_bucket_notification#add-notification-configuration-to-lambda-function

@trevorwhitney
Copy link
Collaborator

@AndreZiviani yes, please manually add a changelog entry to this PR.

@pull-request-size pull-request-size bot added size/XL and removed size/L labels Jan 7, 2022
tools/lambda-promtail/lambda-promtail/s3.go Outdated Show resolved Hide resolved
tools/lambda-promtail/main.tf Outdated Show resolved Hide resolved
tools/lambda-promtail/main.tf Outdated Show resolved Hide resolved
Copy link
Contributor

@cstyan cstyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking good. I don't know as much about s3 and it's metadata so I will double check docs there before reading the s3 specific go code here.

tools/lambda-promtail/lambda-promtail/cw.go Outdated Show resolved Hide resolved
tools/lambda-promtail/lambda-promtail/cw.go Outdated Show resolved Hide resolved
tools/lambda-promtail/lambda-promtail/cw.go Outdated Show resolved Hide resolved
tools/lambda-promtail/lambda-promtail/main.go Show resolved Hide resolved
tools/lambda-promtail/lambda-promtail/main.go Outdated Show resolved Hide resolved
@AndreZiviani
Copy link
Contributor Author

sorry for those force pushes, I made a mistake when merging/rebasing...

@cstyan
Copy link
Contributor

cstyan commented Jan 21, 2022

@AndreZiviani let me know if you need help with any of the outstanding comments :)

@AndreZiviani
Copy link
Contributor Author

thanks @cstyan, I had to focus on other things this past week, will try to address your suggestions on Monday/Tuesday

Copy link
Contributor

@cstyan cstyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Test failures don't look related, just a failed git clone for a sub job.

Copy link
Member

@owen-d owen-d left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this is looking good! Would you mind updating the docs with this new functionality?

tools/lambda-promtail/main.tf Outdated Show resolved Hide resolved
Copy link

@arvindkonar arvindkonar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

great! this is what i already did to make mine work!

Copy link
Contributor

@KMiller-Grafana KMiller-Grafana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Docs portion of the PR looks good to me.

Copy link
Member

@owen-d owen-d left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, this is looking good. Thanks for the PR! I've left a few suggestions here that could improve the approach, but I also think we could do those in a followup PR if you prefer (either done by yourself or someone else, maybe @cstyan?).

tools/lambda-promtail/lambda-promtail/s3.go Outdated Show resolved Hide resolved
tools/lambda-promtail/lambda-promtail/promtail.go Outdated Show resolved Hide resolved
Copy link
Contributor

@cstyan cstyan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking good, a few more comments but they're mostly style nits.

I think it would be a good idea to eventually move the global variables we have in main.go that we're now using within other files for batchSize to a struct of config options or something like that, but that doesn't need to happen in this PR.

tools/lambda-promtail/lambda-promtail/main.go Outdated Show resolved Hide resolved
Comment on lines 57 to 69
if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, e.entry)
return

b.size += stream.Size()

} else {

b.streams[labels] = &logproto.Stream{
Labels: labels,
Entries: []logproto.Entry{e.entry},
}

b.size += b.streams[labels].Size()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit again: we could simplify to something like this

stream, ok := b.streams[labels];
if  !ok {
    b.streams[labels] = &logproto.Stream{
	Labels:  labels,
    }
    stream = b.stream[labels]
}
stream.Entries = append(stream.Entries, e.entry)
b.size += stream.Size()

tools/lambda-promtail/lambda-promtail/promtail.go Outdated Show resolved Hide resolved
return nil, err
var s3Client *s3.Client

if c, ok := s3Clients[labels["bucket_region"]]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we only need a single client per bucket region? there isn't the possibility of different buckets in the same region requiring different clients for auth reasons?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we can ignore this for now as multiple regions/bucket auth can be attached to this lambda.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think multiple buckets with different auth options would increase the complexity, it would be easier to configure another lambda with another role

@@ -38,7 +38,7 @@ func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent)
}

func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent) error {
batch := newBatch()
batch, _ := newBatch(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not swallow this error. Perhaps newBatch shouldn't accept entries and then newBatch won't need to return an error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the batch will always be empty here it should never fail, thats why I ignored it

if stream, ok := b.streams[labels]; ok {
stream.Entries = append(stream.Entries, e.entry)

b.size += stream.Size()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This Size() method doesn't actually measure what we're expecting (it's part of the generated protobuf). Instead, I'd keep track of the line's bytel length as they're added, like so:
https://github.com/grafana/loki/blob/main/clients/pkg/promtail/client/batch.go#L44-L45

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, will fix that

tools/lambda-promtail/lambda-promtail/promtail.go Outdated Show resolved Hide resolved
return nil, err
var s3Client *s3.Client

if c, ok := s3Clients[labels["bucket_region"]]; ok {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect we can ignore this for now as multiple regions/bucket auth can be attached to this lambda.

docs/sources/clients/lambda-promtail/_index.md Outdated Show resolved Hide resolved
Copy link
Member

@owen-d owen-d left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, let's get this merged :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants