diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 99d14d976a3..d8395a3dc47 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -33,6 +33,8 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di *Functionbeat* +- Correctly extract Kinesis Data field from the Kinesis Record. {pull}11141[11141] + ==== Bugfixes *Affecting all Beats* diff --git a/x-pack/functionbeat/provider/aws/transformer/transformer.go b/x-pack/functionbeat/provider/aws/transformer/transformer.go index 2b78bddaac9..2d597fef06b 100644 --- a/x-pack/functionbeat/provider/aws/transformer/transformer.go +++ b/x-pack/functionbeat/provider/aws/transformer/transformer.go @@ -58,19 +58,24 @@ func APIGatewayProxyRequest(request events.APIGatewayProxyRequest) beat.Event { } // KinesisEvent takes a kinesis event and create multiples beat events. +// DOCS: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html func KinesisEvent(request events.KinesisEvent) []beat.Event { events := make([]beat.Event, len(request.Records)) for idx, record := range request.Records { events[idx] = beat.Event{ Timestamp: time.Now(), Fields: common.MapStr{ - "event_id": record.EventID, - "event_name": record.EventName, - "event_source": record.EventSource, - "event_source_arn": record.EventSourceArn, - "event_version": record.EventVersion, - "aws_region": record.AwsRegion, - // TODO: more meta data at KinesisRecord, need to check doc + "event_id": record.EventID, + "event_name": record.EventName, + "event_source": record.EventSource, + "event_source_arn": record.EventSourceArn, + "event_version": record.EventVersion, + "aws_region": record.AwsRegion, + "message": string(record.Kinesis.Data), + "kinesis_partition_key": record.Kinesis.PartitionKey, + "kinesis_schema_version": record.Kinesis.KinesisSchemaVersion, + "kinesis_sequence_number": record.Kinesis.SequenceNumber, + "kinesis_encryption_type": record.Kinesis.EncryptionType, }, } } diff --git a/x-pack/functionbeat/provider/aws/transformer/transformer_test.go b/x-pack/functionbeat/provider/aws/transformer/transformer_test.go new file mode 100644 index 00000000000..1d7aea5836e --- /dev/null +++ b/x-pack/functionbeat/provider/aws/transformer/transformer_test.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package transformer + +import ( + "testing" + + "github.com/aws/aws-lambda-go/events" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestKinesis(t *testing.T) { + request := events.KinesisEvent{ + Records: []events.KinesisEventRecord{ + events.KinesisEventRecord{ + AwsRegion: "us-east-1", + EventID: "1234", + EventName: "connect", + EventSource: "web", + EventVersion: "1.0", + EventSourceArn: "arn:aws:iam::00000000:role/functionbeat", + Kinesis: events.KinesisRecord{ + Data: []byte("hello world"), + PartitionKey: "abc123", + SequenceNumber: "12345", + KinesisSchemaVersion: "1.0", + EncryptionType: "test", + }, + }, + }, + } + + events := KinesisEvent(request) + assert.Equal(t, 1, len(events)) + + fields := common.MapStr{ + "event_id": "1234", + "event_name": "connect", + "event_source": "web", + "event_source_arn": "arn:aws:iam::00000000:role/functionbeat", + "event_version": "1.0", + "aws_region": "us-east-1", + "message": "hello world", + "kinesis_partition_key": "abc123", + "kinesis_schema_version": "1.0", + "kinesis_sequence_number": "12345", + "kinesis_encryption_type": "test", + } + + assert.Equal(t, fields, events[0].Fields) +}