Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support passing a List of batched SQS Messages as an argument #337

Open
stephencassidy-r7 opened this issue Oct 1, 2020 · 3 comments
Open
Labels
determining desire Figuring out whether there is a large desire for this feature to warrant development enhancement New feature or request

Comments

@stephencassidy-r7
Copy link

Another edge case that I think could be useful for others 馃槃

From what I understand - at the minute the listener container to request up to 10 messages from a queue, and will pass each message to available consumer threads for processing. We can accept the message, or message payload as an argument to a consumer method.

Would it be possible to support passing all the messages from the receive message request to a consumer method so they can be processed in a batch? So if we have something like

 @PrefetchingQueueListener(
        identifier = "my-queue,
        value = "my-queue-url",
        concurrencyLevel = 10
    )
    public void onMessage(final List<Message> messages) {
        // process all the messages from the receive message request on a single thread
    }

If the container gets 10 messages from the request, a single thread will get those 10, then the next available thread gets the next etc to allow the consumer method to process things in a batch.

I believe this could break alot of things - for example, I'm not sure how you could pass MessageAttributes as a method parameter without having to do additional mapping, but (for myself at least 馃槢 ) I think this would be an acceptable trade off for increased throughput.

Let me know if I can explain things a bit better, I had attempted to write my own argument resolver but was unable to do so but I can share some of the code if you want

@JaidenAshmore
Copy link
Owner

Hey thanks for the suggestion!

So the whole framework has been written with the concept of moving a single message around each component so updating it to cover both use cases, while still being performant with thread usage, would be a considerable effort/redesign. e.g. the the flow from MessageRetriever -> MessageBroker -> MessageProcessor are all dealing with a single message and would involve complete redesigns of those interfaces to support this use case.

Also, as you said, it would
break a lot of existing functionality when it comes to argument resolvers and also break the decorators like Xray/Brave tracing. So unfortunately due to the amount of work it would be, I don't think this would be a use-case that I would try to support in this framework unless there was a lot of demand for it.

You can of course provide your own batching logic in your listener, e.g. the very high level pseudo code

@PrefetchingQueueListener(
        identifier = "my-queue,
        value = "my-queue-url",
        concurrencyLevel = 10
    )
    public void onMessage(final Message messages) {
       // add the message to some batch that is read by a different thread

       // wait until the batch is complete and return when done
    }

    // this function is run on some other thread somehow
    void batchingThread() {
            while(notShutdown) {
                 // wait for batch of messages
                 final List<Message> message = ...;

                 // process the batch on this thread
            }
     }
}

But at this point, considering how much effort it would be on your side to implement this and the lack of performance due to the frameworks threads sitting idle waiting for the batch to complete I am not sure if there is much value in that.

I guess I am still not seeing the use case for this, so happy for you to enlighten me. What were you wanting to gain from doing the batch of messages? e.g. are you trying to reduce the number of threads running or are certain messages related to each other and therefore doing it in a batch you can reduce the number of messages actually processed etc.

@scassidy1986
Copy link

Yeah - this library has been great so far for simplifying alot of common things, like injecting messages attributes and converting the payloads and passing in the batches would break this (unless everything was modified to return a Map<Message, MessageAttributeValue>).

Your pseudo code is pretty much what we have implemented 馃槤 - for our use case, we have lots of consumer services that listen for events on SQS, do something with the payload and persist it somewhere. In some cases, we've noticed having lots of consuming threads persisting small batches does not perform as well as having 1 consumer thread persisting larger batches (mostly database transactions, where multiple threads writing increases the small chance of deadlocks etc), so we were going buffer the messages, and have another thread running every X seconds task that would consume and process the batched messages - roughly something like

@QueueListener(...)
public void consumer(@Payload final EventType event) {
    buffer.add(BatchedEvent.builder(event)
        .onSuccess((event) -> // delete the message)
        .onFailure((event, exception) -> // perform some back-off logic and log the error)
        .build()
    );
}

@Scheduled
public void batchProcessor() {
    final List<BatchedEvent> events = new ArrayList<>();
    buffer.drainTo(events);
    try {
        // flatten and process events
        events.forEach(event -> event.onSuccess(event));
    } catch (Exception e) {
        events.forEach((event, e) -> event.onFailure(e));
    }
}

Since we can request up to 10 messages at a time from from SQS (and for the busier services there will always be enough messages) I was wondering if there was a way we could set the argument on the consumer method to be a List<...>, I had tried (badly 馃槃) at implementing a MessageContainer that could accept and pass a list of messages but I see how this could involve re-working alot of components for something that in reality could be done outside the consumer!

After thinking about it more I can close this issue if you want?

@JaidenAshmore
Copy link
Owner

Okay cool I see the desire for it.

Thinking about this more, I might be possible to semi support this with only implementation changes instead of needing to modify the API of this library. I have included a basic high level design below.

Unfortunately, due to the time it would take to do this, I don't think I would get around to doing this (or at least not in the next few months so that wouldn't help you anyway).

For now, I would say to just keep doing the batching in your listener. There could be tweaks that you could to do reduce the number of thread switches to improve performance e.g. make concurrency size 1 and use the Acknowledge parameter to manually mark the message as processed (You would need to measure to see if that is more performant 馃 ).

If that is not performant enough you could even just use the raw AWS SDK directly instead of using this library. Lots of references in this codebase for how to do that but happy to point you in a direction if you need help with the SDK.

I still think we should keep this issue open though so if others find it and desire the feature they can bump it which could kick me into gear to do it. Also happy to take PRs for this if someone takes the initiative to do it, you can see the sort of changes that I did for the FifoListener in this PR#335 which is very similar to what you would need to do here.

High Level Design

Core Changes

MessageBroker

We create a new broker, pretty much the same as the ConcurrentMessageBroker, that will take a message from the MessageRetriever and on the same broker thread send it to the MessageProcessor. This is in comparison to the ConcurrentMessageBroker which will call into the MessageProcessor with each message on its own message processing thread.

MessageProcessor

We implement a new processor that will collect these messages into a batch and after a limit is hit, say 10 messages, you call the method that accepts a list of messages via reflection or a lambda, etc.

Argument resolution

We would need to modify how the argument resolution works for this use case. e.g. one approach we could take is by making sure all arguments have an annotation on a list parameter.

void listener(@MessageId List<String> messageIds, @Messages List<Message> messages)

Unfortunately, due to type erasure, nothing would stop us in the above supplying a List<Integer> for the message Ids and having a whole bunch of runtime exceptions. It would also mean we would need to have a new annotation for the messages field to make sure it is being set correctly OR we just dump the list of messages in any field that is a List but doesn't have an annotation.

MessageListenerContainer

We implement our own container that will use these new components, and maybe the PrefetchingMessageRetriever.

Spring Changes

We create a new annotation and factory that builds the container above. I don't think this would be too significant effort, similar to what we did for the @FifoListener.

Would look like:

@MessageBatchingListener(value="queueUrl", maxBatchSize = 10)
public void listener(@MessageId List<String> messageIds, @Messages List<Message> messages) {
        // do your batching logic
}

Improvements

  • This should have less thread switching than the current implementations and so should be more performant. However, there is still overhead with moving these messages around in the container.

Problems

  • Argument resolution would need to be modified to support this, not sure if this is a breaking change or just an addition.
  • I think this would completely break the MessageProcessorDecorators, e.g. Brave or XRay tracing would just not work or at least not supplying the correct traces.

@JaidenAshmore JaidenAshmore added enhancement New feature or request determining desire Figuring out whether there is a large desire for this feature to warrant development labels Oct 7, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
determining desire Figuring out whether there is a large desire for this feature to warrant development enhancement New feature or request
Projects
None yet
Development

No branches or pull requests

3 participants