diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 22e3ad15eb8..f1bc8de942c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -316,6 +316,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update `aws-s3` input to connect to non AWS S3 buckets {issue}28222[28222] {pull}28234[28234] - Sophos UTM: Support logs containing hostname in syslog header. {pull}28638[28638] - Moving Oracle Filebeat module to GA. {pull}28754[28754] +- Add support in aws-s3 input for s3 notification from SNS to SQS. {pull}28800[28800] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 5a4a6dc8b3d..696a7368e3f 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -17,7 +17,7 @@ The use of SQS notification is preferred: polling list of S3 objects is expensiv in terms of performance and costs and should be preferably used only when no SQS notification can be attached to the S3 buckets. This input can, for example, be used to receive S3 access logs to monitor detailed records for the requests that -are made to a bucket. +are made to a bucket. This input also supports S3 notification from SNS to SQS. SQS notification method is enabled setting `queue_url` configuration value. S3 bucket list polling method is enabled setting `bucket_arn` configuration value. @@ -386,6 +386,14 @@ create a notification through SQS. Please see https://docs.aws.amazon.com/AmazonS3/latest/dev/ways-to-add-notification-config-to-bucket.html#step1-create-sqs-queue-for-notification[create-sqs-queue-for-notification] for more details. +[float] +=== S3 -> SNS -> SQS setup +If you would like to use the bucket notification in multiple different consumers +(others than {beatname_lc}), you should use an SNS topic for the bucket notification. +Please see https://docs.aws.amazon.com/AmazonS3/latest/userguide/ways-to-add-notification-config-to-bucket.html#step1-create-sns-topic-for-notification[create-SNS-topic-for-notification] +for more details. SQS queue will be configured as a +https://docs.aws.amazon.com/sns/latest/dg/sns-sqs-as-subscriber.html[subscriber to the SNS topic]. + [float] === Parallel Processing diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/README.md b/x-pack/filebeat/input/awss3/_meta/terraform/README.md index 7ab27781704..d5614b99a92 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/README.md +++ b/x-pack/filebeat/input/awss3/_meta/terraform/README.md @@ -1,9 +1,9 @@ # Terraform setup for AWS S3 Input Integration Tests -This directory contains a Terrafrom module that creates the AWS resources needed +This directory contains a Terraform module that creates the AWS resources needed for executing the integration tests for the `aws-s3` Filebeat input. It creates an S3 bucket and SQS queue and configures S3 `ObjectCreated:*` notifications to -be delivered to SQS. +be delivered to SQS. It also creates a second S3 bucket, SNS topic, SQS queue and configures S3 `ObjectCreated:*` notifications to be delivered to SNS and also creates a subscription for this SNS topic to SQS queue to automatically place messages sent to SNS topic in SQS queue. It outputs configuration information that is consumed by the tests to `outputs.yml`. The AWS resources are randomly named to prevent name collisions @@ -33,7 +33,7 @@ to match the AWS region of the profile you are using. 4. Execute the integration test. ``` - cd x-pack/filebeat/inputs/awss3 + cd x-pack/filebeat/input/awss3 go test -tags aws,integration -run TestInputRun.+ -v . ``` diff --git a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf index 1b22b8bbfdb..62e86abc787 100644 --- a/x-pack/filebeat/input/awss3/_meta/terraform/main.tf +++ b/x-pack/filebeat/input/awss3/_meta/terraform/main.tf @@ -60,3 +60,77 @@ resource "aws_s3_bucket_notification" "bucket_notification" { aws_sqs_queue.filebeat-integtest, ] } + +resource "aws_sns_topic" "filebeat-integtest-sns" { + name = "filebeat-s3-integtest-sns-${random_string.random.result}" + + policy = < SNS -> SQS + if events.TopicArn != "" { + dec := json.NewDecoder(strings.NewReader(events.Message)) + if err := dec.Decode(&events); err != nil { + p.log.Debugw("Invalid SQS message body.", "sqs_message_body", body) + return nil, fmt.Errorf("failed to decode SQS message body as an S3 notification: %w", err) + } + } + return p.getS3Info(events) +} + +func (p *sqsS3EventProcessor) getS3Info(events s3EventsV2) ([]s3EventV2, error) { var out []s3EventV2 for _, record := range events.Records { if !p.isObjectCreatedEvents(record) { @@ -211,7 +227,6 @@ func (p *sqsS3EventProcessor) getS3Notifications(body string) ([]s3EventV2, erro out = append(out, record) } - return out, nil } diff --git a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go index 8865c5d30cd..9edd5ec4ed9 100644 --- a/x-pack/filebeat/input/awss3/sqs_s3_event_test.go +++ b/x-pack/filebeat/input/awss3/sqs_s3_event_test.go @@ -184,6 +184,16 @@ func TestSqsProcessor_getS3Notifications(t *testing.T) { require.NoError(t, err) assert.Len(t, events, 0) }) + + t.Run("sns-sqs notification", func(t *testing.T) { + msg := newSNSSQSMessage() + events, err := p.getS3Notifications(*msg.Body) + require.NoError(t, err) + assert.Len(t, events, 1) + assert.Equal(t, "test-object-key", events[0].S3.Object.Key) + assert.Equal(t, "arn:aws:s3:::vpc-flow-logs-ks", events[0].S3.Bucket.ARN) + assert.Equal(t, "vpc-flow-logs-ks", events[0].S3.Bucket.Name) + }) } func TestNonRecoverableError(t *testing.T) { diff --git a/x-pack/filebeat/input/awss3/sqs_test.go b/x-pack/filebeat/input/awss3/sqs_test.go index 4940b4a6eca..a8b6e7b5f2a 100644 --- a/x-pack/filebeat/input/awss3/sqs_test.go +++ b/x-pack/filebeat/input/awss3/sqs_test.go @@ -126,6 +126,28 @@ func newSQSMessage(events ...s3EventV2) sqs.Message { } } +func newSNSSQSMessage() sqs.Message { + body, err := json.Marshal(s3EventsV2{ + TopicArn: "arn:aws:sns:us-east-1:1234:sns-topic", + Message: "{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"us-east-1\",\"eventName\":\"ObjectCreated:Put\",\"s3\":{\"configurationId\":\"sns-notification-vpc-flow-logs\",\"bucket\":{\"name\":\"vpc-flow-logs-ks\",\"arn\":\"arn:aws:s3:::vpc-flow-logs-ks\"},\"object\":{\"key\":\"test-object-key\"}}}]}", + }) + if err != nil { + panic(err) + } + + hash := sha256.Sum256(body) + id, _ := uuid.FromBytes(hash[:16]) + messageID := id.String() + receipt := "receipt-" + messageID + bodyStr := string(body) + + return sqs.Message{ + Body: &bodyStr, + MessageId: &messageID, + ReceiptHandle: &receipt, + } +} + func newS3Event(key string) s3EventV2 { record := s3EventV2{ AWSRegion: "us-east-1",