Skip to content

Commit

Permalink
Switch to spring-kafka 3.0.0-SNAPSHOT
Browse files Browse the repository at this point in the history
 - deprecations are temporarily restored there
 - fix deprecation warnings
 - re-enable tests for Windows
  • Loading branch information
garyrussell authored and artembilan committed Mar 8, 2022
1 parent 70587f5 commit 12f2084
Show file tree
Hide file tree
Showing 13 changed files with 71 additions and 63 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ ext {
springAmqpVersion = project.hasProperty('springAmqpVersion') ? project.springAmqpVersion : '3.0.0-SNAPSHOT'
springDataVersion = project.hasProperty('springDataVersion') ? project.springDataVersion : '2022.0.0-SNAPSHOT'
springGraphqlVersion = '1.0.0-SNAPSHOT'
springKafkaVersion = '3.0.0-M1'
springKafkaVersion = '3.0.0-SNAPSHOT'
springRetryVersion = '1.3.1'
springSecurityVersion = project.hasProperty('springSecurityVersion') ? project.springSecurityVersion : '6.0.0-SNAPSHOT'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '6.0.0-SNAPSHOT'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
Expand Down Expand Up @@ -88,7 +87,10 @@ public KafkaMessageListenerContainerSpec<K, V> concurrency(int concurrency) {
* @return the spec.
* @see org.springframework.kafka.listener.ErrorHandler
*/
public KafkaMessageListenerContainerSpec<K, V> errorHandler(GenericErrorHandler<?> errorHandler) {
@SuppressWarnings("deprecation")
public KafkaMessageListenerContainerSpec<K, V> errorHandler(
org.springframework.kafka.listener.GenericErrorHandler<?> errorHandler) {

this.target.setGenericErrorHandler(errorHandler);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -170,6 +170,7 @@ public void setBindSourceRecord(boolean bindSourceRecord) {
this.bindSourceRecord = bindSourceRecord;
}

@SuppressWarnings("deprecation")
@Override
protected void onInit() {
super.onInit();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -261,6 +261,7 @@ public String getComponentType() {
return "kafka:message-driven-channel-adapter";
}

@SuppressWarnings("deprecation")
@Override
protected void onInit() {
super.onInit();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,8 +25,6 @@

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
Expand Down Expand Up @@ -59,7 +57,6 @@
* @since 5.4
*
*/
@DisabledOnOs(OS.WINDOWS)
@SpringJUnitConfig
@EmbeddedKafka(topics = { "channel.1", "channel.2", "channel.3" }, partitions = 1)
public class ChannelTests {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,8 +19,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
Expand All @@ -36,7 +34,6 @@
* @since 5.4
*
*/
@DisabledOnOs(OS.WINDOWS)
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = { "one", "two", "three", "four" })
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,6 @@
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
Expand Down Expand Up @@ -110,7 +109,7 @@ void testKafkaBatchMessageDrivenChannelAdapterParser() {
}

@Test
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "deprecation" })
void testKafkaMessageDrivenChannelAdapterOptions() {
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(Collections.emptyMap());
Expand All @@ -137,7 +136,8 @@ void testKafkaMessageDrivenChannelAdapterOptions() {
adapter.afterPropertiesSet();

messageListener = containerProps.getMessageListener();
assertThat(messageListener).isInstanceOf(RetryingMessageListenerAdapter.class);
assertThat(messageListener).isInstanceOf(
org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class);

delegate = TestUtils.getPropertyValue(messageListener, "delegate");

Expand All @@ -151,13 +151,15 @@ void testKafkaMessageDrivenChannelAdapterOptions() {

delegate = TestUtils.getPropertyValue(messageListener, "delegate");

assertThat(delegate).isInstanceOf(RetryingMessageListenerAdapter.class);
assertThat(delegate).isInstanceOf(
org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class);

adapter.setFilterInRetry(true);
adapter.afterPropertiesSet();

messageListener = containerProps.getMessageListener();
assertThat(messageListener).isInstanceOf(RetryingMessageListenerAdapter.class);
assertThat(messageListener).isInstanceOf(
org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.class);

delegate = TestUtils.getPropertyValue(messageListener, "delegate");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,8 +32,6 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
Expand Down Expand Up @@ -95,7 +93,6 @@
*
* @since 5.4
*/
@DisabledOnOs(OS.WINDOWS)
@SpringJUnitConfig
@DirtiesContext
@EmbeddedKafka(topics = { KafkaDslTests.TEST_TOPIC1, KafkaDslTests.TEST_TOPIC2, KafkaDslTests.TEST_TOPIC3,
Expand Down Expand Up @@ -307,6 +304,7 @@ public IntegrationFlow topic1ListenerFromKafkaFlow() {
.get();
}

@SuppressWarnings("deprecation")
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,8 +32,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
Expand Down Expand Up @@ -78,7 +76,6 @@
* @since 5.4
*
*/
@DisabledOnOs(OS.WINDOWS)
class InboundGatewayTests {

private static String topic1 = "testTopic1";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2021 the original author or authors.
* Copyright 2016-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,8 +52,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
Expand All @@ -73,8 +71,8 @@
import org.springframework.kafka.event.ConsumerResumedEvent;
import org.springframework.kafka.event.KafkaEvent;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
import org.springframework.kafka.support.KafkaHeaders;
Expand Down Expand Up @@ -111,7 +109,6 @@
* @since 5.4
*
*/
@DisabledOnOs(OS.WINDOWS)
class MessageDrivenAdapterTests {

private static String topic1 = "testTopic1";
Expand Down Expand Up @@ -343,7 +340,7 @@ void testInboundRecordNoRetryRecover() {
containerProps.setDeliveryAttemptHeader(true);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.setErrorHandler(new SeekToCurrentErrorHandler());
container.setCommonErrorHandler(new DefaultErrorHandler());
KafkaMessageDrivenChannelAdapter<Integer, String> adapter = new KafkaMessageDrivenChannelAdapter<>(container);
MessageChannel out = new DirectChannel() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,8 +29,6 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
Expand All @@ -48,7 +46,6 @@
* @since 5.4
*
*/
@DisabledOnOs(OS.WINDOWS)
class MessageSourceIntegrationTests {

private static final String TOPIC1 = "MessageSourceIntegrationTests1";
Expand Down
Loading

0 comments on commit 12f2084

Please sign in to comment.