Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic support for AWS SQS - send and read #2500

Merged
merged 1 commit into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion documentation/mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ nav:
- 'Client Customization' : rabbitmq/rabbitmq-client-customization.md
- 'Connecting to managed instances' : rabbitmq/rabbitmq-cloud.md


- Pulsar:
- pulsar/pulsar.md
- 'Receiving messages': pulsar/receiving-pulsar-messages.md
Expand Down Expand Up @@ -96,6 +95,10 @@ nav:
- 'Sending MQTT messages': mqtt/sending-messages-to-mqtt.md
- 'Customizing the MQTT client': mqtt/client-customization.md

- AWS SQS:
- sqs/sqs.md
- 'Receiving AWS SQS messages': sqs/receiving-aws-sqs-messages.md
- 'Sending AWS SQS messages': sqs/sending-aws-sqs-messages.md

plugins:
- search
Expand Down
5 changes: 5 additions & 0 deletions documentation/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
<artifactId>smallrye-reactive-messaging-pulsar</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-aws-sqs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
Expand Down
99 changes: 99 additions & 0 deletions documentation/src/main/docs/sqs/receiving-aws-sqs-messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Receiving AWS SQS Messages

The AWS SQS connector allows you to receive messages from an AWS SQS queue.

## Receiving messages

Before you start, you need to have an AWS account and a SQS queue created.
To receive messages from an SQS queue, you need to create a method that consumes messages from the queue.
``` properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
```

Then, your application receives `Message<String>`.
You can consume the payload directly:

``` java
{{ insert('sqs/inbound/SqsStringConsumer.java') }}
```

Or, you can retrieve the `Message<String>`:

``` java
{{ insert('sqs/inbound/SqsMessageStringConsumer.java') }}
```

You also can directly consume the `software.amazon.awssdk.services.sqs.model.Message`:

``` java
{{ insert('sqs/inbound/SqsSdkMessageConsumer.java') }}
```

### Receive message request customizer

The receive message requests sent to AWS SQS can be customized by providing a CDI bean implementation of
{{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
and configuring it's identifier using the `receive.request.customizer` connector attribute.

``` java
{{ insert('sqs/inbound/SqsReceiveMessageRequestCustomizerExample.java') }}
```

```properties
mp.messaging.incoming.data.connector=smallrye-sqs
mp.messaging.incoming.data.queue=my-queue
mp.messaging.incoming.data.receive.request.customizer=my-customizer
```

Receive requests failed with retryable exceptions are retried automatically, by setting the failed request id.

### Receive message request pause and resume

The AWS SQS connector fetches messages by continuously sending receive message requests.
If messages are not processed in a timely manner, the connector pauses fetching messages until queued messages are processed.

The pause resume can be disabled using the `receive.request.pause.resume` connector attribute.

```properties
mp.messaging.incoming.data.receive.request.pause.resume=false
```

## Deserialization

The connector converts incoming SQS Messages into Reactive Messaging `Message<T>` instances.

The payload type `T` is determined based on the value of the SQS message attribute `_classname`.

If you send messages with the AWS SQS connector (outbound connector),
the `_classname` attribute is automatically added to the message.
The primitive types are transformed from the string representation to the corresponding Java type.
For objects, if one of the `JsonMapping` modules is present on the classpath,
the connector used that JSON module to deserialize the message body to objects.

If the `_classname` attribute is not present, the payload is deserialized as a `String`.

``` java
{{ insert('sqs/json/SqsJsonMapping.java', 'code') }}
```

## Inbound Metadata

Messages coming from SQS contain an instance of {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
in the metadata.

SQS message attributes can be accessed from the metadata either by name or by the `MessageAttributeValue` object.

``` java
{{ insert('sqs/inbound/SqsMetadataExample.java') }}
```

## Acknowledgement

The default strategy for acknowledging AWS SQS Message is to *delete* the message from the queue.
With `ack.delete` set to `false`, the message is not deleted from the queue.

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-sqs-incoming.md') }}

55 changes: 55 additions & 0 deletions documentation/src/main/docs/sqs/sending-aws-sqs-messages.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Sending AWS SQS Messages

The AWS SQS connector allows you to send messages to an AWS SQS queue.

## Sending messages

Before you start, you need to have an AWS account and an SQS queue created.
To send messages to an SQS queue, you need to create a method that produces messages to the queue.

Then, your application can send `Message<String>` to the prices channel.
It can use `String` payloads as in the following snippet:

``` java
{{ insert('sqs/outbound/SqsStringProducer.java') }}
```

Or, you can send `Message<Double>`, which affords the opportunity to
explicitly specify metadata on the outgoing message:

``` java
{{ insert('sqs/outbound/SqsMessageStringProducer.java') }}
```

## Serialization

When sending a `Message<T>`, the connector converts the message into a AWS SQS Message.
How the message is converted depends on the payload type:

- If the payload is of type `SendMessageRequest` it is sent as is.
- If the payload is of type `SendMessageRequest.Builder`, the queue url is set and sent.
- If the payload is of type `software.amazon.awssdk.services.sqs.model.Message` it is usd to set the message body and attributes.
- If the payload is of primitive types the paylaod is converted to String and the message attribute `_classname` is set to the class name of the payload.
- If the payload is of any other object type, the payload is serialized (using the `JsonMapping` implementation discovered) and the message attribute `_classname` is set to the class name of the payload.

If the message payload cannot be serialized to JSON, the message is *nacked*.

## Outbound Metadata

When sending `Messages`, you can add an instance of {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsOutboundMetadata', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }}
to influence how the message is handled by AWS SQS. For example, you
can configure the routing key, timestamp and headers:

``` java
{{ insert('sqs/outbound/SqsOutboundMetadataExample.java', 'code') }}
```

## Acknowledgement

By default, the Reactive Messaging `Message` is acknowledged when the
send message request is successful. If the message is not sent successfully, the message is *nacked*.

## Configuration Reference

{{ insert('../../../target/connectors/smallrye-sqs-outgoing.md') }}

52 changes: 52 additions & 0 deletions documentation/src/main/docs/sqs/sqs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# AWS SQS Connector

!!! warning "Preview"
The AWS SQS Connector is currently in **preview**.

The AWS SQS Connector adds support for AWS SQS to Reactive Messaging.

With this connector, your application can:

- receive messages from a RabbitMQ queue
- send messages to a RabbitMQ exchange

The AWS SQS connector is based on the [AWS SDK for Java V2](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).

## Using the AWS SQS connector

To use the AWS SQS Connector, add the following dependency to your project:

``` xml
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-aws-sqs</artifactId>
<version>{{ attributes['project-version'] }}</version>
</dependency>
```

The connector name is: `smallrye-sqs`.

So, to indicate that a channel is managed by this connector you need:
```properties
# Inbound
mp.messaging.incoming.[channel-name].connector=smallrye-sqs

# Outbound
mp.messaging.outgoing.[channel-name].connector=smallrye-sqs
```

## Configuration

When available the AWS SQS connector discovers the SQS client as a CDI bean.
This allows using the connector with the [Quarkiverse AWS SQS extension](https://docs.quarkiverse.io/quarkus-amazon-services/dev/amazon-sqs.html).
If the SQS client is not available as a CDI bean, the connector creates a new client using the provided configuration.

- `queue` - The name of the SQS queue, defaults to channel name if not provided.
- `region` - The name of the SQS region.
- `endpoint-override` - The endpoint url override.
- `credentials-provider` - The fully qualified class name of the credential provider to be used in the client, if not provided the default provider chain is used.

## Additional Resources

- [AWS SQS Documentation](https://docs.aws.amazon.com/sqs/index.html)
- [Quarkiverse AWS SQS Extension](https://docs.quarkiverse.io/quarkus-amazon-services/dev/amazon-sqs.html)
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sqs.inbound;

import java.util.concurrent.CompletionStage;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

@ApplicationScoped
public class SqsMessageStringConsumer {
@Incoming("data")
CompletionStage<Void> consume(Message<String> msg) {
System.out.println("Received: " + msg.getPayload());
return msg.ack();
}
}
21 changes: 21 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsMetadataExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import io.smallrye.reactive.messaging.aws.sqs.SqsIncomingMetadata;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsMetadataExample {

@Incoming("queue")
public void metadata(String body, SqsIncomingMetadata metadata) {
Map<String, MessageAttributeValue> attributes = metadata.getMessage().messageAttributes();
attributes.forEach((k, v) -> System.out.println(k + " -> " + v.stringValue()));
System.out.println("Message body: " + body);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.reactive.messaging.aws.sqs.SqsReceiveMessageRequestCustomizer;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

@Identifier("my-customizer") // or with channel name @Identifier("data")
@ApplicationScoped
public class SqsReceiveMessageRequestCustomizerExample implements SqsReceiveMessageRequestCustomizer {
@Override
public void customize(ReceiveMessageRequest.Builder builder) {
builder.visibilityTimeout(10)
.messageAttributeNames("my-attribute");
}
}
21 changes: 21 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsSdkMessageConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package sqs.inbound;

import java.util.Map;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;

@ApplicationScoped
public class SqsSdkMessageConsumer {

@Incoming("data")
void consume(Message msg) {
System.out.println("Received: " + msg.body());
Map<String, MessageAttributeValue> attributes = msg.messageAttributes();
// ...
}
}
13 changes: 13 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsStringConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package sqs.inbound;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;

@ApplicationScoped
public class SqsStringConsumer {
@Incoming("data")
void consume(String messageBody) {
System.out.println("Received: " + messageBody);
}
}
56 changes: 56 additions & 0 deletions documentation/src/main/java/sqs/json/SqsJsonMapping.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package sqs.json;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

import io.smallrye.mutiny.Multi;

public class SqsJsonMapping {

// <code>
@ApplicationScoped
public static class Generator {

@Outgoing("to-rabbitmq")
public Multi<Price> prices() {
AtomicInteger count = new AtomicInteger();
return Multi.createFrom().ticks().every(Duration.ofMillis(1000))
.map(l -> new Price().setPrice(count.incrementAndGet()))
.onOverflow().drop();
}

}

@ApplicationScoped
public static class Consumer {

List<Price> prices = new CopyOnWriteArrayList<>();

@Incoming("from-rabbitmq")
public void consume(Price price) {
prices.add(price);
}

public List<Price> list() {
return prices;
}
}

public static class Price {
public int price;

public Price setPrice(int price) {
this.price = price;
return this;
}
}
// </code>

}
Loading
Loading