Since version 2.7, paused consumer auto-resume without asking just after pausing. #1866
Comments
Hi @theogoria, thanks for your input. There are some things I don't quite understand here.

First, pausing the consumer doesn't prevent it from polling: it will continue polling as usual and retrieve 0 records each time. So your logs show the expected behaviour. If polling a paused partition did retrieve any records, then we'd have a problem.

Second, for resuming partitions we have this filter:

So I don't think there's any auto-resuming going on there. Am I missing something?

That being said, I see there's room for pauseConsumer and pausePartition to conflict. For example, if the user pauses a partition and then resumes the consumer, the container pause state will be left inconsistent and prevent the user from pausing the partition again. The easiest solution, I think, would be adding / removing the paused / resumed partitions to / from the pausedConsumerPartitions collection, or simply delegating all pausing to the pause partition method:

Since partition resuming and pausing is done after the consumer one, we'd get it right on the next pass. One side effect would be firing PartitionPausedEvents when we pause the consumer; I'm not sure if that's a behavior we want or whether it's a breaking change. This goes along the lines proposed in the PR, which just has an unnecessary manipulation of the pausedPartitions list afterwards.

Either way, it's not as urgent a matter as it first sounded, I guess. @garyrussell, what do you think of this? Thanks!
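To illustrate the first point above (a paused consumer keeps polling and simply gets no records), here is a minimal sketch. `ToyConsumer` and its methods are hypothetical stand-ins written for this illustration; they are not the kafka-clients or spring-kafka API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy stand-in for a consumer (hypothetical; not the real Kafka API). */
class ToyConsumer {

    // Records waiting to be fetched, keyed by partition name.
    private final Map<String, List<String>> pending = new LinkedHashMap<>();

    // Partitions that are currently paused.
    private final Set<String> paused = new HashSet<>();

    void send(String partition, String record) {
        pending.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    void pause(String partition) {
        paused.add(partition);
    }

    void resume(String partition) {
        paused.remove(partition);
    }

    /** Polling always runs; records on paused partitions are left in place. */
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : pending.entrySet()) {
            if (!paused.contains(e.getKey())) {
                out.addAll(e.getValue());
                e.getValue().clear();
            }
        }
        return out;
    }
}
```

In this model, an empty result from `poll()` while a partition is paused is the expected outcome; records only show up again after `resume`.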
I don't mind combining them; but there is a possibility that a partition is paused for retry; then the user pauses the entire container and resumes it (before the paused partition would normally be resumed). But, I suppose if that was the case, the delivery would be rejected again because it is too early, and the partition re-paused for the remainder of the pause time.

If there truly is a bug (that's why I would like to see a test case that exposes it without any changes to the container), then I would prefer to get it into next Monday's release.
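It seems there is a bug...

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.event.EventListener;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.event.ListenerContainerIdleEvent;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

    @SpringBootApplication
    public class Kgh1866Application {

        public static void main(String[] args) {
            SpringApplication.run(Kgh1866Application.class, args);
        }

        @KafkaListener(id = "kgh1866", topics = "kgh1866")
        public void listen(String in) {
            System.out.println(in);
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("kgh1866").partitions(4).replicas(1).build();
        }

        // Pause the whole container as soon as it reports being idle.
        @EventListener
        public void listener(ListenerContainerIdleEvent event) {
            event.getContainer(ConcurrentMessageListenerContainer.class).pause();
        }

    }

Properties:

    logging.level.org.springframework.kafka=debug
    spring.kafka.listener.idle-event-interval=5000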
Hmm, you're right, I see it now. The only thing I don't understand from the open PR is the need for adding the partitions to the list manually after pausing / resuming them, since they are already added by the methods.

For the BackOff behavior when a consumer is resumed, I think what you described should be good enough, with the user resuming the consumer and the backed off partitions pausing again if necessary.

The last point would be about the PartitionPaused / Resumed Event, which would be fired for each partition when the consumer is paused / resumed. It seems it's a small change of contract, but I don't know if that's acceptable.
This filter

    .filter(tp -> !isPartitionPauseRequested(tp)
            && pausedConsumerPartitions.contains(tp))

will always return true for partitions that have not been individually paused. I think the simplest solution is to only call it if the container is not paused:

    if (!this.consumerPaused) {
        resumePartitionsIfNecessary();
    }
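The effect of that guard can be sketched with a small, self-contained model. This is an illustration only, under simplifying assumptions: `PauseGuardModel`, `pausedInConsumer`, `guardEnabled`, `pauseContainer` and `endOfPoll` are hypothetical names for this sketch, and partitions are plain strings rather than `TopicPartition` objects. Only the shape of the filter and the `consumerPaused` check follow the snippets quoted above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Toy model of the pause bookkeeping; NOT the spring-kafka source. */
class PauseGuardModel {

    // Partitions currently paused in the (simulated) consumer.
    final Set<String> pausedInConsumer = new HashSet<>();

    // Partitions the user asked to pause individually.
    final Set<String> pauseRequestedPartitions = new HashSet<>();

    // True once the whole container has been paused.
    boolean consumerPaused;

    // Hypothetical switch: apply the proposed guard or not.
    final boolean guardEnabled;

    PauseGuardModel(boolean guardEnabled) {
        this.guardEnabled = guardEnabled;
    }

    /** Container-level pause: every assigned partition is paused. */
    void pauseContainer(List<String> assigned) {
        consumerPaused = true;
        pausedInConsumer.addAll(assigned);
    }

    /** Resumes every paused partition that has no individual pause request. */
    void resumePartitionsIfNecessary() {
        Set<String> toResume = pausedInConsumer.stream()
                .filter(tp -> !pauseRequestedPartitions.contains(tp))
                .collect(Collectors.toSet());
        pausedInConsumer.removeAll(toResume);
    }

    /** End of one poll cycle; with the guard, a paused container is left alone. */
    void endOfPoll() {
        if (!guardEnabled || !consumerPaused) {
            resumePartitionsIfNecessary();
        }
    }
}
```

Without the guard, one poll cycle after `pauseContainer` empties `pausedInConsumer`, which matches the reported auto-resume. With the guard, the partitions stay paused until the container itself is resumed.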
What about the opposite? If a partition is paused and we resume the consumer, the partition will be resumed by resumeConsumerIfNecessary and then paused again in the next loop by pausePartitionIfNecessary, right?
Hello @tomazfernandes and @garyrussell, and thanks for your replies. In my PR I considered that pausing a consumer is basically pausing all the partitions. That introduces a kind of conflict between the functionalities of pausing a consumer and pausing a partition, but it allows things such as pausing the consumer and then resuming only one partition. And I have added the partitions manually to
Yes, we could track actually paused partitions and not resume them when the container is resumed. Although, as I said, that will be self-correcting anyway.
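A minimal sketch of that idea, assuming the container keeps a separate set of individually requested pauses: on container resume, only partitions without an individual request are resumed. `ContainerResumeModel` and its members are hypothetical names for this illustration, and in this toy model `pausePartition` pauses immediately, which is a simplification.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of a container resume that respects individual pauses (hypothetical). */
class ContainerResumeModel {

    // Partitions assigned to the (simulated) consumer.
    final Set<String> assigned = new HashSet<>();

    // Partitions currently paused in the (simulated) consumer.
    final Set<String> pausedInConsumer = new HashSet<>();

    // Partitions the user asked to pause individually.
    final Set<String> pauseRequestedPartitions = new HashSet<>();

    /** Individual pause: record the request and pause the partition. */
    void pausePartition(String tp) {
        pauseRequestedPartitions.add(tp);
        pausedInConsumer.add(tp);
    }

    /** Container pause: pause everything that is assigned. */
    void pauseContainer() {
        pausedInConsumer.addAll(assigned);
    }

    /** Container resume: skip partitions that were paused individually. */
    void resumeContainer() {
        Set<String> toResume = new HashSet<>(assigned);
        toResume.removeAll(pauseRequestedPartitions);
        pausedInConsumer.removeAll(toResume);
    }
}
```

With two assigned partitions and one of them paused individually, a container pause followed by a container resume leaves only the individually paused partition in `pausedInConsumer`.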
Yes, call

Do you want me to update my PR or close it?
Feel free to update the PR, or I will take it tomorrow.
Resolves spring-projects#1866

The new retryable topic feature pauses/resumes individual partitions. This broke normal container pause/resume by incorrectly resuming partitions that were paused by the container pause operation.

Similarly, if individual partitions were paused and then the container was paused and resumed, the container resumed all partitions.

Decouple the functionality to prevent this cross-talk.

- Do not resume any individually paused partitions when the container is in a paused state.
- Do not resume any individually paused partitions when the container is resumed.

Also

- Use a `ConcurrentHashMap.newKeySet()` instead of synchronization on partition pause requests.
- Use `getAssignedPartitions()` to allow the retry topic feature to work with manual assignments.
- Add tests to verify no cross-talk between pausing individual partitions and the container.
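As a rough illustration of the `ConcurrentHashMap.newKeySet()` point, the sketch below tracks pause requests in a concurrent set, so no `synchronized` block is needed around add, remove, or contains. `PauseRequests` and its method names are hypothetical and are not taken from the commit; partitions are plain strings for brevity.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Toy holder for individual pause requests (hypothetical; not the spring-kafka class). */
class PauseRequests {

    // Thread-safe set backed by a ConcurrentHashMap; no external locking required.
    private final Set<String> requested = ConcurrentHashMap.newKeySet();

    void requestPause(String tp) {
        requested.add(tp);
    }

    void requestResume(String tp) {
        requested.remove(tp);
    }

    boolean isPauseRequested(String tp) {
        return requested.contains(tp);
    }
}
```

Each single operation is safe to call from both the user thread and the consumer thread. Compound check-then-act sequences would still need their own coordination.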
Siva Divi: It is a bug. The above code skips the delay and resumes partitions immediately.
Would you mind raising a fresh issue with much more information, so that we can reproduce it, and against a supported version rather than this one?
Affects Version(s): 2.7.1
Hello,
I think I have found a bug with the pause method of Listener Containers.
When I call `registry.getListenerContainers().forEach(MessageListenerContainer::pause);` I have something like this in the log:

We can see that consumption has resumed without a call to the resume method.

I think this is because of the ability to pause partitions, added in version 2.7. When a pause of the consumer is requested, all the partitions are paused, but they are not added to the `pauseRequestedPartitions` collection of the `AbstractMessageListenerContainer`, so they are not reported by `isPartitionPauseRequested` and they are resumed by the method `KafkaMessageListenerContainer::resumePartitionsIfNecessary` at the end of the poll.