Enable sending String payloads with JSON content type in SqsTemplate #1144

Closed
tomazfernandes opened this issue May 2, 2024 · 9 comments
Labels
component: sqs (SQS integration related issue)
status: ideal-for-contribution (We agree it's nice to have but it is not team priority)
type: enhancement (Smaller enhancement in existing integration)

Comments

@tomazfernandes
Contributor

Currently, due to Spring Messaging internals, if we send a String message containing JSON without specifying a Content-Type, it defaults to text/plain in the StringMessageConverter.

If we set .header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON), the message is handled by MappingJackson2MessageConverter instead, which serializes the payload a second time and leaves it unreadable for the consumer.

We should look into a way of enabling users to send a JSON String with the APPLICATION_JSON content type so it can be properly deserialized by the consumer.
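To make "double serialization" concrete, here is a minimal sketch of what happens when a String that already holds JSON is encoded again as a JSON string value. It is plain Java with a hand-rolled encoder standing in for a JSON serializer; the class and method names are hypothetical and this is not the converter's actual code.

```java
// Hypothetical illustration: a hand-written stand-in for a JSON serializer
// encoding a String value. Not taken from Spring Messaging or Jackson.
final class DoubleSerializationSketch {

    /** Encodes a Java String as a JSON string literal: wraps it in quotes and escapes special characters. */
    static String toJsonStringLiteral(String value) {
        StringBuilder out = new StringBuilder("\"");
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:   out.append(c);      // other control characters are ignored in this sketch
            }
        }
        return out.append('"').toString();
    }

    public static void main(String[] args) {
        // The payload is already valid JSON held in a String.
        String rawJson = "{\n  \"message1\": \"hello\",\n  \"message2\": \"sqs!\"\n}\n";
        // Encoding it again produces a single JSON string, not a JSON object.
        System.out.println(toJsonStringLiteral(rawJson));
    }
}
```

A consumer that expects a JSON object receives one quoted string full of escaped quotes and `\n` sequences, so it cannot bind it to a POJO or record.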

@tomazfernandes added the component: sqs, status: ideal-for-contribution, and type: enhancement labels on May 2, 2024
@imsosleepy
Contributor

imsosleepy commented May 11, 2024

@tomazfernandes
I'm looking into this issue, but my sample code doesn't seem to reproduce it.

Below is the code I used, with some modifications from the SQS sample code in this project.

@SpringBootApplication
public class SpringSqsSample {

	private static final Logger LOGGER = LoggerFactory.getLogger(SpringSqsSample.class);

	private static final String QUEUE_NAME = "test-queue";

	public static void main(String[] args) {
		SpringApplication.run(SpringSqsSample.class, args);
	}

	@SqsListener(queueNames = QUEUE_NAME, acknowledgementMode = "ON_SUCCESS")
	void listen(Object message) {
		LOGGER.info("Received message : " + message);
	}

	@Bean
	public ApplicationRunner sendMessageToQueue2(SqsTemplate sqsTemplate) {
		String jsonString = """
			{
			  "message1": "hello",
			  "message2": "sqs!"
			}
			""";

		return args -> sqsTemplate.send(to -> to.queue(QUEUE_NAME).payload(jsonString).delaySeconds(30).header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON));
	}
}

Can you tell me if there's something wrong with my code, or give me an example of a situation where the payload would be double serialized?

@tomazfernandes
Contributor Author

@imsosleepy, please try to receive a record MyPojo(String message1, String message2){} in the listener:
void listen(MyPojo message) {...}

@imsosleepy
Contributor

imsosleepy commented Jun 10, 2024

@tomazfernandes After modifying the code as per your advice, I proceeded to test it again.

@SpringBootApplication
public class SpringSqsSample {

	private static final Logger LOGGER = LoggerFactory.getLogger(SpringSqsSample.class);

	private static final String QUEUE_NAME = "test-queue";

	public static void main(String[] args) {
		SpringApplication.run(SpringSqsSample.class, args);
	}

	@SqsListener(queueNames = QUEUE_NAME)
	void listen(SampleRecord message) {
		LOGGER.info("Received message {} {}", message.propertyOne(), message.propertyTwo());
	}


	@Bean
	public ApplicationRunner sendMessageToQueue2(SqsTemplate sqsTemplate) {
		String jsonString = """
			{
			  "message1": "hello",
			  "message2": "sqs!"
			}
			""";

		return args -> sqsTemplate.send(to -> to.queue(QUEUE_NAME).payload(jsonString).delaySeconds(30).header(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON));
	}

	private record SampleRecord(String propertyOne, String propertyTwo) {
	}

}

Then, I am getting the error message below.

java.util.concurrent.CompletionException: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2200) ~[na:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage.process(MessageListenerExecutionStage.java:49) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.lambda$process$0(MessageProcessingPipelineBuilder.java:80) ~[classes/:na]
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2341) ~[na:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:80) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink.lambda$execute$0(AbstractMessageProcessingPipelineSink.java:99) ~[classes/:na]
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1768) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
Caused by: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.wrapWithBlockingException(AsyncComponentAdapters.java:162) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:124) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.execute(AsyncComponentAdapters.java:111) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.onMessage(AsyncComponentAdapters.java:208) ~[classes/:na]
	... 15 common frames omitted
Caused by: io.awspring.cloud.sqs.listener.ListenerExecutionFailedException: Listener failed to process messages 218c1f35-667d-4131-92d5-a16b256edcca
	at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:79) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:83) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.invokeHandler(AbstractMethodInvokingListenerAdapter.java:59) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:41) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.lambda$onMessage$0(AsyncComponentAdapters.java:208) ~[classes/:na]
	at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:120) ~[classes/:na]
	... 17 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [io.awspring.cloud.sqs.sample.SpringSqsSample$SampleRecord] for GenericMessage [payload={
  "message1": "hello",
  "message2": "sqs!"
}

However, this seems expected: the error message says that the JSON String cannot be converted to SampleRecord. Still, double serialization didn't occur. I think I need a more detailed guide.

@tomazfernandes
Contributor Author

@imsosleepy I get a different error here:

Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of com.test.SpringSqsSample$SampleRecord (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{ "message1": "hello", "message2": "sqs!" }') at [Source: (String)""{\n \"message1\": \"hello\",\n \"message2\": \"sqs!\"\n}\n""; line: 1, column: 1]

I remember seeing the double serialization at some point, but I might have got it wrong, or it might only happen with some specific configuration.

But that's ok - the overall goal is to be able to send raw JSON with the template and have it be properly deserialized to a POJO / record in the listener, requiring as few changes / configurations as possible.

Please let me know if that helps. Thanks.

@imsosleepy
Contributor

imsosleepy commented Jun 25, 2024

@tomazfernandes I identified what the issue was and created a PR that fixed it. This issue was not a case of double serialization, but simply an incorrectly created String that could not be deserialized to an Object.

Please see the images below.

image
The first payload was successfully processed. This is the case that I made to succeed.

image
The second case, which is the one we're facing now, shows the data stored in SQS when we send raw JSON in the form of a String. The second payload was not processed correctly because it contained unnecessary newlines and backslashes.

To fix this issue, I implemented the getPayloadToDeserialize method of MessagingMessageConverter to create the body of the message as an ObjectNode and then convert it back to a String.

However, I need to verify whether it is appropriate to have an ObjectMapper in its current location.

Further testing might be necessary, but I would appreciate any advice on the best place to put it.

The PR is here : #1168

@tomazfernandes
Contributor Author

tomazfernandes commented Jun 28, 2024

Hey @imsosleepy,

Perhaps a better way would be to make getMappingJackson2MessageConverter in AbstractMessagingMessageConverter protected and use its ObjectMapper instead.

Would it be possible for you to create an integration test to assert this solution in fact resolves the problem?

Also, since this is an edge case, it would probably be best if we could only do this extra step if necessary.

Let me know your thoughts, thanks.
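One way to read "only do this extra step if necessary" is a cheap guard on the receiving side: leave the payload alone unless it looks like a JSON string literal, and only then strip one layer of encoding. The sketch below is plain Java under that assumption. The class and method names are hypothetical, it is not the code from the PR, and it only handles the common escapes (no `\uXXXX`, `\b`, or `\f`).

```java
// Hypothetical receiving-side guard; not the implementation from the PR.
final class PayloadUnwrapSketch {

    /**
     * Returns the payload unchanged unless it is wrapped in double quotes,
     * in which case the quotes are dropped and the escapes are resolved.
     */
    static String unwrapIfDoubleEncoded(String payload) {
        String trimmed = payload.trim();
        if (trimmed.length() < 2 || !trimmed.startsWith("\"") || !trimmed.endsWith("\"")) {
            return payload; // common case: nothing to do
        }
        int end = trimmed.length() - 1; // index of the closing quote
        StringBuilder out = new StringBuilder();
        for (int i = 1; i < end; i++) {
            char c = trimmed.charAt(i);
            if (c != '\\' || i + 1 >= end) {
                out.append(c);
                continue;
            }
            char next = trimmed.charAt(++i); // character after the backslash
            switch (next) {
                case 'n': out.append('\n'); break;
                case 'r': out.append('\r'); break;
                case 't': out.append('\t'); break;
                default:  out.append(next); // covers escaped quote, backslash, and slash
            }
        }
        return out.toString();
    }
}
```

With this shape, a payload that is already a JSON object takes the early return, so the extra work is paid only in the edge case.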

@tomazfernandes
Contributor Author

Hey @imsosleepy, looking at this again, I wonder if maybe the right way to go would be making it so that the message is not sent with the line breaks.

Otherwise we may fix this for our listeners, but if the message is read by a consumer using another language or library that can't read both formats, it might get confusing.

We'd need to check what exactly is adding the line breaks and make sure they're not added to the final payload.

What do you think?
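As a rough illustration of the sender-side idea, the sketch below compacts a JSON String before it is handed to the template, removing whitespace (including line breaks) that sits outside string literals. It is plain Java with hypothetical names and is not part of SqsTemplate; whether compacting alone would resolve the issue is an open question in this thread.

```java
// Hypothetical sender-side helper; not part of Spring Cloud AWS.
final class JsonCompactSketch {

    /** Removes spaces, tabs, and line breaks that appear outside JSON string literals. */
    static String compact(String json) {
        StringBuilder out = new StringBuilder(json.length());
        boolean inString = false; // currently inside a "..." literal
        boolean escaped = false;  // previous character was a backslash inside a literal
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                out.append(c); // keep string contents byte for byte
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
            } else if (c == '"') {
                inString = true;
                out.append(c);
            } else if (c != ' ' && c != '\n' && c != '\r' && c != '\t') {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

A caller would pass `JsonCompactSketch.compact(jsonString)` as the payload instead of the multi-line text block, so the body stored in the queue no longer contains line breaks.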

@imsosleepy
Contributor

@tomazfernandes

I think it is appropriate to resolve this issue on the receiving side.

The current cause of the failure in object conversion lies not only in the strings that contain line breaks but also in the process of determining the targetType.

Even when the correct type is assigned to the JavaType at the point where the payload type is determined, a payload that is a JSON String is still identified as a String, which breaks the object conversion.

Therefore, the sending side cannot fully resolve the issue.

Please review the final PR : #1168

@nilesh-chordia

@imsosleepy I get a different error here:

Caused by: org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of com.test.SpringSqsSample$SampleRecord (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{ "message1": "hello", "message2": "sqs!" }') at [Source: (String)""{\n \"message1\": \"hello\",\n \"message2\": \"sqs!\"\n}\n""; line: 1, column: 1]

I remember seeing the double serialization at some point, but I might have got it wrong, or it might only happen with some specific configuration.

But that's ok - the overall goal is to be able to send raw JSON with the template and have it be properly deserialized to a POJO / record in the listener, requiring as few changes / configurations as possible.

Please let me know if that helps. Thanks.

I am also getting the same error during the springcloud update to 3.1.1.
