Skip to content

Commit

Permalink
Sonar Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell committed Jul 14, 2021
1 parent 1875a6c commit 2cab170
Show file tree
Hide file tree
Showing 14 changed files with 43 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.lang.Nullable;

/**
* Handles errors thrown during the execution of a {@link BatchMessageListener}.
* The listener should communicate which position(s) in the list failed in the
Expand All @@ -37,7 +39,7 @@ public interface BatchErrorHandler extends GenericErrorHandler<ConsumerRecords<?
* @param consumer the consumer.
* @param container the container.
*/
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container) {
handle(thrownException, data);
}
Expand All @@ -51,7 +53,7 @@ default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consu
* @param invokeListener a callback to re-invoke the listener.
* @since 2.3.7
*/
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data,
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {

handle(thrownException, data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* @since 2.7.4
*
*/
public class ConditionalDelegatingBatchErrorHandler implements ContainerAwareBatchErrorHandler {
public class ConditionalDelegatingBatchErrorHandler implements ListenerInvokingBatchErrorHandler {

private final ContainerAwareBatchErrorHandler defaultErrorHandler;

Expand Down Expand Up @@ -70,21 +70,21 @@ public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareBatc
}

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {

// Never called but, just in case
doHandle(thrownException, records, consumer, container, null);
}

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
MessageListenerContainer container, Runnable invokeListener) {

doHandle(thrownException, records, consumer, container, invokeListener);
}

protected void doHandle(Exception thrownException, ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
protected void doHandle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records, Consumer<?, ?> consumer,
MessageListenerContainer container, @Nullable Runnable invokeListener) {

Throwable cause = thrownException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void addDelegate(Class<? extends Throwable> throwable, ContainerAwareErro
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
MessageListenerContainer container) {

boolean handled = false;
Throwable cause = thrownException;
if (cause instanceof ListenerExecutionFailedException) {
cause = thrownException.getCause();
Expand All @@ -83,7 +82,6 @@ public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?
Class<? extends Throwable> causeClass = cause.getClass();
for (Entry<Class<? extends Throwable>, ContainerAwareErrorHandler> entry : this.delegates.entrySet()) {
if (entry.getKey().isAssignableFrom(causeClass)) {
handled = true;
entry.getValue().handle(thrownException, records, consumer, container);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> d
}

@Override
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer);

@Override
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.lang.Nullable;

/**
* An error handler that has access to the batch of records from the last poll the
* consumer, and the container.
Expand All @@ -31,12 +33,12 @@
public interface ContainerAwareBatchErrorHandler extends ConsumerAwareBatchErrorHandler {

@Override
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}

@Override
void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container);

/**
Expand All @@ -50,8 +52,8 @@ void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?
*/
@Override
@SuppressWarnings("unused")
default void handle(Exception thrownException, ConsumerRecords<?, ?> data,
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data,
Consumer<?, ?> consumer, MessageListenerContainer container, @Nullable Runnable invokeListener) {

handle(thrownException, data, consumer, container);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.KafkaException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
Expand Down Expand Up @@ -55,7 +56,7 @@ public ContainerStoppingBatchErrorHandler(Executor executor) {
}

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container) {
this.executor.execute(() -> container.stop());
// isRunning is false before the container.stop() waits for listener thread
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface GenericErrorHandler<T> {
* @param data the data.
* @param consumer the consumer.
*/
default void handle(Exception thrownException, T data, Consumer<?, ?> consumer) {
default void handle(Exception thrownException, @Nullable T data, Consumer<?, ?> consumer) {
handle(thrownException, data);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1534,16 +1534,19 @@ private void doResumeConsumerIfNeccessary() {

private void pausePartitionsIfNecessary() {
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
List<TopicPartition> partitionsToPause = getAssignedPartitions()
.stream()
.filter(tp -> isPartitionPauseRequested(tp)
&& !pausedConsumerPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToPause.size() > 0) {
this.consumer.pause(partitionsToPause);
this.pausedPartitions.addAll(partitionsToPause);
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
Collection<TopicPartition> partitions = getAssignedPartitions();
if (partitions != null) {
List<TopicPartition> partitionsToPause = partitions
.stream()
.filter(tp -> isPartitionPauseRequested(tp)
&& !pausedConsumerPartitions.contains(tp))
.collect(Collectors.toList());
if (partitionsToPause.size() > 0) {
this.consumer.pause(partitionsToPause);
this.pausedPartitions.addAll(partitionsToPause);
this.logger.debug(() -> "Paused consumption from " + partitionsToPause);
partitionsToPause.forEach(KafkaMessageListenerContainer.this::publishConsumerPartitionPausedEvent);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.springframework.lang.Nullable;

/**
* A batch error handler that is capable of invoking the listener during error handling.
*
Expand All @@ -30,14 +32,14 @@
public interface ListenerInvokingBatchErrorHandler extends ContainerAwareBatchErrorHandler {

@Override
default void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
default void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container) {

throw new UnsupportedOperationException("Container should never call this");
}

@Override
void handle(Exception thrownException, ConsumerRecords<?, ?> records,
void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener);

}
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,12 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
return i;
}

private void seekOrRecover(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
private void seekOrRecover(Exception thrownException, @Nullable ConsumerRecords<?, ?> data, Consumer<?, ?> consumer,
MessageListenerContainer container, int indexArg) {

if (data == null) {
return;
}
Iterator<?> iterator = data.iterator();
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ default void handle(Exception thrownException, @Nullable ConsumerRecord<?, ?> da
* @param records the remaining records including the one that failed.
* @param consumer the consumer.
*/
void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);
void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer);

@Override
default void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
}

@Override
public void handle(Exception thrownException, ConsumerRecords<?, ?> records,
public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> records,
Consumer<?, ?> consumer, MessageListenerContainer container, Runnable invokeListener) {

if (records == null || records.count() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void setAckAfterHandle(boolean ackAfterHandle) {
public void handle(Exception thrownException, @Nullable List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer, MessageListenerContainer container) {

SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(),
SeekUtils.seekOrRecover(thrownException, records, consumer, container, isCommitRecovered(), // NOSONAR
getRecoveryStrategy(records, thrownException), this.logger, getLogLevel());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public static void seekOrRecover(Exception thrownException, @Nullable List<Consu
}
}

if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) {
if (records == null || !doSeeks(records, consumer, thrownException, true, recovery, container, logger)) { // NOSONAR
throw new KafkaException("Seek to current after exception", level, thrownException);
}
if (commitRecovered) {
Expand Down

0 comments on commit 2cab170

Please sign in to comment.