Skip to content

Commit

Permalink
feat(aws): Draft to add AWS SQS connector FIFO support
Browse files Browse the repository at this point in the history
  • Loading branch information
sbuettner committed Apr 4, 2023
1 parent 764fedc commit 3e3e53c
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 13 deletions.
58 changes: 58 additions & 0 deletions connectors/sqs/element-templates/aws-sqs-connector.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,64 @@
"notEmpty": true
}
},
{
"id": "queue.type",
"label": "Queue type",
"description": "Specify whether the queue is a <a href=\"https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html\">standard</a> or <a href=\"https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html\">FIFO</a> queue",
"group": "queueProperties",
"type": "Dropdown",
"optional": false,
"value": "standard",
"choices": [
{
"name": "Standard",
"value": "standard"
},
{
"name": "FIFO",
"value": "fifo"
}
],
"binding": {
"type": "property",
"name": "queue.type"
},
"constraints": {
"notEmpty": true
}
},
{
"label": "Message Group Id",
"description": "Message Group Id (FIFO only). See also <a href=\"https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html\">using the MessageGroupId Property</a> in the Amazon SQS Developer Guide.",
"group": "input",
"type": "String",
"binding": {
"type": "zeebe:input",
"name": "queue.messageGroupId"
},
"optional": false,
"condition": {
"property": "queue.type",
"equals": "fifo"
},
"feel": "optional"
},
{
"label": "Message Deduplication Id",
"description": "Message Deduplication Id (FIFO only). See also <a href=\"https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html\">using the MessageDeduplicationId Property</a> in the Amazon SQS Developer Guide.",
"group": "input",
"type": "String",
"binding": {
"type": "zeebe:input",
"name": "queue.messageDeduplicationId"
},
"optional": false,
"condition": {
"property": "queue.type",
"equals": "fifo"
},
"feel": "optional"
},
{
"label": "Message body",
"description": "Data to send to the SQS queue",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ private SendMessageResult sendMsgToSqs(SqsConnectorRequest request) {
new SendMessageRequest()
.withQueueUrl(request.getQueue().getUrl())
.withMessageBody(payload)
.withMessageAttributes(request.getQueue().getAwsSqsNativeMessageAttributes());
.withMessageAttributes(request.getQueue().getAwsSqsNativeMessageAttributes())
.withMessageGroupId(request.getQueue().getMessageGroupId())
.withMessageDeduplicationId(request.getQueue().getMessageDeduplicationId());
return sqsClient.sendMessage(message);
} finally {
if (sqsClient != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
package io.camunda.connector.model;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.util.StringUtils;
import io.camunda.connector.api.annotation.Secret;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

Expand All @@ -23,8 +25,14 @@ public class QueueRequestData {

@NotNull private Object messageBody;

@NotNull private QueueType type = QueueType.standard;

private Map<String, SqsMessageAttribute> messageAttributes;

private String messageGroupId;

private String messageDeduplicationId;

public String getUrl() {
return url;
}
Expand Down Expand Up @@ -79,24 +87,55 @@ private Function<SqsMessageAttribute, MessageAttributeValue> messageAttributeTra
};
}

public QueueType getType() {
return type;
}

public void setType(QueueType type) {
this.type = type;
}

public String getMessageGroupId() {
return messageGroupId;
}

public void setMessageGroupId(String messageGroupId) {
this.messageGroupId = messageGroupId;
}

public String getMessageDeduplicationId() {
return messageDeduplicationId;
}

public void setMessageDeduplicationId(String messageDeduplicationId) {
this.messageDeduplicationId = messageDeduplicationId;
}

@AssertTrue
public boolean hasValidFifoQueueProperties() {
if (QueueType.fifo == type) {
return StringUtils.hasValue(messageDeduplicationId) && StringUtils.hasValue(messageGroupId);
} else return true;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
QueueRequestData that = (QueueRequestData) o;
return url.equals(that.url)
&& region.equals(that.region)
&& messageBody.equals(that.messageBody)
&& Objects.equals(messageAttributes, that.messageAttributes);
return Objects.equals(url, that.url)
&& Objects.equals(region, that.region)
&& Objects.equals(messageBody, that.messageBody)
&& Objects.equals(type, that.type)
&& Objects.equals(messageAttributes, that.messageAttributes)
&& Objects.equals(messageGroupId, that.messageGroupId)
&& Objects.equals(messageDeduplicationId, that.messageDeduplicationId);
}

@Override
public int hashCode() {
return Objects.hash(url, region, messageBody, messageAttributes);
return Objects.hash(
url, region, messageBody, type, messageAttributes, messageGroupId, messageDeduplicationId);
}

@Override
Expand All @@ -110,8 +149,16 @@ public String toString() {
+ '\''
+ ", messageBody="
+ messageBody
+ ", type="
+ type
+ ", messageAttributes="
+ messageAttributes
+ ", messageGroupId='"
+ messageGroupId
+ '\''
+ ", messageDeduplicationId='"
+ messageDeduplicationId
+ '\''
+ '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH
* under one or more contributor license agreements. Licensed under a proprietary license.
* See the License.txt file for more information. You may not use this file
* except in compliance with the proprietary license.
*/
package io.camunda.connector.model;

public enum QueueType {
/**
* <a
* href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html">...</a>
*/
standard,
/**
* <a
* href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html">...</a>
*/
fifo
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith(MockitoExtension.class)
class SqsConnectorFunctionParametrizedTest {
Expand Down Expand Up @@ -98,9 +100,15 @@ void execute_ShouldSucceedSuccessCases(final String incomingJson) {

@ParameterizedTest
@MethodSource("failRequestCases")
@MockitoSettings(strictness = Strictness.LENIENT)
void execute_ShouldThrowExceptionOnMalformedRequests(final String incomingJson) {
// given
SqsConnectorRequest expectedRequest = GSON.fromJson(incomingJson, SqsConnectorRequest.class);
when(sqsClientSupplier.sqsClient(ACTUAL_ACCESS_KEY, ACTUAL_SECRET_KEY, ACTUAL_QUEUE_REGION))
.thenReturn(sqsClient);
SendMessageResult sendMessageResult = mock(SendMessageResult.class);
when(sendMessageResult.getMessageId()).thenReturn(MSG_ID);
when(sqsClient.sendMessage(sendMessageRequest.capture())).thenReturn(sendMessageResult);

OutboundConnectorContext ctx =
OutboundConnectorContextBuilder.create()
.variables(incomingJson)
Expand Down
52 changes: 52 additions & 0 deletions connectors/sqs/src/test/resources/requests/fail-test-cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,57 @@
"secretKey":"secrets.AWS_SECRET_KEY",
"accessKey":"secrets.AWS_ACCESS_KEY"
}
},
{
"testDescription": "No messageGroupId for FIFO queue",
"authentication": {
"secretKey": "secrets.AWS_SECRET_KEY",
"accessKey": "secrets.AWS_ACCESS_KEY"
},
"queue": {
"type": "fifo",
"messageDeduplicationId": "messageDeduplicationId",
"messageAttributes": {},
"messageBody": {
"data": "ok"
},
"region": "us-east-1",
"url": "secrets.SQS_QUEUE_URL"
}
},
{
"testDescription": "No messageDeduplicationId for FIFO queue",
"authentication": {
"secretKey": "secrets.AWS_SECRET_KEY",
"accessKey": "secrets.AWS_ACCESS_KEY"
},
"queue": {
"type": "fifo",
"messageGroupId": "messageGroupId",
"messageAttributes": {},
"messageBody": {
"data": "ok"
},
"region": "us-east-1",
"url": "secrets.SQS_QUEUE_URL"
}
},
{
"testDescription": "Invalid queue type",
"authentication": {
"secretKey": "secrets.AWS_SECRET_KEY",
"accessKey": "secrets.AWS_ACCESS_KEY"
},
"queue": {
"type": "test",
"messageGroupId": "messageGroupId",
"messageDeduplicationId": "messageDeduplicationId",
"messageAttributes": {},
"messageBody": {
"data": "ok"
},
"region": "us-east-1",
"url": "secrets.SQS_QUEUE_URL"
}
}
]
18 changes: 18 additions & 0 deletions connectors/sqs/src/test/resources/requests/success-test-cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,23 @@
"region":"us-east-1",
"url":"secrets.SQS_QUEUE_URL"
}
},
{
"testDescription":"Request for FIFO queue",
"authentication":{
"secretKey":"secrets.AWS_SECRET_KEY",
"accessKey":"secrets.AWS_ACCESS_KEY"
},
"queue":{
"type": "fifo",
"messageGroupId": "messageGroupId",
"messageDeduplicationId": "messageDeduplicationId",
"messageAttributes":{},
"messageBody":{
"data":"ok"
},
"region":"us-east-1",
"url":"secrets.SQS_QUEUE_URL"
}
}
]

0 comments on commit 3e3e53c

Please sign in to comment.