Skip to content

Commit

Permalink
Cherry-pick #11141 to 7.0: Extract all the fields for a Kinesis Recor…
Browse files Browse the repository at this point in the history
…d and not just the metadata (#11157)

Cherry-pick of PR #11141 to 7.0 branch. Original message: 

Add data field which contains raw data of the events and information
about the partition, the sequence and the schema.

Add a test to ensure the tranformation is OK from a KinesisRecord to a
beat.Event.
  • Loading branch information
ph committed Mar 8, 2019
1 parent 8f18f78 commit 0d8df09
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 7 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di

*Functionbeat*

- Correctly extract Kinesis Data field from the Kinesis Record. {pull}11141[11141]

==== Bugfixes

*Affecting all Beats*
Expand Down
19 changes: 12 additions & 7 deletions x-pack/functionbeat/provider/aws/transformer/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,24 @@ func APIGatewayProxyRequest(request events.APIGatewayProxyRequest) beat.Event {
}

// KinesisEvent takes a kinesis event and create multiples beat events.
// DOCS: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
func KinesisEvent(request events.KinesisEvent) []beat.Event {
events := make([]beat.Event, len(request.Records))
for idx, record := range request.Records {
events[idx] = beat.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"event_id": record.EventID,
"event_name": record.EventName,
"event_source": record.EventSource,
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
// TODO: more meta data at KinesisRecord, need to check doc
"event_id": record.EventID,
"event_name": record.EventName,
"event_source": record.EventSource,
"event_source_arn": record.EventSourceArn,
"event_version": record.EventVersion,
"aws_region": record.AwsRegion,
"message": string(record.Kinesis.Data),
"kinesis_partition_key": record.Kinesis.PartitionKey,
"kinesis_schema_version": record.Kinesis.KinesisSchemaVersion,
"kinesis_sequence_number": record.Kinesis.SequenceNumber,
"kinesis_encryption_type": record.Kinesis.EncryptionType,
},
}
}
Expand Down
55 changes: 55 additions & 0 deletions x-pack/functionbeat/provider/aws/transformer/transformer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package transformer

import (
"testing"

"github.com/aws/aws-lambda-go/events"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/common"
)

func TestKinesis(t *testing.T) {
request := events.KinesisEvent{
Records: []events.KinesisEventRecord{
events.KinesisEventRecord{
AwsRegion: "us-east-1",
EventID: "1234",
EventName: "connect",
EventSource: "web",
EventVersion: "1.0",
EventSourceArn: "arn:aws:iam::00000000:role/functionbeat",
Kinesis: events.KinesisRecord{
Data: []byte("hello world"),
PartitionKey: "abc123",
SequenceNumber: "12345",
KinesisSchemaVersion: "1.0",
EncryptionType: "test",
},
},
},
}

events := KinesisEvent(request)
assert.Equal(t, 1, len(events))

fields := common.MapStr{
"event_id": "1234",
"event_name": "connect",
"event_source": "web",
"event_source_arn": "arn:aws:iam::00000000:role/functionbeat",
"event_version": "1.0",
"aws_region": "us-east-1",
"message": "hello world",
"kinesis_partition_key": "abc123",
"kinesis_schema_version": "1.0",
"kinesis_sequence_number": "12345",
"kinesis_encryption_type": "test",
}

assert.Equal(t, fields, events[0].Fields)
}

0 comments on commit 0d8df09

Please sign in to comment.