From d6783abebc0997dbd2649cc23840fc6b6881d065 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 25 Oct 2019 14:05:35 -0600 Subject: [PATCH] Cherry-pick #14113 to 7.5: [Filebeat] Improve error message when handleSQSMessage failed (#14248) * [Filebeat] Improve error message when handleSQSMessage failed (#14113) * Add message id to debug log (cherry picked from commit f6a19eb14f951858899be18b3c04e978903bade6) * run mage fmt under x-pack/libbeat --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/s3/input.go | 11 ++++-- x-pack/filebeat/input/s3/input_test.go | 44 +++++++++++++++++++----- x-pack/libbeat/common/aws/credentials.go | 8 ++--- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 45b2144814a..bb58773eeb2 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Filebeat* +- Improve error message in s3 input when handleSQSMessage failed. {pull}14113[14113] *Heartbeat* diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index e5e2175cb9d..aaa17dc2b92 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -248,7 +248,7 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w s3Infos, err := handleSQSMessage(message) if err != nil { - p.logger.Error(errors.Wrap(err, "handleMessage failed")) + p.logger.Error(errors.Wrap(err, "handleSQSMessage failed")) return } @@ -256,8 +256,8 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w err = p.handleS3Objects(svcS3, s3Infos, errC) if err != nil { err = errors.Wrap(err, "handleS3Objects failed") - errC <- err p.logger.Error(err) + errC <- err } } @@ -276,7 +276,10 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess } p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout) } else { - p.logger.Debug("ACK done, deleting message from SQS") + // When ACK done, message will be deleted. Or when message is + // not s3 ObjectCreated event related(handleSQSMessage function + // failed), it will be removed as well. + p.logger.Debug("Deleting message from SQS: ", message.MessageId) // only delete sqs message when errC is closed with no error err := p.deleteMessage(queueURL, *message.ReceiptHandle, svcSQS) if err != nil { @@ -347,6 +350,8 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) { key: record.S3.object.Key, arn: record.S3.bucket.Arn, }) + } else { + return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications") } } return s3Infos, nil diff --git a/x-pack/filebeat/input/s3/input_test.go b/x-pack/filebeat/input/s3/input_test.go index 8b897de6729..9290b3f664d 100644 --- a/x-pack/filebeat/input/s3/input_test.go +++ b/x-pack/filebeat/input/s3/input_test.go @@ -74,7 +74,7 @@ func TestGetRegionFromQueueURL(t *testing.T) { } func TestHandleMessage(t *testing.T) { - cases := []struct { + casesPositive := []struct { title string message sqs.Message expectedS3Infos []s3Info @@ -91,6 +91,37 @@ func TestHandleMessage(t *testing.T) { }, }, }, + { + "sqs message with event source aws:s3 and event name ObjectCreated:CompleteMultipartUpload", + sqs.Message{ + Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:CompleteMultipartUpload\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"), + }, + []s3Info{ + { + name: "test-s3-ks-2", + key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA", + }, + }, + }, + } + + for _, c := range casesPositive { + t.Run(c.title, func(t *testing.T) { + s3Info, err := handleSQSMessage(c.message) + assert.NoError(t, err) + assert.Equal(t, len(c.expectedS3Infos), len(s3Info)) + if len(s3Info) > 0 { + assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key) + assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name) + } + }) + } + + casesNegative := []struct { + title string + message sqs.Message + expectedS3Infos []s3Info + }{ { "sqs message with event source aws:s3 and event name ObjectRemoved:Delete", sqs.Message{ @@ -107,17 +138,14 @@ func TestHandleMessage(t *testing.T) { }, } - for _, c := range cases { + for _, c := range casesNegative { t.Run(c.title, func(t *testing.T) { s3Info, err := handleSQSMessage(c.message) - assert.NoError(t, err) - assert.Equal(t, len(c.expectedS3Infos), len(s3Info)) - if len(s3Info) > 0 { - assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key) - assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name) - } + assert.Error(t, err) + assert.Nil(t, s3Info) }) } + } func TestNewS3BucketReader(t *testing.T) { diff --git a/x-pack/libbeat/common/aws/credentials.go b/x-pack/libbeat/common/aws/credentials.go index 67570941cdc..3a3bf5b80e6 100644 --- a/x-pack/libbeat/common/aws/credentials.go +++ b/x-pack/libbeat/common/aws/credentials.go @@ -12,10 +12,10 @@ import ( // ConfigAWS is a structure defined for AWS credentials type ConfigAWS struct { - AccessKeyID string `config:"access_key_id"` - SecretAccessKey string `config:"secret_access_key"` - SessionToken string `config:"session_token"` - ProfileName string `config:"credential_profile_name"` + AccessKeyID string `config:"access_key_id"` + SecretAccessKey string `config:"secret_access_key"` + SessionToken string `config:"session_token"` + ProfileName string `config:"credential_profile_name"` SharedCredentialFile string `config:"shared_credential_file"` }