Skip to content

Commit

Permalink
Merge pull request #2500 from cuichenli/aws-sqs-connector
Browse files Browse the repository at this point in the history
Add basic support for AWS SQS - send and read
  • Loading branch information
ozangunalp authored Apr 4, 2024
2 parents ce7d8a8 + 01a7440 commit 0995c4f
Show file tree
Hide file tree
Showing 50 changed files with 3,738 additions and 1 deletion.
5 changes: 4 additions & 1 deletion documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ nav:
- 'Client Customization' : rabbitmq/rabbitmq-client-customization.md
- 'Connecting to managed instances' : rabbitmq/rabbitmq-cloud.md


- Pulsar:
- pulsar/pulsar.md
- 'Receiving messages': pulsar/receiving-pulsar-messages.md
Expand Down Expand Up @@ -96,6 +95,10 @@ nav:
- 'Sending MQTT messages': mqtt/sending-messages-to-mqtt.md
- 'Customizing the MQTT client': mqtt/client-customization.md

- AWS SQS:
- sqs/sqs.md
- 'Receiving AWS SQS messages': sqs/receiving-aws-sqs-messages.md
- 'Sending AWS SQS messages': sqs/sending-aws-sqs-messages.md

plugins:
- search
Expand Down
5 changes: 5 additions & 0 deletions documentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<artifactId>smallrye-reactive-messaging-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-aws-sqs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
Expand Down
99 changes: 99 additions & 0 deletions documentation/src/main/docs/sqs/receiving-aws-sqs-messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Receiving AWS SQS Messages

The AWS SQS connector allows you to receive messages from an AWS SQS queue.

## Receiving messages

Before you start, you need to have an AWS account and a SQS queue created.
To receive messages from an SQS queue, you need to create a method that consumes messages from the queue.
``` properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
```

Then, your application receives `Message<String>`.
You can consume the payload directly:

``` java
{{ insert('sqs/inbound/SqsStringConsumer.java') }}
```

Or, you can retrieve the `Message<String>`:

``` java
{{ insert('sqs/inbound/SqsMessageStringConsumer.java') }}
```

You also can directly consume the `software.amazon.awssdk.services.sqs.model.Message`:

``` java
{{ insert('sqs/inbound/SqsSdkMessageConsumer.java') }}
```

### Receive message request customizer

The receive message requests sent to AWS SQS can be customized by providing a CDI bean implementation of
{{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
and configuring it's identifier using the `receive.request.customizer` connector attribute.

``` java
{{ insert('sqs/inbound/SqsReceiveMessageRequestCustomizerExample.java') }}
```

```properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
mp.messaging.incoming.data.receive.request.customizer=my-customizer
```

Receive requests failed with retryable exceptions are retried automatically, by setting the failed request id.

### Receive message request pause and resume

The AWS SQS connector fetches messages by continuously sending receive message requests.
If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.

The pause resume can be disabled using the `receive.request.pause.resume` connector attribute.

```properties
mp.messaging.incoming.data.receive.request.pause.resume=false
```

## Deserialization

The connector converts incoming SQS Messages into Reactive Messaging `Message<T>` instances.

The payload type `T` is determined based on the value of the SQS message attribute `_classname`.

If you send messages with the AWS SQS connector (outbound connector),
the `_classname` attribute is automatically added to the message.
The primitive types are transformed from the string representation to the corresponding Java type.
For objects, if one of the `JsonMapping` modules is present on the classpath,
the connector used that JSON module to deserialize the message body to objects.

If the `_classname` attribute is not present, the payload is deserialized as a `String`.

``` java
{{ insert('sqs/json/SqsJsonMapping.java', 'code') }}
```

## Inbound Metadata

Messages coming from SQS contain an instance of {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
in the metadata.

SQS message attributes can be accessed from the metadata either by name or by the `MessageAttributeValue` object.

``` java
{{ insert('sqs/inbound/SqsMetadataExample.java') }}
```

## Acknowledgement

The default strategy for acknowledging AWS SQS Message is to *delete* the message from the queue.
With `ack.delete` set to `false`, the message is not deleted from the queue.

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-sqs-incoming.md') }}

55 changes: 55 additions & 0 deletions documentation/src/main/docs/sqs/sending-aws-sqs-messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Sending AWS SQS Messages

The AWS SQS connector allows you to send messages to an AWS SQS queue.

## Sending messages

Before you start, you need to have an AWS account and an SQS queue created.
To send messages to an SQS queue, you need to create a method that produces messages to the queue.

Then, your application can send `Message<String>` to the prices channel.
It can use `String` payloads as in the following snippet:

``` java
{{ insert('sqs/outbound/SqsStringProducer.java') }}
```

Or, you can send `Message<Double>`, which affords the opportunity to
explicitly specify metadata on the outgoing message:

``` java
{{ insert('sqs/outbound/SqsMessageStringProducer.java') }}
```

## Serialization

When sending a `Message<T>`, the connector converts the message into a AWS SQS Message.
How the message is converted depends on the payload type:

- If the payload is of type `SendMessageRequest` it is sent as is.
- If the payload is of type `SendMessageRequest.Builder`, the queue url is set and sent.
- If the payload is of type `software.amazon.awssdk.services.sqs.model.Message` it is usd to set the message body and attributes.
- If the payload is of primitive types the paylaod is converted to String and the message attribute `_classname` is set to the class name of the payload.
- If the payload is of any other object type, the payload is serialized (using the `JsonMapping` implementation discovered) and the message attribute `_classname` is set to the class name of the payload.

If the message payload cannot be serialized to JSON, the message is *nacked*.

## Outbound Metadata

When sending `Messages`, you can add an instance of {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsOutboundMetadata', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
to influence how the message is handled by AWS SQS. For example, you
can configure the routing key, timestamp and headers:

``` java
{{ insert('sqs/outbound/SqsOutboundMetadataExample.java', 'code') }}
```

## Acknowledgement

By default, the Reactive Messaging `Message` is acknowledged when the
send message request is successful. If the message is not sent successfully, the message is *nacked*.

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-sqs-outgoing.md') }}

52 changes: 52 additions & 0 deletions documentation/src/main/docs/sqs/sqs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# AWS SQS Connector

!!! warning "Preview"
The AWS SQS Connector is currently in **preview**.

The AWS SQS Connector adds support for AWS SQS to Reactive Messaging.

With this connector, your application can:

- receive messages from a RabbitMQ queue
- send messages to a RabbitMQ exchange

The AWS SQS connector is based on the [AWS SDK for Java V2](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).

## Using the AWS SQS connector

To use the AWS SQS Connector, add the following dependency to your project:

``` xml
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-aws-sqs</artifactId>
<version>{{ attributes['project-version'] }}</version>
</dependency>
```

The connector name is: `smallrye-sqs`.

So, to indicate that a channel is managed by this connector you need:
```properties
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-sqs

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-sqs
```

## Configuration

When available the AWS SQS connector discovers the SQS client as a CDI bean.
This allows using the connector with the [Quarkiverse AWS SQS extension](https://docs.quarkiverse.io/quarkus-amazon-services/dev/amazon-sqs.html).
If the SQS client is not available as a CDI bean, the connector creates a new client using the provided configuration.

- `queue` - The name of the SQS queue, defaults to channel name if not provided.
- `region` - The name of the SQS region.
- `endpoint-override` - The endpoint url override.
- `credentials-provider` - The fully qualified class name of the credential provider to be used in the client, if not provided the default provider chain is used.

## Additional Resources

- [AWS SQS Documentation](https://docs.aws.amazon.com/sqs/index.html)
- [Quarkiverse AWS SQS Extension](https://docs.quarkiverse.io/quarkus-amazon-services/dev/amazon-sqs.html)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sqs.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class SqsMessageStringConsumer {
@Incoming("data")
CompletionStage<Void> consume(Message<String> msg) {
System.out.println("Received: " + msg.getPayload());
return msg.ack();
}
}
21 changes: 21 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsMetadataExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsMetadataExample {

@Incoming("queue")
public void metadata(String body, SqsIncomingMetadata metadata) {
Map<String, MessageAttributeValue> attributes = metadata.getMessage().messageAttributes();
attributes.forEach((k, v) -> System.out.println(k + " -> " + v.stringValue()));
System.out.println("Message body: " + body);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

@Identifier("my-customizer") // or with channel name @Identifier("data")
@ApplicationScoped
public class SqsReceiveMessageRequestCustomizerExample implements SqsReceiveMessageRequestCustomizer {
@Override
public void customize(ReceiveMessageRequest.Builder builder) {
builder.visibilityTimeout(10)
.messageAttributeNames("my-attribute");
}
}
21 changes: 21 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsSdkMessageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsSdkMessageConsumer {

@Incoming("data")
void consume(Message msg) {
System.out.println("Received: " + msg.body());
Map<String, MessageAttributeValue> attributes = msg.messageAttributes();
// ...
}
}
13 changes: 13 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsStringConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class SqsStringConsumer {
@Incoming("data")
void consume(String messageBody) {
System.out.println("Received: " + messageBody);
}
}
56 changes: 56 additions & 0 deletions documentation/src/main/java/sqs/json/SqsJsonMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sqs.json;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

public class SqsJsonMapping {

// <code>
@ApplicationScoped
public static class Generator {

@Outgoing("to-rabbitmq")
public Multi<Price> prices() {
AtomicInteger count = new AtomicInteger();
return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
.map(l -> new Price().setPrice(count.incrementAndGet()))
.onOverflow().drop();
}

}

@ApplicationScoped
public static class Consumer {

List<Price> prices = new CopyOnWriteArrayList<>();

@Incoming("from-rabbitmq")
public void consume(Price price) {
prices.add(price);
}

public List<Price> list() {
return prices;
}
}

public static class Price {
public int price;

public Price setPrice(int price) {
this.price = price;
return this;
}
}
// </code>

}
Loading

0 comments on commit 0995c4f

Please sign in to comment.