Add support for AWS SQS #1117

Closed
arvidvillen opened this issue Mar 21, 2021 · 24 comments · Fixed by #2500
Labels
help wanted Extra attention is needed

Comments

@arvidvillen

Add support for AWS SQS: https://aws.amazon.com/sqs/

@cescoffier added the help wanted label Mar 22, 2021
@tcerda95

tcerda95 commented Jun 3, 2021

@cescoffier

I would love to collaborate on this implementation, but I have a few conceptual doubts about backpressure and implementing the incoming connector.

First, a high-level overview of how SQS works. SQS offers a long-polling mechanism (max 20 seconds) and returns at most 10 messages per poll/batch, and AWS offers an async SDK. So a very simple implementation of a PublisherBuilder that I came up with is to poll indefinitely and create a Multi from the items of each batch. Note that the async SQS client from the SDK returns a completion stage:

    public PublisherBuilder<? extends Message<?>> getSource() {
        Multi<Message<?>> publisher = Uni.createFrom()
            .completionStage(() -> sqsClient.receiveMessage(m -> m.queueUrl(url)))
            .repeat().indefinitely()
            .invoke(() -> System.out.println("Log SQS invocation"))
            .onItem().transformToIterable(ReceiveMessageResponse::messages)
            .onItem().transform(Message::of);

        return ReactiveStreams.fromPublisher(publisher);
    }

However, given that this implementation would poll indefinitely, wouldn't this defeat the purpose of backpressure? Or will it only poll as long as downstream requests more items?

Finally, thank you very much for your time.

@cescoffier
Contributor

I don't know SQS, but Mutiny will only retry when there is a request. So, I believe the backpressure will be managed.
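To illustrate the demand-driven behaviour described above, here is a minimal sketch that uses only `java.util.concurrent.Flow` from the JDK. It is not Mutiny's implementation and not the connector's code. `DemandDrivenPoller`, `pollCount`, and the `Supplier` standing in for one `receiveMessage` call are hypothetical names introduced for illustration. The sketch is single-threaded, ignores invalid `request(n)` values, and simply stops on an empty poll until the next request.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Publishes items from a batch-returning poll function, polling only while demand is outstanding. */
class DemandDrivenPoller<T> implements Flow.Publisher<T> {
    // Hypothetical stand-in for one SQS receiveMessage call returning a batch.
    private final Supplier<List<T>> poll;
    // Exposed so that callers can observe how many polls were actually issued.
    final AtomicInteger pollCount = new AtomicInteger();

    DemandDrivenPoller(Supplier<List<T>> poll) {
        this.poll = poll;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private final Queue<T> buffer = new ArrayDeque<>();
            private long demand;
            private boolean draining;
            private boolean cancelled;

            @Override
            public void request(long n) {
                demand += n;
                if (draining) {
                    return; // re-entrant request() from onNext: the outer loop picks it up
                }
                draining = true;
                while (demand > 0 && !cancelled) {
                    if (buffer.isEmpty()) {
                        // Poll only when there is demand and nothing is buffered.
                        pollCount.incrementAndGet();
                        buffer.addAll(poll.get());
                        if (buffer.isEmpty()) {
                            break; // empty batch: this sketch waits for the next request
                        }
                    }
                    demand--;
                    subscriber.onNext(buffer.poll());
                }
                draining = false;
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }
}
```

With a supplier that returns three items per batch, requesting two items triggers one poll, a further request for one item is served from the buffer, and only the next request after that triggers a second poll. No poll happens before the first request.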

@MihaiBogdanEugen

@tcerdaITBA I'd like to contribute to this as well. Have you made any progress on your side?

@soulseekeer24

Hello, is anyone still interested in working on this?

@damiankaplon

  1. Is there any way to use smallrye-reactive-messaging with SQS right now? As far as I know, it does not support Amazon SQS yet. Am I right?
  2. Is it possible to wrap the SQS client using JMS so that I can integrate with smallrye-reactive-messaging via the JMS connector?

@MihaiBogdanEugen

  1. You are right.

  2. No.

@soulseekeer24

Anyone interested in working on adding SQS support? XD

@adampoplawski

Hello,
Is there a chance of this being supported? Maybe I can contribute somehow?

@cescoffier
Contributor

Sure, a contribution would be more than welcome.

@soulseekeer24

I would like to help.

@adampoplawski

@tcerdaITBA are you still interested? It seems you did some work towards this, and now there seem to be people willing to help.

@holomekc

holomekc commented Oct 1, 2023

Hi, I started on an implementation:
https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

I think I have no issues with the actual implementation, but there are some concepts that confuse me a little. Maybe you can help me take it in the right direction. But first, the progress.

Progress:
Moved to the PR: #2402

@holomekc

holomekc commented Oct 1, 2023

Questions:

  1. Project structure
    I created an AWS module and a nested SQS module. Should I flatten the structure? I like the nested approach.
    What do you think?

  2. OpenTelemetry
    TracingUtils starts and ends the trace immediately. Doesn't that violate the OpenTelemetry documentation? It says:

    • Call shouldStart(Context, Object) and do not proceed if it returns false.
    • Call start(Context, Object) at the beginning of a request.
    • Call end(Context, Object, Object, Throwable) at the end of a request.

    In SQS, when I send a message I get the messageId only in the response from the SDK lib, so I cannot add this information
    to the span. Why isn't AsyncOperationEndStrategy used? For batching it is different: there I need to generate the IDs
    before sending. But from an implementation perspective it would be easier to do that after the response as well.

  3. Configuration
    For CreateQueue I want to provide the option to define attributes (Map<String, String>) and tags (Map<String, String>).
    When sending messages I could use the metadata to provide this information, but this does not work for the
    incoming channel. The only way I found is to use the connector configuration, but Maps and Lists are not really supported.
    I need to parse them from a String manually (key1:value1,key2:value2 or value1,value2). This is something I cannot do inside
    the message streams over and over again. It is a bit strange, because in MicroProfile Config you can define maps.
    Quarkus also does this all the time.

    In general I think it depends on the use case. For creation it is fine, because you most likely
    do not create that many queues. But what is the proper way to achieve something like this? I saw that sometimes beans
    are used to provide configuration. This seems a bit strange to me.

  4. Graceful Shutdown
    I started with a terminate method and @BeforeDestroy, as I saw in the other implementations. But this is not really
    graceful. I cannot simply destroy the SDK client and the channels; I need to track in-flight work. E.g. if 10 messages are
    received, I need to wait for the processing and then for the deletion/confirmation. I did this in the past in
    a Quarkus lib, where the approach was clearer to me. Am I allowed to block the terminate method? It could take some
    time until the processing is done.
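For the shutdown question (item 4), the counting could look roughly like the sketch below. It uses only the JDK, and `InFlightTracker` with its methods is a hypothetical helper, not part of SmallRye Reactive Messaging or the AWS SDK. Whether the termination hook may block is exactly the open question, so the wait is bounded by a timeout rather than waiting forever.

```java
import java.util.concurrent.TimeUnit;

/** Counts messages that were received but not yet processed and deleted. */
final class InFlightTracker {
    private int inFlight;
    private boolean closed;

    /** Registers a received message; returns false once shutdown has begun, so the caller should stop polling. */
    synchronized boolean tryAcquire() {
        if (closed) {
            return false;
        }
        inFlight++;
        return true;
    }

    /** Call after the message has been processed and deleted, or after it has definitively failed. */
    synchronized void release() {
        inFlight--;
        notifyAll();
    }

    /**
     * Rejects new work and waits until all in-flight messages are released or the timeout elapses.
     * Returns true if nothing is left in flight, false on timeout or interruption.
     */
    synchronized boolean closeAndAwait(long timeout, TimeUnit unit) {
        closed = true;
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            while (inFlight > 0) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                // Releases the monitor while waiting, so release() can proceed.
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The incoming side would call `tryAcquire()` for each received message and `release()` after the delete call completes. The terminate method would call `closeAndAwait(...)` before closing the SDK client.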

Thank you for any help.
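For the configuration question (item 3), one option is to parse the `key1:value1,key2:value2` string once when the channel is created and reuse the resulting map, rather than parsing inside the stream. The sketch below is a plain-JDK illustration. `ConnectorAttributeParser` and `parseMap` are hypothetical names, and the format is only the one described above, not an existing SmallRye convention.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Parses a flat "key1:value1,key2:value2" configuration string into a map. */
final class ConnectorAttributeParser {
    private ConnectorAttributeParser() {
    }

    /** Returns an unmodifiable, insertion-ordered map; null or blank input yields an empty map. */
    static Map<String, String> parseMap(String raw) {
        Map<String, String> result = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return Collections.unmodifiableMap(result);
        }
        for (String entry : raw.split(",")) {
            // Split on the first ':' only, so values may themselves contain colons.
            int sep = entry.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException("Expected key:value but got '" + entry + "'");
            }
            result.put(entry.substring(0, sep).trim(), entry.substring(sep + 1).trim());
        }
        return Collections.unmodifiableMap(result);
    }
}
```

For example, `parseMap("VisibilityTimeout:30, DelaySeconds : 5")` yields a map with the two SQS queue attributes as keys. Values containing commas are not supported by this format and would need an escaping rule.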

@adampoplawski

@cescoffier can we get support with code review? It seems more people want to contribute.

@cescoffier
Contributor

Sure, @ozangunalp and I would be happy to review the code.

@adampoplawski

Hello @cescoffier @ozangunalp. Sorry to ping again, but maybe the review was forgotten by accident :)

@cescoffier
Contributor

@adampoplawski where is the PR?

@adampoplawski

Hello @cescoffier,
There is a branch, plus questions from @holomekc on how to proceed. Sorry for the misinformation.
https://github.com/holomekc/smallrye-reactive-messaging/commits/feature/AWS

@holomekc

I have not created a PR yet because I have not written tests etc. As @adampoplawski mentioned, I wrote up some questions. I needed to chill a little bit; I was working too much. I will try to finish what I started ASAP. I should have some free time soon-ish.

@ozangunalp
Collaborator

Thanks for this.
There is now a connector contribution guide, in case it helps: https://smallrye.io/smallrye-reactive-messaging/4.11.0/concepts/contributing-connectors/

@holomekc

holomekc commented Dec 6, 2023

I created the PR. It is not done yet, but maybe reviewing and helping is easier this way:
#2402

@spc16670

Are we planning to merge it anytime soon? Eagerly awaiting this....

@holomekc

Nope. Feel free to help. There is still much to do.

@adampoplawski

@cescoffier @ozangunalp There is a new PR.


10 participants