Skip to content

Commit

Permalink
Merge pull request #28876 from roberth-k/f-aws_lambda_event_source_ma…
Browse files Browse the repository at this point in the history
…pping-scaling_config

r/aws_lambda_event_source_mapping: add scaling_config attribute
  • Loading branch information
ewbankkit committed Jan 13, 2023
2 parents 9350e36 + 678b65a commit 310a46c
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .changelog/28876.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `scaling_config` argument
```
62 changes: 62 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ func ResourceEventSourceMapping() *schema.Resource {
ValidateFunc: validation.StringLenBetween(1, 1000),
},
},
"scaling_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"maximum_concurrency": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntBetween(2, 1000),
},
},
},
},
"self_managed_event_source": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -385,6 +399,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
input.Queues = flex.ExpandStringSet(v.(*schema.Set))
}

if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("self_managed_event_source"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.SelfManagedEventSource = expandSelfManagedEventSource(v.([]interface{})[0].(map[string]interface{}))

Expand Down Expand Up @@ -520,6 +538,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts)
d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor)
d.Set("queues", aws.StringValueSlice(eventSourceMappingConfiguration.Queues))
if v := eventSourceMappingConfiguration.ScalingConfig; v != nil {
if err := d.Set("scaling_config", []interface{}{flattenScalingConfig(v)}); err != nil {
return fmt.Errorf("error setting scaling_config: %w", err)
}
} else {
d.Set("scaling_config", nil)
}
if eventSourceMappingConfiguration.SelfManagedEventSource != nil {
if err := d.Set("self_managed_event_source", []interface{}{flattenSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)}); err != nil {
return fmt.Errorf("error setting self_managed_event_source: %w", err)
Expand Down Expand Up @@ -622,6 +647,15 @@ func resourceEventSourceMappingUpdate(d *schema.ResourceData, meta interface{})
input.ParallelizationFactor = aws.Int64(int64(d.Get("parallelization_factor").(int)))
}

if d.HasChange("scaling_config") {
if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.ScalingConfig = &lambda.ScalingConfig{}
}
}

if d.HasChange("source_access_configuration") {
if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 {
input.SourceAccessConfigurations = expandSourceAccessConfigurations(v.(*schema.Set).List())
Expand Down Expand Up @@ -1036,6 +1070,34 @@ func flattenFilter(apiObject *lambda.Filter) map[string]interface{} {
return tfMap
}

func expandScalingConfig(tfMap map[string]interface{}) *lambda.ScalingConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.ScalingConfig{}

if v, ok := tfMap["maximum_concurrency"].(int); ok && v != 0 {
apiObject.MaximumConcurrency = aws.Int64(int64(v))
}

return apiObject
}

func flattenScalingConfig(apiObject *lambda.ScalingConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.MaximumConcurrency; v != nil {
tfMap["maximum_concurrency"] = int(aws.Int64Value(v))
}

return tfMap
}

func findEventSourceMappingConfiguration(conn *lambda.Lambda, input *lambda.GetEventSourceMappingInput) (*lambda.EventSourceMappingConfiguration, error) {
output, err := conn.GetEventSourceMapping(input)

Expand Down
81 changes: 81 additions & 0 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestAccLambdaEventSourceMapping_SQS_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
resource.TestCheckResourceAttr(resourceName, "scaling_config.#", "0"),
),
},
// batch_size became optional. Ensure that if the user supplies the default
Expand Down Expand Up @@ -1083,6 +1084,64 @@ func TestAccLambdaEventSourceMapping_SQS_filterCriteria(t *testing.T) {
})
}

func TestAccLambdaEventSourceMapping_SQS_scalingConfig(t *testing.T) {
if testing.Short() {
t.Skip("skipping long-running test in short mode")
}

var conf lambda.EventSourceMappingConfiguration
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_lambda_event_source_mapping.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, lambda.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig1(rName, 10),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.0.maximum_concurrency", "10"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig1(rName, 15),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.0.maximum_concurrency", "15"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig2(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.#", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
},
})
}

func testAccCheckEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).LambdaConn()
Expand Down Expand Up @@ -2219,3 +2278,25 @@ resource "aws_lambda_event_source_mapping" "test" {
}
`)
}

func testAccEventSourceMappingConfig_sqsScalingConfig1(rName string, maximumConcurrency int) string {
return acctest.ConfigCompose(testAccEventSourceMappingConfig_sqsBase(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
scaling_config {
maximum_concurrency = %[1]d
}
}
`, maximumConcurrency))
}

func testAccEventSourceMappingConfig_sqsScalingConfig2(rName string) string {
return acctest.ConfigCompose(testAccEventSourceMappingConfig_sqsBase(rName), `
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
}
`)
}
5 changes: 5 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. A single queue name must be specified.
* `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below.
* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below.
* `self_managed_kafka_event_source_config` - (Optional) Additional configuration block for Self Managed Kafka sources. Incompatible with "event_source_arn" and "amazon_managed_kafka_event_source_config". Detailed below.
* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. If set, configuration must also include `self_managed_event_source`. Detailed below.
Expand Down Expand Up @@ -189,6 +190,10 @@ resource "aws_lambda_event_source_mapping" "example" {

* `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax).

### scaling_config Configuration Block

* `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be between `2` and `1000`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency).

### self_managed_event_source Configuration Block

* `endpoints` - (Required) A map of endpoints for the self managed source. For Kafka self-managed sources, the key should be `KAFKA_BOOTSTRAP_SERVERS` and the value should be a string with a comma separated list of broker endpoints.
Expand Down

0 comments on commit 310a46c

Please sign in to comment.