
Since version 2.7, paused consumer auto-resume without asking just after pausing. #1866

Closed
theogoria opened this issue Jul 13, 2021 · 12 comments · Fixed by #1868

Comments

@theogoria

theogoria commented Jul 13, 2021

Affects Version(s): 2.7.1

Hello,

I think I have found a bug with the pause method of Listener Containers.
When I call `registry.getListenerContainers().forEach(MessageListenerContainer::pause);` I see something like this in the log:

2021-07-06T18:20:16,650- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-0]
2021-07-06T18:20:16,678- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,678- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-1]
2021-07-06T18:20:16,718- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,718- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-2]
2021-07-06T18:20:16,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:16,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Paused consumption from: [partition-3]
2021-07-06T18:20:21,651- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-0]
2021-07-06T18:20:21,652- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,679- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-1]
2021-07-06T18:20:21,679- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,719- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-2]
2021-07-06T18:20:21,719- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Received: 0 records
2021-07-06T18:20:21,767- [DEBUG] [T-1] k.l.KafkaMessageListenerContainer$ListenerConsumer - [{}] - Resumed consumption from [partition-3]

We can see that consumption resumed without any call to the resume method.

I think it's because of the addition of the ability to pause individual partitions in version 2.7.

When a pause of the consumer is requested, all the partitions are paused, but they are not added to the pauseRequestedPartitions collection of AbstractMessageListenerContainer. As a result, isPartitionPauseRequested returns false for them and they are resumed by the KafkaMessageListenerContainer::resumePartitionsIfNecessary method at the end of the poll.
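The interaction described above can be sketched with a small plain-Java toy model. The class and field names here (PauseModel, pauseRequested, consumerPaused) are simplified stand-ins for illustration, not the real spring-kafka API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model of the 2.7.x bookkeeping: pauseRequested plays the role of
// pauseRequestedPartitions; consumerPaused mirrors consumer.paused().
public class PauseModel {

    final Set<String> pauseRequested = new HashSet<>();
    final Set<String> consumerPaused = new HashSet<>();

    // Container pause: pauses every assigned partition on the consumer,
    // but records nothing in pauseRequested (the reported root cause).
    void pauseContainer(Set<String> assigned) {
        consumerPaused.addAll(assigned);
    }

    // Mirrors the resumePartitionsIfNecessary() filter:
    //   !isPartitionPauseRequested(tp) && pausedConsumerPartitions.contains(tp)
    Set<String> partitionsThatWouldResume() {
        Set<String> resumed = new HashSet<>(consumerPaused);
        resumed.removeAll(pauseRequested);
        return resumed;
    }

    public static void main(String[] args) {
        PauseModel model = new PauseModel();
        model.pauseContainer(new HashSet<>(Arrays.asList("t-0", "t-1", "t-2", "t-3")));
        // Every container-paused partition passes the filter and is resumed.
        System.out.println(model.partitionsThatWouldResume().size()); // prints 4
    }
}
```

Because nothing was recorded as individually pause-requested, the filter matches all four container-paused partitions, matching the "Resumed consumption" log lines above.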

theogoria pushed a commit to theogoria/spring-kafka that referenced this issue Jul 13, 2021
@tomazfernandes
Contributor

tomazfernandes commented Jul 13, 2021

Hi @theogoria, thanks for your input.

There are some things I don't quite understand here.

The first thing is that pausing the consumer doesn't prevent it from polling; it will continue polling as usual and retrieve 0 records each time. So your logs show the expected behaviour. If polling a paused partition did retrieve any records, then we'd have a problem.

The second thing is that for resuming partitions we have this filter:

.filter(tp -> !isPartitionPauseRequested(tp)
	&& pausedConsumerPartitions.contains(tp))

So if the partition was not paused by the pausePartition method, pausedConsumerPartitions will not contain the partition, and that partition will be filtered out and not resumed.

So I don't think there's any auto-resuming going on there. Am I missing something?

That being said, I see there's room for the pauseConsumer and pausePartition to have conflicts. For example if the user pauses a partition and then resumes the consumer, the container pause state will be left inconsistent and prevent the user from pausing the partition again.

The easiest solution I think would be adding / removing the paused / resumed partitions from the pausedConsumerPartitions collection, or simply delegate all pausing to the pause partition method:

Set<TopicPartition> paused = this.consumer.paused();
paused.forEach(KafkaMessageListenerContainer.this::resumePartition);

Since partition resuming and pausing are done after the consumer's, we'd get it right in the next loop. One side effect would be firing PartitionPausedEvents when we pause the consumer; I'm not sure whether that's a behavior we want, or whether it's a breaking change.

Which goes along the lines proposed in the PR; it just has an unnecessary manipulation of the pausedPartitions list afterwards.
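The delegation idea can be sketched with plain collections. This is a toy model with simplified, illustrative names (DelegatingPause and its fields are not the real KafkaMessageListenerContainer code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model of the delegation idea: the container pause routes through the
// per-partition pause, so a pause request is always recorded and the resume
// filter never matches a container-paused partition by mistake.
public class DelegatingPause {

    final Set<String> pauseRequested = new HashSet<>();
    final Set<String> consumerPaused = new HashSet<>();

    void pausePartition(String tp) {
        pauseRequested.add(tp);
        consumerPaused.add(tp);
        // the real method would also publish a PartitionPausedEvent here,
        // which is the side effect discussed above
    }

    // Delegate: consistent bookkeeping for free.
    void pauseContainer(Set<String> assigned) {
        assigned.forEach(this::pausePartition);
    }

    // The resume filter now matches nothing after a container pause.
    Set<String> partitionsThatWouldResume() {
        Set<String> out = new HashSet<>(consumerPaused);
        out.removeAll(pauseRequested);
        return out;
    }

    public static void main(String[] args) {
        DelegatingPause model = new DelegatingPause();
        model.pauseContainer(new HashSet<>(Arrays.asList("t-0", "t-1")));
        System.out.println(model.partitionsThatWouldResume().isEmpty()); // prints true
    }
}
```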

Either way, it's not as urgent a matter as it first sounded, I guess.

@garyrussell, what do you think of this?

Thanks!

@garyrussell
Contributor

I don't mind combining them; but there is a possibility that a partition is paused for retry, and then the user pauses the entire container and resumes it (before the paused partition would normally be resumed). But I suppose, if that were the case, the delivery would be rejected again because it is too early, and the partition re-paused for the remainder of the pause time.

If there truly is a bug (that's why I would like to see a test case that exposes it without any changes to the container), then I would prefer to get it into next Monday's release.

@garyrussell
Contributor

It seems there is a bug...

@SpringBootApplication
public class Kgh1866Application {

	public static void main(String[] args) {
		SpringApplication.run(Kgh1866Application.class, args);
	}


	@KafkaListener(id = "kgh1866", topics = "kgh1866")
	public void listen(String in) {
		System.out.println(in);
	}

	@Bean
	public NewTopic topic() {
		return TopicBuilder.name("kgh1866").partitions(4).replicas(1).build();
	}

	@EventListener
	public void listener(ListenerContainerIdleEvent event) {
		event.getContainer(ConcurrentMessageListenerContainer.class).pause();
	}

}
logging.level.org.springframework.kafka=debug

spring.kafka.listener.idle-event-interval=5000
2021-07-13 17:44:25.747 DEBUG 79452 --- [  kgh1866-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2021-07-13 17:44:25.748 DEBUG 79452 --- [  kgh1866-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2021-07-13 17:44:25.749 DEBUG 79452 --- [  kgh1866-0-C-1] essageListenerContainer$ListenerConsumer : Paused consumption from: [kgh1866-0, kgh1866-2, kgh1866-1, kgh1866-3]
2021-07-13 17:44:30.754 DEBUG 79452 --- [  kgh1866-0-C-1] essageListenerContainer$ListenerConsumer : Resumed consumption from [kgh1866-0, kgh1866-2, kgh1866-1, kgh1866-3]
2021-07-13 17:44:30.755 DEBUG 79452 --- [  kgh1866-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records

@tomazfernandes
Contributor

Hmm, you're right, I see it now.

The only thing I don't understand about the open PR is the need to add the partitions to the list manually after pausing / resuming them, since they are already added by those methods.

For the BackOff behavior when a consumer is resumed, I think what you described should be good enough, with the user resuming the consumer and the backed-off partitions pausing again if necessary.

The last point would be about the PartitionPaused / Resumed events that would be fired for each partition when the consumer is paused / resumed. It seems like a small change of contract, but I don't know if that's acceptable.

@garyrussell
Contributor

garyrussell commented Jul 13, 2021

This filter

.filter(tp -> !isPartitionPauseRequested(tp)
	&& pausedConsumerPartitions.contains(tp))

will always return true for partitions that have not been individually paused.

I think the simplest solution is to only call it if the container is not paused:

			if (!this.consumerPaused) {
				resumePartitionsIfNecessary();
			}

@tomazfernandes
Contributor

What about the opposite? If a partition is paused and we resume the consumer, the partition will be resumed by resumeConsumerIfNecessary and then paused again in the next loop by pausePartitionIfNecessary, right?

@theogoria
Author

Hello @tomazfernandes and @garyrussell and thanks for your replies.

In my PR I considered that pausing a consumer is basically pausing all of its partitions. That effectively introduces a kind of conflict between the consumer-pause and partition-pause features, but it allows things such as pausing the consumer and then resuming only one partition.

And I added the partitions manually to the pausedPartitions list to mark them as paused; it is not the same list, it's there to keep the result of isPartitionPaused updated.

@garyrussell
Contributor

Yes, we could track actually paused partitions and not resume them when the container is resumed. Although, as I said, that will be self-correcting anyway.

@theogoria
Author

Yes, calling resumePartitionsIfNecessary only if the container is not paused is a good solution to keep the two features separated.

Do you want me to update my PR, or to close it?

@garyrussell
Contributor

Feel free to update the PR, or I will take it tomorrow.

garyrussell added a commit to garyrussell/spring-kafka that referenced this issue Jul 14, 2021
Resolves spring-projects#1866

The new retryable topic feature pauses/resumes individual partitions.

This broke normal container pause/resume by incorrectly resuming partitions
that were paused by the container pause operation.

Similarly, if individual partitions were paused and then the container was
paused and resumed, the container resumed all partitions.

Decouple the functionality to prevent this cross-talk.

Do not resume any individually paused partitions when the container is in a paused state.
Do not resume any individually paused partitions when the container is resumed.

Also

Use a `ConcurrentHashMap.newKeySet()` instead of synchronization on partition pause requests.
Use `getAssignedPartitions()` to allow the retry topic feature to work with manual assignments.

Add tests to verify no cross-talk between pausing individual partitions and the container.
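The decoupling described in the commit message can be sketched with plain collections. This is a toy model under simplified, illustrative names (DecoupledPause and its fields are not the real container code):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Toy model of the decoupled behavior: individual pause requests are tracked
// separately, the per-poll resume is skipped while the container is paused,
// and a container resume leaves individually paused partitions alone.
public class DecoupledPause {

    final Set<String> pauseRequested = new HashSet<>(); // individual pausePartition() requests
    final Set<String> consumerPaused = new HashSet<>(); // mirrors consumer.paused()
    boolean containerPaused;

    void pausePartition(String tp) {
        pauseRequested.add(tp);
        consumerPaused.add(tp);
    }

    void pauseContainer(Set<String> assigned) {
        containerPaused = true;
        consumerPaused.addAll(assigned);
    }

    // Container resume: skip partitions that were individually pause-requested.
    void resumeContainer() {
        containerPaused = false;
        consumerPaused.removeIf(tp -> !pauseRequested.contains(tp));
    }

    // Per-poll check: do nothing while the whole container is paused.
    void resumePartitionsIfNecessary() {
        if (!containerPaused) {
            consumerPaused.removeIf(tp -> !pauseRequested.contains(tp));
        }
    }

    public static void main(String[] args) {
        DecoupledPause c = new DecoupledPause();
        c.pausePartition("t-0");                     // paused individually (e.g. for retry)
        c.pauseContainer(new HashSet<>(Arrays.asList("t-0", "t-1", "t-2")));
        c.resumePartitionsIfNecessary();             // no-op: container is paused
        System.out.println(c.consumerPaused.size()); // prints 3
        c.resumeContainer();                         // t-1 and t-2 resume; t-0 stays paused
        System.out.println(c.consumerPaused);        // prints [t-0]
    }
}
```

This reproduces both guarantees from the commit message: nothing resumes while the container is paused, and a container resume does not resume an individually paused partition.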
artembilan pushed a commit that referenced this issue Jul 14, 2021
@artembilan artembilan added this to the 2.8.0-M1 milestone Jul 14, 2021
garyrussell added a commit that referenced this issue Jul 14, 2021
artembilan pushed a commit that referenced this issue Jul 14, 2021
@sivadivi14

sivadivi14 commented Jan 20, 2023

This filter

.filter(tp -> !isPartitionPauseRequested(tp)
	&& pausedConsumerPartitions.contains(tp))

will always return true for partitions that have not been individually paused.

I think the simplest solution is only call it if the container is not paused

			if (!this.consumerPaused) {
				resumePartitionsIfNecessary();
			}

Siva Divi: It is a bug. The above code skips the delay and resumes the partitions immediately.
The code below works perfectly on version 2.6.9 but not on versions > 2.6.9:

consumer.pause(Collections.singleton(new TopicPartition(topic, partitionId)))

@artembilan
Member

@sivadivi14 ,

would you mind raising a fresh new issue, against a supported version, with enough info for us to reproduce it? 2.7.x is EOL: https://spring.io/projects/spring-kafka#support
