Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Filebeat] Improve error message when handleSQSMessage failed #14113

Merged
merged 11 commits into from
Oct 25, 2019
5 changes: 3 additions & 2 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix a race condition in the TCP input when close the client socket. {pull}13038[13038]
- cisco/asa fileset: Renamed log.original to event.original and cisco.asa.list_id to cisco.asa.rule_name. {pull}13286[13286]
- cisco/asa fileset: Fix parsing of 302021 message code. {pull}13476[13476]
- Add support for gzipped files in S3 input {pull}13980[13980]
- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077]

*Heartbeat*

Expand Down Expand Up @@ -181,6 +179,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix calculation of `network.bytes` and `network.packets` for bi-directional netflow events. {pull}14111[14111]
- Accept '-' as http.response.body.bytes in apache module. {pull}14137[14137]
- Fix timezone parsing of MySQL module ingest pipelines. {pull}14130[14130]
- Improve error message in s3 input when handleSQSMessage failed. {pull}14113[14113]
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved

*Heartbeat*

Expand Down Expand Up @@ -368,6 +367,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add ELB fileset to AWS module. {pull}14020[14020]
- Add module for MISP (Malware Information Sharing Platform). {pull}13805[13805]
- Add `source.bytes` and `source.packets` for uni-directional netflow events. {pull}14111[14111]
- Add support for gzipped files in S3 input. {pull}13980[13980]
- Add support for all the ObjectCreated events in S3 input. {pull}14077[14077]
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved
- Add Kibana Dashboard for MISP module. {pull}14147[14147]
- Add more filesets to Zeek module. {pull}14150[14150]

Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/s3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,16 +248,16 @@ func (p *s3Input) processMessage(svcS3 s3iface.ClientAPI, message sqs.Message, w

s3Infos, err := handleSQSMessage(message)
if err != nil {
p.logger.Error(errors.Wrap(err, "handleMessage failed"))
p.logger.Error(errors.Wrap(err, "handleSQSMessage failed"))
return
}

// read from s3 object and create event for each log line
err = p.handleS3Objects(svcS3, s3Infos, errC)
if err != nil {
err = errors.Wrap(err, "handleS3Objects failed")
errC <- err
p.logger.Error(err)
errC <- err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching the order here so the error message get logged first and then send to error channel. Otherwise the error log never get published.

}
}

Expand All @@ -276,7 +276,10 @@ func (p *s3Input) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.Mess
}
p.logger.Warnf("Message visibility timeout updated to %v", visibilityTimeout)
} else {
p.logger.Debug("ACK done, deleting message from SQS")
// When ACK done, message will be deleted. Or when message is
// not s3 ObjectCreated event related(handleSQSMessage function
// failed), it will be removed as well.
p.logger.Debug("Deleting message from SQS: ", message.MessageId)
// only delete sqs message when errC is closed with no error
err := p.deleteMessage(queueURL, *message.ReceiptHandle, svcSQS)
if err != nil {
Expand Down Expand Up @@ -347,6 +350,8 @@ func handleSQSMessage(m sqs.Message) ([]s3Info, error) {
key: record.S3.object.Key,
arn: record.S3.bucket.Arn,
})
} else {
return nil, errors.New("this SQS queue should be dedicated to s3 ObjectCreated event notifications")
}
}
return s3Infos, nil
Expand Down
44 changes: 36 additions & 8 deletions x-pack/filebeat/input/s3/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestGetRegionFromQueueURL(t *testing.T) {
}

func TestHandleMessage(t *testing.T) {
cases := []struct {
casesPositive := []struct {
title string
message sqs.Message
expectedS3Infos []s3Info
Expand All @@ -91,6 +91,37 @@ func TestHandleMessage(t *testing.T) {
},
},
},
{
"sqs message with event source aws:s3 and event name ObjectCreated:CompleteMultipartUpload",
sqs.Message{
Body: awssdk.String("{\"Records\":[{\"eventSource\":\"aws:s3\",\"awsRegion\":\"ap-southeast-1\",\"eventTime\":\"2019-06-21T16:16:54.629Z\",\"eventName\":\"ObjectCreated:CompleteMultipartUpload\",\"s3\":{\"configurationId\":\"object-created-event\",\"bucket\":{\"name\":\"test-s3-ks-2\",\"arn\":\"arn:aws:s3:::test-s3-ks-2\"},\"object\":{\"key\":\"server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA\"}}}]}"),
},
[]s3Info{
{
name: "test-s3-ks-2",
key: "server-access-logging2019-06-21-16-16-54-E68E4316CEB285AA",
},
},
},
}

for _, c := range casesPositive {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key)
assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name)
}
})
}

casesNegative := []struct {
title string
message sqs.Message
expectedS3Infos []s3Info
}{
{
"sqs message with event source aws:s3 and event name ObjectRemoved:Delete",
sqs.Message{
Expand All @@ -107,17 +138,14 @@ func TestHandleMessage(t *testing.T) {
},
}

for _, c := range cases {
for _, c := range casesNegative {
t.Run(c.title, func(t *testing.T) {
s3Info, err := handleSQSMessage(c.message)
assert.NoError(t, err)
assert.Equal(t, len(c.expectedS3Infos), len(s3Info))
if len(s3Info) > 0 {
assert.Equal(t, c.expectedS3Infos[0].key, s3Info[0].key)
assert.Equal(t, c.expectedS3Infos[0].name, s3Info[0].name)
}
assert.Error(t, err)
assert.Nil(t, s3Info)
})
}

}

func TestNewS3BucketReader(t *testing.T) {
Expand Down