Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented reconnect and downstream propagation when reading messages #2572

Merged
merged 2 commits into from
Apr 11, 2024

Conversation

dankristensen
Copy link
Contributor

Here is a PR which makes reconnect for jms possible.

I have verified the functionality using IBM MQ Series, and can confirm that it is working.

Unfortunately i cannot see how we can create a test for this, so it would be nice with some input on how to do this.

All existing tests are all passing

@ozangunalp
Copy link
Collaborator

@dankristensen Thank you for opening this PR! I've looked into this yesterday.

I still think we need to listen for exceptions on the JMSContext and recreate the context. I have a working solution built on top of your change. I tried not to rewrite the whole connector. For testing I simply restart the broker and continue to send/receive messages.

If you are ok with it, I can push my changes and we can discuss on it?

@dankristensen
Copy link
Contributor Author

@ozangunalp Please do so. I am only interested in a working solution.

@ozangunalp
Copy link
Collaborator

@dankristensen please check on your end with IBM MQ.

Also, please note that we are not planning to increase our support of the JMS connector. That being said I can review/discuss if you are willing to contribute.

@dankristensen
Copy link
Contributor Author

@ozangunalp I have checked and verified this in our end, on a real IBM MQ server. It still works flawlessly. And thanks for the changes you have made. I now understand what you mean by the Holder class.

I think this functionality looks good now, but i still have a concern about guaranteed delivery, but guess this is something for another PR. I have looked a little bit into this, but cannot see exactly how this can be implemented when using a session-mode that is NOT AUTO_ACKNOWLEDGE. Maybe you have some clues how this can be implemented when an @inbound message returns a CompletableFuture.failedFuture

@ozangunalp
Copy link
Collaborator

The session modes CLIENT_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE should work as expected as the message ack calls the message.acknowledge. For transacted sessions, we don't think that it easily fits the reactive messaging execution model.

@ozangunalp
Copy link
Collaborator

@cescoffier can you take a quick look ?

@dankristensen
Copy link
Contributor Author

So then maybe i am doing it wrong, when i am using CLIENT_ACKNOWLEDGE.

I have done it like this:
@Incoming("SOME_CHANNEL") public CompletionStage<Void> handleStartJob(Message<String> message) { try { String startJobEvent = message.getPayload(); if ("FAIL".equals(startJobEvent)) { throw new IllegalArgumentException("Fail"); } return message.ack(); } catch (Exception e) { return CompletableFuture.failedFuture(e); } }

@ozangunalp
Copy link
Collaborator

If you return a failed future it'll stop the stream. Note that there isn't any negative acknowledgement for JMS. When you call message.nack it'll simply log the failure.

For a simpler implementation, you can omit the ack and nack, the framework will ack/nack for you:

@Incoming("SOME_CHANNEL")
public void handleStartJob(String message) {
    if ("FAIL".equals(message)) {
        throw new IllegalArgumentException("Fail");
    }
}

@dankristensen
Copy link
Contributor Author

I know that i Can use it without the Message, but i need the headers. I just didn’t add it in this example. So should i throw an exception instead of returning a failed future

@ozangunalp
Copy link
Collaborator

I know that i Can use it without the Message, but i need the headers. I just didn’t add it in this example. So should i throw an exception instead of returning a failed future

You can inject the metadata as the 2nd parameter of your method

@Incoming("SOME_CHANNEL")
public void handleStartJob(String message, IncomingJmsMessageMetadata metadata) {
    // metadata.
    if ("FAIL".equals(message)) {
        throw new IllegalArgumentException("Fail");
    }
}

@dankristensen
Copy link
Contributor Author

Thanks a lot i Will try this tommorow

@dankristensen
Copy link
Contributor Author

Regarding applying an exceptionlistener to the jmscontext, i actually tried that, but got an error back from ibm mq, that i could not assign a clientid, when it was already in use. This was before your changes, so i Will also try this again tommorow.

@cescoffier cescoffier added the jms label Apr 11, 2024
@ozangunalp
Copy link
Collaborator

@dankristensen waiting for your feedback with MQ to go forward with this

@dankristensen
Copy link
Contributor Author

I have verified the reconnect funtionality against an IBM MQ Server. It is reconnecting as i expect. I find issues regarding acknowledgement, i will open another issue regarding that. Thank you so much for your help. It has been a real pleasure with your valuable feedback.

@ozangunalp ozangunalp merged commit cfce2e3 into smallrye:main Apr 11, 2024
4 checks passed
@ozangunalp ozangunalp added this to the 4.21.0 milestone Apr 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants