diff --git a/CHANGELOG.md b/CHANGELOG.md index f5135f47371..8f205b99149 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,7 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New -- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX)) +- **AWS SQS Scaler**: Support for scaling to include delayed messages. [#4377](https://github.com/kedacore/keda/issues/4377) ### Improvements diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index effc6fa607e..2bc663ed244 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -21,17 +21,9 @@ const ( targetQueueLengthDefault = 5 activationTargetQueueLengthDefault = 0 defaultScaleOnInFlight = true + defaultScaleOnDelayed = false ) -var awsSqsQueueMetricNamesForScalingInFlight = []string{ - "ApproximateNumberOfMessages", - "ApproximateNumberOfMessagesNotVisible", -} - -var awsSqsQueueMetricNamesForNotScalingInFlight = []string{ - "ApproximateNumberOfMessages", -} - type awsSqsQueueScaler struct { metricType v2.MetricTargetType metadata *awsSqsQueueMetadata @@ -49,6 +41,7 @@ type awsSqsQueueMetadata struct { awsAuthorization awsAuthorizationMetadata scalerIndex int scaleOnInFlight bool + scaleOnDelayed bool awsSqsQueueMetricNames []string } @@ -78,6 +71,7 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs meta := awsSqsQueueMetadata{} meta.targetQueueLength = defaultTargetQueueLength meta.scaleOnInFlight = defaultScaleOnInFlight + meta.scaleOnDelayed = defaultScaleOnDelayed if val, ok := config.TriggerMetadata["queueLength"]; ok && val != "" { queueLength, err := strconv.ParseInt(val, 10, 64) @@ -109,10 +103,22 @@ func parseAwsSqsQueueMetadata(config *ScalerConfig, logger logr.Logger) (*awsSqs } } + if val, ok := config.TriggerMetadata["scaleOnDelayed"]; ok && val != "" { + scaleOnDelayed, err := strconv.ParseBool(val) + if err != nil { + meta.scaleOnDelayed = defaultScaleOnDelayed + logger.Error(err, "Error parsing SQS queue metadata scaleOnDelayed, using default %n", defaultScaleOnDelayed) + } else { + meta.scaleOnDelayed = scaleOnDelayed + } + } + + meta.awsSqsQueueMetricNames = []string{"ApproximateNumberOfMessages"} if meta.scaleOnInFlight { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForScalingInFlight - } else { - meta.awsSqsQueueMetricNames = awsSqsQueueMetricNamesForNotScalingInFlight + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesNotVisible") + } + if meta.scaleOnDelayed { + meta.awsSqsQueueMetricNames = append(meta.awsSqsQueueMetricNames, "ApproximateNumberOfMessagesDelayed") } if val, ok := config.TriggerMetadata["queueURL"]; ok && val != "" { diff --git a/pkg/scalers/aws_sqs_queue_scaler_test.go b/pkg/scalers/aws_sqs_queue_scaler_test.go index fc7e4cdd971..4bb71e40797 100644 --- a/pkg/scalers/aws_sqs_queue_scaler_test.go +++ b/pkg/scalers/aws_sqs_queue_scaler_test.go @@ -65,6 +65,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G Attributes: map[string]*string{ "ApproximateNumberOfMessages": aws.String("NotInt"), "ApproximateNumberOfMessagesNotVisible": aws.String("NotInt"), + "ApproximateNumberOfMessagesDelayed": aws.String("NotInt"), }, }, nil } @@ -73,6 +74,7 @@ func (m *mockSqs) GetQueueAttributes(input *sqs.GetQueueAttributesInput) (*sqs.G Attributes: map[string]*string{ "ApproximateNumberOfMessages": aws.String("200"), "ApproximateNumberOfMessagesNotVisible": aws.String("100"), + "ApproximateNumberOfMessagesDelayed": aws.String("50"), }, }, nil } @@ -326,6 +328,44 @@ var awsSQSGetMetricTestData = []*parseAWSSQSMetadataTestData{ testAWSSQSEmptyResolvedEnv, false, "not error with scaleOnInFlight enabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaleOnDelayed enabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "false", + "scaleOnDelayed": "false"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaledOnInFlight and scaleOnDelayed disabled"}, + {map[string]string{ + "queueURL": testAWSSQSProperQueueURL, + "queueLength": "1", + "awsRegion": "eu-west-1", + "scaleOnInFlight": "true", + "scaleOnDelayed": "true"}, + testAWSSQSAuthentication, + testAWSSQSEmptyResolvedEnv, + false, + "not error with scaledOnInFlight and scaleOnDelayed enabled"}, {map[string]string{ "queueURL": testAWSSQSErrorQueueURL, "queueLength": "1", @@ -390,8 +430,12 @@ func TestAWSSQSScalerGetMetrics(t *testing.T) { case testAWSSQSBadDataQueueURL: assert.Error(t, err, "expect error because of bad data return from sqs") default: - if meta.scaleOnInFlight { + if meta.scaleOnInFlight && meta.scaleOnDelayed { + assert.EqualValues(t, int64(350.0), value[0].Value.Value()) + } else if meta.scaleOnInFlight && !meta.scaleOnDelayed { assert.EqualValues(t, int64(300.0), value[0].Value.Value()) + } else if !meta.scaleOnInFlight && meta.scaleOnDelayed { + assert.EqualValues(t, int64(250.0), value[0].Value.Value()) } else { assert.EqualValues(t, int64(200.0), value[0].Value.Value()) }