Skip to content

Commit

Permalink
Cherry-pick #14113 to 7.5: [Filebeat] Improve error message when hand…
Browse files Browse the repository at this point in the history
…leSQSMessage failed (#14248)

* [Filebeat] Improve error message when handleSQSMessage failed (#14113)

* Add message id to debug log

(cherry picked from commit b089094)

* run mage fmt under x-pack/libbeat
  • Loading branch information
kaiyan-sheng committed Oct 25, 2019
1 parent f728bd0 commit ad11393
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Filebeat*

- Improve error message in s3 input when handleSQSMessage failed. {pull}14113[14113]

*Heartbeat*

Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w

s3Infos, err := handleSQSMessage(message)
if err != nil {
p.logger.Error(errors.Wrap(err, "handleMessage failed"))
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
}

// read from s3 object and create event for each log line
err = p.handleS3Objects(svcS3, s3Infos, errC)
if err != nil {
err = errors.Wrap(err, "handleS3Objects failed")
errC <- err
p.logger.Error(err)
errC <- err
}
}

Expand All @@ -276,7 +276,10 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess
}
p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout)
} else {
p.logger.Debug("ACK done, deleting message from SQS")
// When ACK done, message will be deleted. Or when message is
// not s3 ObjectCreated event related(handleSQSMessage function
// failed), it will be removed as well.
p.logger.Debug("Deleting message from SQS: ", message.MessageId)
// only delete sqs message when errC is closed with no error
err := p.deleteMessage(queueURL, *message.ReceiptHandle, svcSQS)
if err != nil {
Expand Down Expand Up @@ -347,6 +350,8 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
key: record.S3.object.Key,
arn: record.S3.bucket.Arn,
})
} else {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
}
}
return s3Infos, nil
Expand Down
44 changes: 36 additions & 8 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestGetRegionFromQueueURL(t *testing.T) {
}

func TestHandleMessage(t *testing.T) {
cases := []struct {
casesPositive := []struct {
title string
message sqs.Message
expectedS3Infos []s3Info
Expand All @@ -91,6 +91,37 @@ func TestHandleMessage(t *testing.T) {
},
},
},
{
"sqs message with event source aws:s3 and event name ObjectCreated:CompleteMultipartUpload",
sqs.Message{
Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:CompleteMultipartUpload\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"),
},
[]s3Info{
{
name: "test-s3-ks-2",
key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA",
},
},
},
}

for _, c := range casesPositive {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key)
assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name)
}
})
}

casesNegative := []struct {
title string
message sqs.Message
expectedS3Infos []s3Info
}{
{
"sqs message with event source aws:s3 and event name ObjectRemoved:Delete",
sqs.Message{
Expand All @@ -107,17 +138,14 @@ func TestHandleMessage(t *testing.T) {
},
}

for _, c := range cases {
for _, c := range casesNegative {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key)
assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name)
}
assert.Error(t, err)
assert.Nil(t, s3Info)
})
}

}

func TestNewS3BucketReader(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions x-pack/libbeat/common/aws/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (

// ConfigAWS is a structure defined for AWS credentials
type ConfigAWS struct {
AccessKeyID string `config:"access_key_id"`
SecretAccessKey string `config:"secret_access_key"`
SessionToken string `config:"session_token"`
ProfileName string `config:"credential_profile_name"`
AccessKeyID string `config:"access_key_id"`
SecretAccessKey string `config:"secret_access_key"`
SessionToken string `config:"session_token"`
ProfileName string `config:"credential_profile_name"`
SharedCredentialFile string `config:"shared_credential_file"`
}

Expand Down

0 comments on commit ad11393

Please sign in to comment.