Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

r/aws_lambda_event_source_mapping: add scaling_config attribute #28876

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/28876.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_lambda_event_source_mapping: Add `scaling_config` argument
```
62 changes: 62 additions & 0 deletions internal/service/lambda/event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ func ResourceEventSourceMapping() *schema.Resource {
ValidateFunc: validation.StringLenBetween(1, 1000),
},
},
"scaling_config": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"maximum_concurrency": {
Type: schema.TypeInt,
Optional: true,
ValidateFunc: validation.IntBetween(2, 1000),
},
},
},
},
"self_managed_event_source": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -385,6 +399,10 @@ func resourceEventSourceMappingCreate(d *schema.ResourceData, meta interface{})
input.Queues = flex.ExpandStringSet(v.(*schema.Set))
}

if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
}

if v, ok := d.GetOk("self_managed_event_source"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.SelfManagedEventSource = expandSelfManagedEventSource(v.([]interface{})[0].(map[string]interface{}))

Expand Down Expand Up @@ -520,6 +538,13 @@ func resourceEventSourceMappingRead(d *schema.ResourceData, meta interface{}) er
d.Set("maximum_retry_attempts", eventSourceMappingConfiguration.MaximumRetryAttempts)
d.Set("parallelization_factor", eventSourceMappingConfiguration.ParallelizationFactor)
d.Set("queues", aws.StringValueSlice(eventSourceMappingConfiguration.Queues))
if v := eventSourceMappingConfiguration.ScalingConfig; v != nil {
if err := d.Set("scaling_config", []interface{}{flattenScalingConfig(v)}); err != nil {
return fmt.Errorf("error setting scaling_config: %w", err)
}
} else {
d.Set("scaling_config", nil)
}
if eventSourceMappingConfiguration.SelfManagedEventSource != nil {
if err := d.Set("self_managed_event_source", []interface{}{flattenSelfManagedEventSource(eventSourceMappingConfiguration.SelfManagedEventSource)}); err != nil {
return fmt.Errorf("error setting self_managed_event_source: %w", err)
Expand Down Expand Up @@ -622,6 +647,15 @@ func resourceEventSourceMappingUpdate(d *schema.ResourceData, meta interface{})
input.ParallelizationFactor = aws.Int64(int64(d.Get("parallelization_factor").(int)))
}

if d.HasChange("scaling_config") {
if v, ok := d.GetOk("scaling_config"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
input.ScalingConfig = expandScalingConfig(v.([]interface{})[0].(map[string]interface{}))
} else {
// AWS ignores the removal if this is left as nil.
input.ScalingConfig = &lambda.ScalingConfig{}
}
}

if d.HasChange("source_access_configuration") {
if v, ok := d.GetOk("source_access_configuration"); ok && v.(*schema.Set).Len() > 0 {
input.SourceAccessConfigurations = expandSourceAccessConfigurations(v.(*schema.Set).List())
Expand Down Expand Up @@ -1036,6 +1070,34 @@ func flattenFilter(apiObject *lambda.Filter) map[string]interface{} {
return tfMap
}

func expandScalingConfig(tfMap map[string]interface{}) *lambda.ScalingConfig {
if tfMap == nil {
return nil
}

apiObject := &lambda.ScalingConfig{}

if v, ok := tfMap["maximum_concurrency"].(int); ok && v != 0 {
apiObject.MaximumConcurrency = aws.Int64(int64(v))
}

return apiObject
}

func flattenScalingConfig(apiObject *lambda.ScalingConfig) map[string]interface{} {
if apiObject == nil {
return nil
}

tfMap := map[string]interface{}{}

if v := apiObject.MaximumConcurrency; v != nil {
tfMap["maximum_concurrency"] = int(aws.Int64Value(v))
}

return tfMap
}

func findEventSourceMappingConfiguration(conn *lambda.Lambda, input *lambda.GetEventSourceMappingInput) (*lambda.EventSourceMappingConfiguration, error) {
output, err := conn.GetEventSourceMapping(input)

Expand Down
81 changes: 81 additions & 0 deletions internal/service/lambda/event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func TestAccLambdaEventSourceMapping_SQS_basic(t *testing.T) {
resource.TestCheckResourceAttrPair(resourceName, "function_arn", functionResourceName, "arn"),
acctest.CheckResourceAttrRFC3339(resourceName, "last_modified"),
resource.TestCheckResourceAttr(resourceName, "filter_criteria.#", "0"),
resource.TestCheckResourceAttr(resourceName, "scaling_config.#", "0"),
),
},
// batch_size became optional. Ensure that if the user supplies the default
Expand Down Expand Up @@ -1083,6 +1084,64 @@ func TestAccLambdaEventSourceMapping_SQS_filterCriteria(t *testing.T) {
})
}

func TestAccLambdaEventSourceMapping_SQS_scalingConfig(t *testing.T) {
if testing.Short() {
t.Skip("skipping long-running test in short mode")
}

var conf lambda.EventSourceMappingConfiguration
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_lambda_event_source_mapping.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(t) },
ErrorCheck: acctest.ErrorCheck(t, lambda.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig1(rName, 10),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.0.maximum_concurrency", "10"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig1(rName, 15),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.0.maximum_concurrency", "15"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
{
Config: testAccEventSourceMappingConfig_sqsScalingConfig2(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "scaling_config.#", "0"),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{"last_modified"},
},
},
})
}

func testAccCheckEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := acctest.Provider.Meta().(*conns.AWSClient).LambdaConn()
Expand Down Expand Up @@ -2219,3 +2278,25 @@ resource "aws_lambda_event_source_mapping" "test" {
}
`)
}

func testAccEventSourceMappingConfig_sqsScalingConfig1(rName string, maximumConcurrency int) string {
return acctest.ConfigCompose(testAccEventSourceMappingConfig_sqsBase(rName), fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
scaling_config {
maximum_concurrency = %[1]d
}
}
`, maximumConcurrency))
}

func testAccEventSourceMappingConfig_sqsScalingConfig2(rName string) string {
return acctest.ConfigCompose(testAccEventSourceMappingConfig_sqsBase(rName), `
resource "aws_lambda_event_source_mapping" "test" {
event_source_arn = aws_sqs_queue.test.arn
function_name = aws_lambda_function.test.arn
}
`)
}
5 changes: 5 additions & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ resource "aws_lambda_event_source_mapping" "example" {
* `maximum_retry_attempts`: - (Optional) The maximum number of times to retry when the function returns an error. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of -1 (forever), maximum of 10000.
* `parallelization_factor`: - (Optional) The number of batches to process from each shard concurrently. Only available for stream sources (DynamoDB and Kinesis). Minimum and default of 1, maximum of 10.
* `queues` - (Optional) The name of the Amazon MQ broker destination queue to consume. Only available for MQ sources. A single queue name must be specified.
* `scaling_config` - (Optional) Scaling configuration of the event source. Only available for SQS queues. Detailed below.
* `self_managed_event_source`: - (Optional) For Self Managed Kafka sources, the location of the self managed cluster. If set, configuration must also include `source_access_configuration`. Detailed below.
* `self_managed_kafka_event_source_config` - (Optional) Additional configuration block for Self Managed Kafka sources. Incompatible with "event_source_arn" and "amazon_managed_kafka_event_source_config". Detailed below.
* `source_access_configuration`: (Optional) For Self Managed Kafka sources, the access configuration for the source. If set, configuration must also include `self_managed_event_source`. Detailed below.
Expand Down Expand Up @@ -189,6 +190,10 @@ resource "aws_lambda_event_source_mapping" "example" {

* `pattern` - (Optional) A filter pattern up to 4096 characters. See [Filter Rule Syntax](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-syntax).

### scaling_config Configuration Block

* `maximum_concurrency` - (Optional) Limits the number of concurrent instances that the Amazon SQS event source can invoke. Must be between `2` and `1000`. See [Configuring maximum concurrency for Amazon SQS event sources](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#events-sqs-max-concurrency).

### self_managed_event_source Configuration Block

* `endpoints` - (Required) A map of endpoints for the self managed source. For Kafka self-managed sources, the key should be `KAFKA_BOOTSTRAP_SERVERS` and the value should be a string with a comma separated list of broker endpoints.
Expand Down