From 927c49ad008f0cee7ca64e5be5048416ce15c369 Mon Sep 17 00:00:00 2001 From: Ravi Naik Date: Thu, 27 Aug 2020 01:36:46 -0700 Subject: [PATCH] Add paralelization_factor to Functionbeat Kinesis (#20727) ## What does this PR do? This PR adds the ability to add the parallelization factor configuration to functionbeat when reading from Kinesis streams. https://aws.amazon.com/about-aws/whats-new/2019/11/aws-lambda-supports-parallelization-factor-for-kinesis-and-dynamodb-event-sources/ ## Why is it important? This configuration allows you to process one shard of a Kinesis or DynamoDB data stream with more than one Lambda invocation simultaneously. - Closes #16901 (cherry picked from commit 98c434a27ab4d39cc9d4dcfe215db06946ab394a) --- CHANGELOG.next.asciidoc | 1 + .../_meta/config/beat.reference.yml.tmpl | 12 +++++-- .../functionbeat/_meta/config/beat.yml.tmpl | 12 +++++-- .../docs/config-options-aws.asciidoc | 9 ++++- .../functionbeat/functionbeat.reference.yml | 12 +++++-- x-pack/functionbeat/functionbeat.yml | 12 +++++-- .../functionbeat/provider/aws/aws/kinesis.go | 21 ++++++----- .../provider/aws/aws/kinesis_test.go | 35 ++++++++++++++++--- 8 files changed, 92 insertions(+), 22 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0abd8398a53..e7f2e14a323 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -875,6 +875,7 @@ field. You can revert this change by configuring tags for the module and omittin - Add monitoring info about triggered functions. {pull}14876[14876] - Add Google Cloud Platform support. {pull}13598[13598] - Add basic ECS categorization and `cloud` fields. {pull}19174[19174] +- Add support for parallelization factor for kinesis. {pull}20727[20727] *Winlogbeat* diff --git a/x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl b/x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl index c306fb0ac2a..b0ec63db137 100644 --- a/x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl +++ b/x-pack/functionbeat/_meta/config/beat.reference.yml.tmpl @@ -196,7 +196,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -206,6 +206,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Set to true to publish fields with null values in events. #keep_null: false @@ -263,7 +267,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -273,6 +277,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Set to true to publish fields with null values in events. #keep_null: false diff --git a/x-pack/functionbeat/_meta/config/beat.yml.tmpl b/x-pack/functionbeat/_meta/config/beat.yml.tmpl index 533d33dc599..00caf63d94c 100644 --- a/x-pack/functionbeat/_meta/config/beat.yml.tmpl +++ b/x-pack/functionbeat/_meta/config/beat.yml.tmpl @@ -170,7 +170,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -180,6 +180,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Create a function that accepts Cloudwatch logs from Kinesis streams. - name: cloudwatch-logs-kinesis enabled: false @@ -233,7 +237,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -243,6 +247,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Configure functions to run on Google Cloud Platform, currently we assume that the credentials # are present in the environment to correctly create the function when using the CLI. # diff --git a/x-pack/functionbeat/docs/config-options-aws.asciidoc b/x-pack/functionbeat/docs/config-options-aws.asciidoc index fe2550c12d7..dd52ef21ad1 100644 --- a/x-pack/functionbeat/docs/config-options-aws.asciidoc +++ b/x-pack/functionbeat/docs/config-options-aws.asciidoc @@ -176,7 +176,7 @@ Set this option to an ARN that points to an SQS queue. [id="{beatname_lc}-batch-size"] ==== `batch_size` -The number of events to read from a Kinesis stream, the minimal values is 100 and the maximun is +The number of events to read from a Kinesis stream, the minimum value is 100 and the maximum is 10000. The default is 100. [float] @@ -186,6 +186,13 @@ The number of events to read from a Kinesis stream, the minimal values is 100 an The starting position to read from a Kinesis stream, valids values are `trim_horizon` and `latest`. The default is trim_horizon. +[float] +[id="{beatname_lc}-parallelization-factor"] +==== `parallelization_factor` + +The number of batches to process from each shard concurrently, the minimum value is 1 and the maximum is 10 +The default is 1. + [float] [id="{beatname_lc}-keep-null"] ==== `keep_null` diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 9dc723f697d..04d44b055c7 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -196,7 +196,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -206,6 +206,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Set to true to publish fields with null values in events. #keep_null: false @@ -263,7 +267,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -273,6 +277,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Set to true to publish fields with null values in events. #keep_null: false diff --git a/x-pack/functionbeat/functionbeat.yml b/x-pack/functionbeat/functionbeat.yml index ed637679c85..21109c351bc 100644 --- a/x-pack/functionbeat/functionbeat.yml +++ b/x-pack/functionbeat/functionbeat.yml @@ -170,7 +170,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -180,6 +180,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Create a function that accepts Cloudwatch logs from Kinesis streams. - name: cloudwatch-logs-kinesis enabled: false @@ -233,7 +237,7 @@ functionbeat.provider.aws.functions: # List of Kinesis streams. triggers: # Arn for the Kinesis stream. - - event_source_arn: arn:aws:sqs:us-east-1:xxxxx:myevents + - event_source_arn: arn:aws:kinesis:us-east-1:xxxxx:myevents # batch_size is the number of events read in a batch. # Default is 10. @@ -243,6 +247,10 @@ functionbeat.provider.aws.functions: # Default is trim_horizon. #starting_position: "trim_horizon" + # parallelization_factor is the number of batches to process from each shard concurrently. + # Default is 1. + #parallelization_factor: 1 + # Configure functions to run on Google Cloud Platform, currently we assume that the credentials # are present in the environment to correctly create the function when using the CLI. # diff --git a/x-pack/functionbeat/provider/aws/aws/kinesis.go b/x-pack/functionbeat/provider/aws/aws/kinesis.go index 50f0344ef57..86d1d92959a 100644 --- a/x-pack/functionbeat/provider/aws/aws/kinesis.go +++ b/x-pack/functionbeat/provider/aws/aws/kinesis.go @@ -93,17 +93,19 @@ func (cfg *KinesisConfig) Validate() error { // KinesisTriggerConfig configuration for the current trigger. type KinesisTriggerConfig struct { - EventSourceArn string `config:"event_source_arn" validate:"required"` - BatchSize int `config:"batch_size" validate:"min=100,max=10000"` - StartingPosition startingPosition `config:"starting_position"` + EventSourceArn string `config:"event_source_arn" validate:"required"` + BatchSize int `config:"batch_size" validate:"min=100,max=10000"` + StartingPosition startingPosition `config:"starting_position"` + ParallelizationFactor int `config:"parallelization_factor" validate:"min=1,max=10"` } // Unpack unpacks the trigger and make sure the defaults settings are correctly sets. func (c *KinesisTriggerConfig) Unpack(cfg *common.Config) error { type tmpConfig KinesisTriggerConfig config := tmpConfig{ - BatchSize: 100, - StartingPosition: trimHorizonPos, + BatchSize: 100, + StartingPosition: trimHorizonPos, + ParallelizationFactor: 1, } if err := cfg.Unpack(&config); err != nil { return err @@ -176,10 +178,11 @@ func (k *Kinesis) Template() *cloudformation.Template { for _, trigger := range k.config.Triggers { resourceName := prefix(k.Name() + trigger.EventSourceArn) template.Resources[resourceName] = &lambda.EventSourceMapping{ - BatchSize: trigger.BatchSize, - EventSourceArn: trigger.EventSourceArn, - FunctionName: cloudformation.GetAtt(prefix(""), "Arn"), - StartingPosition: trigger.StartingPosition.String(), + BatchSize: trigger.BatchSize, + ParallelizationFactor: trigger.ParallelizationFactor, + EventSourceArn: trigger.EventSourceArn, + FunctionName: cloudformation.GetAtt(prefix(""), "Arn"), + StartingPosition: trigger.StartingPosition.String(), } } diff --git a/x-pack/functionbeat/provider/aws/aws/kinesis_test.go b/x-pack/functionbeat/provider/aws/aws/kinesis_test.go index 6f54c1bf061..762cce1006d 100644 --- a/x-pack/functionbeat/provider/aws/aws/kinesis_test.go +++ b/x-pack/functionbeat/provider/aws/aws/kinesis_test.go @@ -39,7 +39,7 @@ func TestKinesis(t *testing.T) { assert.NoError(t, err) }) - t.Run("when publish is not succesful", func(t *testing.T) { + t.Run("when publish is not successful", func(t *testing.T) { e := errors.New("something bad") client := &arrayBackedClient{err: e} @@ -141,6 +141,32 @@ func testKinesisConfig(t *testing.T) { }, }, }, + "test upper bound parallelization factor limit": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + "parallelization_factor": 13, + }, + }, + }, + }, + "test lower bound parallelization factor limit": { + valid: false, + rawConfig: map[string]interface{}{ + "name": "mysuperfunctionname", + "description": "mylong description", + "triggers": []map[string]interface{}{ + map[string]interface{}{ + "event_source_arn": "abc123", + "parallelization_factor": 0, + }, + }, + }, + }, "test default values": { valid: true, rawConfig: map[string]interface{}{ @@ -158,9 +184,10 @@ func testKinesisConfig(t *testing.T) { LambdaConfig: DefaultLambdaConfig, Triggers: []*KinesisTriggerConfig{ &KinesisTriggerConfig{ - EventSourceArn: "abc123", - BatchSize: 100, - StartingPosition: trimHorizonPos, + EventSourceArn: "abc123", + BatchSize: 100, + StartingPosition: trimHorizonPos, + ParallelizationFactor: 1, }, }, },