Fixed Rate/Scheduled polling of messages #372
Hmmm so let me just confirm the requirements:
If you are fine with requesting fewer messages (e.g. 2) at each interval, one way we could do it is to make the concurrency rate smaller than the retriever's batch size, e.g. a concurrency of 5 with a batch size of 6. In this case the batching message retriever would never accumulate 6 requests for messages, so it would just keep requesting the messages it needs at that 10 second period, e.g. 0-5 messages. If you are using the Spring annotation you might be able to achieve this by doing `@QueueListener(concurrency = 5, batchSize = 6, batchingPeriodInMs = 10000)`.
The negative is that it would also buffer the resolving of the messages and only ever do it at the batch period. If that is a problem you could implement your own MessageListenerContainer, like the BatchingMessageListenerContainer, with your specific config. See "Spring - how to add your own queue listener" for more steps on how to do that. Let me know if that isn't what you are thinking!
Regarding the requirements
Aha, indeed this seems like a reasonable approach: `@QueueListener(concurrency = 5, batchSize = 6, batchingPeriodInMs = 10000)`. I am not entirely sure I can follow the remark about the resolving of messages. The only negative I can think of is that it delays telling SQS that we processed the message successfully.
Yeah, so we are using that batching period to determine when to obtain more messages as well as how long we should batch the resolving of messages. E.g. if you have a batching period of 5 seconds, it will wait that period for all the threads to finish before it sends the delete message request. If the batch size is larger than the concurrency, I believe it will always wait that full period. The best way would be to not share those properties, e.g. update @QueueListener to have new fields, and we would set it here: Line 192 in df9e7a6
Yes, thanks for the feedback - this is definitely an option. But we have been looking through the code a bit, and for this some changes would be necessary in the library, which I am willing to make myself. We would like to start from the batching functionality and implement our own (Scheduled/Polling/Fixed)MessageRetriever, keeping the constant-looping part and the actual-retrieving part of the BatchingMessageRetriever separate. We would like to create a ScheduledExecutor which we can start with scheduleAtFixedRate, so that it runs the actual retrieval code of the BatchingMessageRetriever every x ms. To be able to also close the ScheduledExecutor when we close the application, we suggest incorporating a stop hook in the MessageRetriever where we can shut down the executor. So to summarize:
* For consistency reasons, we can also add the stop hook in the resolver, ...
The composition of whether we delegate to a separate class or not isn't a big concern for me, as long as the
I believe in an older version I actually had a stop method, but it ended up being easier to just use interruption as the mechanism for stopping the retriever and other background running tasks. This allows the retriever to clean up and return any messages that were batched. E.g. what you could do without needing to update the API is:

```java
@Override
public List<Message> run() {
    myBackgroundScheduler.start(); // whatever this other class is

    // whatever your blocking logic would be here
    while (!Thread.currentThread().isInterrupted()) {
        // do your logic here for collecting messages so that they can be returned in any `retrieveMessage` call
        // do some sort of thread sleep/blocking and make sure to listen for any InterruptedException
        // so that we always call the stop below
    }

    // your clean up code to stop the background task
    myBackgroundScheduler.stop();

    // some way for your retriever to return any previously batched messages so they aren't lost,
    // e.g. the case where the consumer wants all messages processed before the whole container is shut down
    return myBackgroundScheduler.batchedMessages();
}
```

You could probably even just do something like this without needing to pull out the logic:

```java
@Override
public List<Message> run() {
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final CountDownLatch blocker = new CountDownLatch(1);
    scheduler.scheduleAtFixedRate(() -> {
        // retrieve your messages here somehow and make sure that it is thread safe
        this.batchedMessages.addAll(newMessages);
    }, 10, 10, TimeUnit.SECONDS);
    try {
        blocker.await(); // will never count down, so this waits until the thread is interrupted
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // shutdown with whatever logic you want to make sure this is graceful
    scheduler.shutdown();
    final List<Message> leftoverMessages = new ArrayList<>(this.batchedMessages);
    this.batchedMessages.clear();
    return leftoverMessages;
}
```

So given the above I don't think we need a new stop function, and therefore no changes to the core library API would be needed? In this case you can always build this scheduled version with a corresponding container in your own code base, but if you want to contribute back to this repository that is also fine (though probably more effort from your end).
You are correct that with the trick of the CountDownLatch we don't need the stop function anymore. But pulling the logic out of the BatchingMessageRetriever would still be handy for us. High-level we would like to do something like below, based completely on your BatchingMessageListenerContainer.

```java
@Override
public List<Message> run() {
    final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
    // The 10 would come as a parameter from a new QueueListener annotation
    executorService.scheduleAtFixedRate(this::run2, 0, 10, TimeUnit.SECONDS);
    final CountDownLatch blocker = new CountDownLatch(1);
    try {
        blocker.await();
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    futuresWaitingForMessages.forEach(future -> future.cancel(true));
    executorService.shutdownNow();
    return Collections.emptyList();
}
```

run2() would then be this; note the duplication of the code from your original BatchingMessageRetriever:

```java
public void run2() {
    final Queue<CompletableFuture<Message>> messagesToObtain;
    try {
        messagesToObtain = obtainRequestForMessagesBatch();
    } catch (final InterruptedException interruptedException) {
        log.debug("Thread interrupted waiting for batch");
        return;
    }
    log.debug("Requesting {} messages", messagesToObtain.size());
    if (messagesToObtain.isEmpty()) {
        return;
    }
    final List<Message> messages;
    try {
        messages = CompletableFuture
                .supplyAsync(messagesToObtain::size)
                .thenApply(this::buildReceiveMessageRequest)
                .thenComposeAsync(sqsAsyncClient::receiveMessage)
                .thenApply(ReceiveMessageResponse::messages)
                .get();
    } catch (final RuntimeException | ExecutionException exception) {
        // Supposedly the SqsAsyncClient can get interrupted, which will remove the interrupted status from the
        // thread and then wrap it in its own version of the interrupted exception... If this happens when the
        // retriever is being shut down it will keep on processing because it does not realise it is being shut
        // down, therefore we have to check for this and quit if necessary
        if (exception instanceof ExecutionException) {
            final Throwable executionExceptionCause = exception.getCause();
            if (executionExceptionCause instanceof SdkClientException
                    && executionExceptionCause.getCause() instanceof SdkInterruptedException) {
                log.debug("Thread interrupted while receiving messages");
                return;
            }
        }
        log.error("Error requesting messages", exception);
        // If there was an exception receiving messages we need to put these requests back into the queue
        futuresWaitingForMessages.addAll(messagesToObtain);
        performBackoff();
        return;
    } catch (final InterruptedException interruptedException) {
        log.debug("Thread interrupted while waiting for batch of messages");
        return;
    }
    log.debug("Downloaded {} messages", messages.size());
    if (messages.size() > messagesToObtain.size()) {
        log.error("More messages were downloaded than requested, this shouldn't happen");
    }
    for (final Message message : messages) {
        final CompletableFuture<Message> completableFuture = messagesToObtain.poll();
        if (completableFuture != null) {
            completableFuture.complete(message);
        }
    }
    // Any requests that weren't completed are sent back for processing again
    futuresWaitingForMessages.addAll(messagesToObtain);
}
```

As you can see, the logic in run2() is the same as in the BatchingMessageRetriever. If we could keep the code in run2() in the library, shared by both retrievers, there would be no duplication. What is your opinion on extracting part of the logic in the BatchingMessageRetriever and using it in a new one?
Yeah, if a lot of the logic can be shared, it makes sense to me to pull it out!
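A minimal, library-agnostic sketch of what that extraction could look like: the shared batch-retrieval step becomes one method, and each retriever only decides *when* to invoke it. All names here (`SharedRetrievalSketch`, `retrieveBatch`, the `Supplier` standing in for the SQS receive call) are illustrative assumptions, not the library's actual API, and `String` stands in for the SQS `Message` type.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hedged sketch: the shared retrieval logic both the batching and the scheduled
// retriever would call. Not the library's real classes.
public class SharedRetrievalSketch {
    private final Queue<CompletableFuture<String>> futuresWaitingForMessages = new ConcurrentLinkedQueue<>();
    private final Supplier<List<String>> receiveMessages; // stands in for the SQS receiveMessage call

    public SharedRetrievalSketch(final Supplier<List<String>> receiveMessages) {
        this.receiveMessages = receiveMessages;
    }

    // consumers register their interest in a message via a future
    public CompletableFuture<String> retrieveMessage() {
        final CompletableFuture<String> future = new CompletableFuture<>();
        futuresWaitingForMessages.add(future);
        return future;
    }

    // the shared step: a batching retriever calls this when the batch fills or times out,
    // a scheduled retriever calls it from scheduleAtFixedRate
    public void retrieveBatch() {
        if (futuresWaitingForMessages.isEmpty()) {
            return;
        }
        for (final String message : receiveMessages.get()) {
            final CompletableFuture<String> future = futuresWaitingForMessages.poll();
            if (future == null) {
                break; // more messages than outstanding requests; a real impl would buffer these
            }
            future.complete(message);
        }
    }

    public static void main(String[] args) {
        final SharedRetrievalSketch sketch = new SharedRetrievalSketch(() -> List.of("a", "b"));
        final CompletableFuture<String> first = sketch.retrieveMessage();
        final CompletableFuture<String> second = sketch.retrieveMessage();
        sketch.retrieveBatch();
        System.out.println(first.join() + " " + second.join()); // prints "a b"
    }
}
```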
Hi, first thanks for the great library.
We are currently using your library, but now want to use it to retrieve messages at a fixed rate/schedule:
poll for 5 messages every 10 seconds, even if processing of those 5 messages finishes earlier.
We can't use the BatchingPeriod in the BatchingMessageRetrieverProperties, as that is the maximum period it waits.
I didn't find a way to do this, so I took a look at the code.
I found out it could be possible with a new implementation of the MessageRetriever that uses a ScheduledExecutor,
so that it can run at a fixed interval.
Would you welcome such an addition to your library, or did I miss some way it could work with the current code?
Or would you prefer a different approach?
I am willing to make a pull request.
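The fixed-rate behaviour described above can be sketched with only the JDK's `ScheduledExecutorService`. `FixedRatePoller` and `pollBatch` are hypothetical names standing in for the retriever and the SQS receive call; this is a sketch of the scheduling idea, not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hedged sketch: poll at a fixed rate regardless of how quickly processing finishes.
public class FixedRatePoller {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BlockingQueue<String> batchedMessages = new LinkedBlockingQueue<>();

    // stand-in for the real SQS receiveMessage request
    private List<String> pollBatch() {
        return List.of("message");
    }

    // scheduleAtFixedRate fires every `periodMillis` even if the previous poll was cheap,
    // which is exactly the "poll every 10 seconds" behaviour asked for above
    public void start(final long periodMillis) {
        scheduler.scheduleAtFixedRate(
                () -> batchedMessages.addAll(pollBatch()),
                0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // on shutdown, return any batched messages so they aren't lost
    public List<String> stop() throws InterruptedException {
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
        final List<String> leftover = new ArrayList<>();
        batchedMessages.drainTo(leftover);
        return leftover;
    }

    public static void main(String[] args) throws Exception {
        final FixedRatePoller poller = new FixedRatePoller();
        poller.start(50);
        Thread.sleep(200);
        System.out.println(poller.stop().size());
    }
}
```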