diff --git a/tools/lambda-promtail/lambda-promtail/kinesis.go b/tools/lambda-promtail/lambda-promtail/kinesis.go index 80491fc265ab..800dd7903eef 100644 --- a/tools/lambda-promtail/lambda-promtail/kinesis.go +++ b/tools/lambda-promtail/lambda-promtail/kinesis.go @@ -9,7 +9,11 @@ import ( "github.com/prometheus/common/model" ) -func parseKinesisEvent(ctx context.Context, b *batch, ev *events.KinesisEvent) error { +func parseKinesisEvent(ctx context.Context, b batchIf, ev *events.KinesisEvent) error { + if ev == nil { + return nil + } + for _, record := range ev.Records { timestamp := time.UnixMilli(record.Kinesis.ApproximateArrivalTimestamp.Unix()) diff --git a/tools/lambda-promtail/lambda-promtail/kinesis_test.go b/tools/lambda-promtail/lambda-promtail/kinesis_test.go new file mode 100644 index 000000000000..cf4bba296417 --- /dev/null +++ b/tools/lambda-promtail/lambda-promtail/kinesis_test.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "encoding/json" + "io/ioutil" + "testing" + + "github.com/aws/aws-lambda-go/events" + "github.com/grafana/loki/pkg/logproto" + "github.com/stretchr/testify/require" +) + +type MockBatch struct { + streams map[string]*logproto.Stream + size int +} + +func (b *MockBatch) add(ctx context.Context, e entry) error { + return nil +} + +func (b *MockBatch) flushBatch(ctx context.Context) error { + return nil +} +func (b *MockBatch) encode() ([]byte, int, error) { + return nil, 0, nil +} +func (b *MockBatch) createPushRequest() (*logproto.PushRequest, int) { + return nil, 0 +} + +func ReadJSONFromFile(t *testing.T, inputFile string) []byte { + inputJSON, err := ioutil.ReadFile(inputFile) + if err != nil { + t.Errorf("could not open test file. details: %v", err) + } + + return inputJSON +} + +func TestLambdaPromtail_KinesisParseEvents(t *testing.T) { + inputJson, err := ioutil.ReadFile("../testdata/kinesis-event.json") + + if err != nil { + t.Errorf("could not open test file. details: %v", err) + } + + var testEvent events.KinesisEvent + if err := json.Unmarshal(inputJson, &testEvent); err != nil { + t.Errorf("could not unmarshal event. details: %v", err) + } + + ctx := context.TODO() + b := &MockBatch{ + streams: map[string]*logproto.Stream{}, + } + + err = parseKinesisEvent(ctx, b, &testEvent) + + require.Nil(t, err) +} diff --git a/tools/lambda-promtail/lambda-promtail/promtail.go b/tools/lambda-promtail/lambda-promtail/promtail.go index 9df78fb61e80..5d649d4e46cd 100644 --- a/tools/lambda-promtail/lambda-promtail/promtail.go +++ b/tools/lambda-promtail/lambda-promtail/promtail.go @@ -39,6 +39,13 @@ type batch struct { size int } +type batchIf interface { + add(ctx context.Context, e entry) error + encode() ([]byte, int, error) + createPushRequest() (*logproto.PushRequest, int) + flushBatch(ctx context.Context) error +} + func newBatch(ctx context.Context, entries ...entry) (*batch, error) { b := &batch{ streams: map[string]*logproto.Stream{}, diff --git a/tools/lambda-promtail/testdata/kinesis-event.json b/tools/lambda-promtail/testdata/kinesis-event.json new file mode 100644 index 000000000000..c3cb2020f1e8 --- /dev/null +++ b/tools/lambda-promtail/testdata/kinesis-event.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "s1", + "sequenceNumber": "49568167373333333333333333333333333333333333333333333333", + "data": "SGVsbG8gV29ybGQ=", + "approximateArrivalTimestamp": 1480641523.477 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49568167373333333333333333333333333333333333333333333333", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "s1", + "sequenceNumber": "49568167373333333334444444444444444444444444444444444444", + "data": "SGVsbG8gV29ybGQ=", + "approximateArrivalTimestamp": 1480841523.477 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49568167373333333334444444444444444444444444444444444444", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/LambdaRole", + "awsRegion": "us-east-1", + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/simple-stream" + } + ] +} \ No newline at end of file