From 87c31a4cbe64f63c93baed007a225d9642abaa0f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Oct 2022 08:24:53 +0200 Subject: [PATCH 1/2] Update to Akka 2.7.0-M3 --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 9dce8f336..b1dbc6760 100644 --- a/build.sbt +++ b/build.sbt @@ -12,7 +12,7 @@ val Scala213 = "2.13.8" val Scala212 = "2.12.16" val AkkaBinaryVersionForDocs = "2.7" -val akkaVersion = "2.7.0-M1" +val akkaVersion = "2.7.0-M3" // Keep .scala-steward.conf pin in sync val kafkaVersion = "3.0.1" From 63c3eda1c701afcb7e1ac578843045c82b72fde0 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Thu, 13 Oct 2022 12:11:22 +0200 Subject: [PATCH 2/2] adapt TestSource/Sink creation --- .../kafka/testkit/scaladsl/KafkaSpec.scala | 2 +- .../scala/akka/kafka/TransactionsOps.scala | 2 +- .../internal/CommitCollectorStageSpec.scala | 5 +- .../internal/CommittingWithMockSpec.scala | 22 +++---- .../akka/kafka/internal/ConsumerSpec.scala | 22 +++---- .../internal/PartitionedSourceSpec.scala | 54 +++++++-------- .../akka/kafka/internal/ProducerSpec.scala | 66 ++++++++----------- .../akka/kafka/scaladsl/CommittingSpec.scala | 20 +++--- .../scaladsl/ConnectionCheckerSpec.scala | 2 +- .../akka/kafka/scaladsl/IntegrationSpec.scala | 10 +-- .../scaladsl/PartitionedSourcesSpec.scala | 8 +-- .../akka/kafka/scaladsl/RebalanceSpec.scala | 10 +-- .../kafka/scaladsl/RetentionPeriodSpec.scala | 12 ++-- .../akka/kafka/scaladsl/TimestampSpec.scala | 6 +- .../SchemaRegistrySerializationSpec.scala | 10 +-- 15 files changed, 119 insertions(+), 132 deletions(-) diff --git a/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala b/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala index 1ea8d9138..223c8bb52 100644 --- a/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala +++ b/testkit/src/main/scala/akka/kafka/testkit/scaladsl/KafkaSpec.scala @@ -196,7 +196,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A Consumer .plainSource(consumerSettings, Subscriptions.topics(topic.toSet)) .map(_.value) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() } diff --git a/tests/src/test/scala/akka/kafka/TransactionsOps.scala b/tests/src/test/scala/akka/kafka/TransactionsOps.scala index 93df9a05a..3da7e052d 100644 --- a/tests/src/test/scala/akka/kafka/TransactionsOps.scala +++ b/tests/src/test/scala/akka/kafka/TransactionsOps.scala @@ -160,7 +160,7 @@ trait TransactionsOps extends TestSuite with Matchers { )(implicit actorSystem: ActorSystem, mat: Materializer): TestSubscriber.Probe[String] = offsetValueSource(settings, topic) .map(_._2) - .runWith(TestSink.probe) + .runWith(TestSink()) def offsetValueSource(settings: ConsumerSettings[String, String], topic: String): Source[(Long, String), Consumer.Control] = diff --git a/tests/src/test/scala/akka/kafka/internal/CommitCollectorStageSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommitCollectorStageSpec.scala index f48636584..937536989 100644 --- a/tests/src/test/scala/akka/kafka/internal/CommitCollectorStageSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/CommitCollectorStageSpec.scala @@ -394,11 +394,10 @@ class CommitCollectorStageSpec(_system: ActorSystem) val flow = Committer.batchFlow(committerSettings) - val ((source, control), sink) = TestSource - .probe[Committable] + val ((source, control), sink) = TestSource[Committable]() .viaMat(ConsumerControlFactory.controlFlow())(Keep.both) .via(flow) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() (source, control, sink) diff --git a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala index 1c1c39c90..5d3932c6c 100644 --- a/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/CommittingWithMockSpec.scala @@ -112,7 +112,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createSourceWithMetadata(mock.mock, (rec: ConsumerRecord[K, V]) => rec.offset.toString) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msg = createMessage(1) @@ -143,7 +143,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msg = createMessage(1) @@ -173,7 +173,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler(onCompleteFailure) val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msg = createMessage(1) @@ -208,7 +208,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler(onCompleteFailure) val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msg = createMessage(1) @@ -238,7 +238,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val count = 100 @@ -268,7 +268,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock, topics = Set("topic1", "topic2")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1")) @@ -305,7 +305,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val (control, probe) = createSourceWithMetadata(mock.mock, (rec: ConsumerRecord[K, V]) => rec.offset.toString, topics = Set("topic1", "topic2")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1")) @@ -344,7 +344,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val (control, probe) = createSourceWithMetadata(mock.mock, (rec: ConsumerRecord[K, V]) => rec.offset.toString, topics = Set("topic1", "topic2")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1")) @@ -386,10 +386,10 @@ class CommittingWithMockSpec(_system: ActorSystem) val mock1 = new ConsumerMock[K, V](commitLog1) val mock2 = new ConsumerMock[K, V](commitLog2) val (control1, probe1) = createCommittableSource(mock1.mock, "group1", Set("topic1", "topic2")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val (control2, probe2) = createCommittableSource(mock2.mock, "group2", Set("topic1", "topic3")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgs1a = (1 to 3).map(createMessage(_, "topic1", "group1")) @@ -446,7 +446,7 @@ class CommittingWithMockSpec(_system: ActorSystem) val (control, probe) = createSourceWithMetadata(mock.mock, (rec: ConsumerRecord[K, V]) => rec.offset.toString, topics = Set("topic1", "topic2")) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgsTopic1 = (1 to 3).map(createMessage(_, "topic1")) diff --git a/tests/src/test/scala/akka/kafka/internal/ConsumerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ConsumerSpec.scala index 8ceda7b55..e93fe57dc 100644 --- a/tests/src/test/scala/akka/kafka/internal/ConsumerSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/ConsumerSpec.scala @@ -77,7 +77,7 @@ class ConsumerSpec(_system: ActorSystem) def checkMessagesReceiving(msgss: Seq[Seq[CommittableMessage[K, V]]]): Unit = { val mock = new ConsumerMock[K, V]() val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe.request(msgss.map(_.size).sum.toLong) @@ -118,7 +118,7 @@ class ConsumerSpec(_system: ActorSystem) val mock = new FailingConsumerMock[K, V](new Exception("Fatal Kafka error"), failOnCallNumber = 1) val probe = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.right) + .toMat(TestSink())(Keep.right) .run() probe @@ -129,7 +129,7 @@ class ConsumerSpec(_system: ActorSystem) it should "complete stage when stream control.stop called" in assertAllStagesStopped { val mock = new ConsumerMock[K, V]() val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe.request(100) @@ -142,7 +142,7 @@ class ConsumerSpec(_system: ActorSystem) it should "complete stage when processing flow canceled" in assertAllStagesStopped { val mock = new ConsumerMock[K, V]() val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe.request(100) @@ -178,7 +178,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() mock.enqueue((1 to 10).map(createMessage).map(toRecord)) @@ -198,7 +198,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe.request(1) @@ -213,7 +213,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe.cancel() @@ -225,7 +225,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() mock.enqueue((1 to 10).map(createMessage).map(toRecord)) @@ -243,7 +243,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgs = (1 to 10).map(createMessage) @@ -279,7 +279,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msg = createMessage(1) @@ -302,7 +302,7 @@ class ConsumerSpec(_system: ActorSystem) val commitLog = new ConsumerMock.LogHandler() val mock = new ConsumerMock[K, V](commitLog) val (control, probe) = createCommittableSource(mock.mock) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val msgs = (1 to 10).map(createMessage) diff --git a/tests/src/test/scala/akka/kafka/internal/PartitionedSourceSpec.scala b/tests/src/test/scala/akka/kafka/internal/PartitionedSourceSpec.scala index 92285e8d0..062c6deb3 100644 --- a/tests/src/test/scala/akka/kafka/internal/PartitionedSourceSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/PartitionedSourceSpec.scala @@ -72,7 +72,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) .flatMapMerge(breadth = 10, _._2) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -105,7 +105,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -131,7 +131,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -149,7 +149,7 @@ class PartitionedSourceSpec(_system: ActorSystem) // No demand on sub-sources => paused dummy.tpsPaused should contain only tp0 - val probeTp0 = subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]]) + val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, V]]()) dummy.setNextPollData(tp0 -> singleRecord) // demand a value @@ -167,7 +167,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -179,7 +179,7 @@ class PartitionedSourceSpec(_system: ActorSystem) // No demand on sub-sources => paused dummy.tpsPaused should contain.allOf(tp0, tp1) - val probeTp0 = subSources(tp0).runWith(TestSink.probe[CommittableMessage[K, V]]) + val probeTp0 = subSources(tp0).runWith(TestSink[CommittableMessage[K, V]]()) dummy.setNextPollData(tp0 -> singleRecord) // demand a value @@ -197,7 +197,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -221,7 +221,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .committablePartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -262,7 +262,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -294,7 +294,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -333,7 +333,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -371,7 +371,7 @@ class PartitionedSourceSpec(_system: ActorSystem) onRevoke = { tp => revoked = revoked ++ tp }) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -405,7 +405,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -445,7 +445,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -475,7 +475,7 @@ class PartitionedSourceSpec(_system: ActorSystem) .committablePartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -509,7 +509,7 @@ class PartitionedSourceSpec(_system: ActorSystem) .committablePartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -550,7 +550,7 @@ class PartitionedSourceSpec(_system: ActorSystem) .committablePartitionedManualOffsetSource(consumerSettings(dummy), Subscriptions.topics(topic), getOffsetsOnAssign) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -588,7 +588,7 @@ class PartitionedSourceSpec(_system: ActorSystem) onRevoke = { tp => revoked = revoked ++ tp }) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -612,7 +612,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink = Consumer .plainPartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -636,7 +636,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink1 = Consumer .plainPartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -648,7 +648,7 @@ class PartitionedSourceSpec(_system: ActorSystem) // simulate partition re-balance val sink2 = Consumer .plainPartitionedSource(consumerSettings(dummy2), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.assignWithCallback(tp0) subSources1(tp1).runWith(Sink.ignore).futureValue should be(Done) @@ -667,7 +667,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink1 = Consumer .plainPartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -676,8 +676,8 @@ class PartitionedSourceSpec(_system: ActorSystem) val subSources1 = Map(sink1.requestNext(), sink1.requestNext()) subSources1.keys should contain.allOf(tp0, tp1) - val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, V]]) - val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, V]]) + val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]()) + val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]()) // trigger demand probeTp0.request(1L) @@ -702,7 +702,7 @@ class PartitionedSourceSpec(_system: ActorSystem) val sink1 = Consumer .plainPartitionedSource(consumerSettings(dummy), Subscriptions.topics(topic)) - .runWith(TestSink.probe) + .runWith(TestSink()) dummy.started.futureValue should be(Done) @@ -711,8 +711,8 @@ class PartitionedSourceSpec(_system: ActorSystem) val subSources1 = Map(sink1.requestNext(), sink1.requestNext()) subSources1.keys should contain.allOf(tp0, tp1) - val probeTp0 = subSources1(tp0).runWith(TestSink.probe[ConsumerRecord[K, V]]) - val probeTp1 = subSources1(tp1).runWith(TestSink.probe[ConsumerRecord[K, V]]) + val probeTp0 = subSources1(tp0).runWith(TestSink[ConsumerRecord[K, V]]()) + val probeTp1 = subSources1(tp1).runWith(TestSink[ConsumerRecord[K, V]]()) // trigger demand probeTp0.request(1L) diff --git a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala index 29cf3ba7a..75b9b78b8 100644 --- a/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala +++ b/tests/src/test/scala/akka/kafka/internal/ProducerSpec.scala @@ -128,10 +128,9 @@ class ProducerSpec(_system: ActorSystem) } val committer = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .via(testProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val txMsg = toTxMessage(input, committer.mock) @@ -173,7 +172,7 @@ class ProducerSpec(_system: ActorSystem) } val probe = Source(input.map(toMessage)) .via(testProducerFlow(client)) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(10) @@ -193,10 +192,9 @@ class ProducerSpec(_system: ActorSystem) val inputMap = input.toMap new ProducerMock[K, V](ProducerMock.handlers.delayedMap(100.millis)(x => Try { inputMap(x) })) } - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via(testProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() input.map(toMessage).foreach(source.sendNext) @@ -225,10 +223,9 @@ class ProducerSpec(_system: ActorSystem) else Success(inputMap(msg)) }) } - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via(testProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() sink.request(100) @@ -254,10 +251,9 @@ class ProducerSpec(_system: ActorSystem) else Success(inputMap(msg)) }) } - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via(testProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() input.map(toMessage).foreach(source.sendNext) @@ -287,12 +283,11 @@ class ProducerSpec(_system: ActorSystem) else Success(inputMap(msg)) }) } - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via( testProducerFlow(client).withAttributes(ActorAttributes.withSupervisionStrategy(Supervision.resumingDecider)) ) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() input.map(toMessage).foreach(source.sendNext) @@ -316,7 +311,7 @@ class ProducerSpec(_system: ActorSystem) val client = new ProducerMock[K, V](ProducerMock.handlers.fail) val probe = Source(input.map(toMessage)) .via(testProducerFlow(client)) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(10) @@ -336,10 +331,9 @@ class ProducerSpec(_system: ActorSystem) val inputMap = input.toMap new ProducerMock[K, V](ProducerMock.handlers.delayedMap(5.seconds)(x => Try { inputMap(x) })) } - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via(testProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() sink.request(10) @@ -362,7 +356,7 @@ class ProducerSpec(_system: ActorSystem) } val probe = Source(input.map(toMessage)) .via(testProducerFlow(client, closeOnStop = false)) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(10) @@ -383,10 +377,9 @@ class ProducerSpec(_system: ActorSystem) Failure(error) }) - val (source, sink) = TestSource - .probe[Msg] + val (source, sink) = TestSource[Msg]() .via(testProducerFlow(client, closeOnStop = false)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() sink.request(100) @@ -406,7 +399,7 @@ class ProducerSpec(_system: ActorSystem) val probe = Source .empty[Msg] .via(testTransactionProducerFlow(client)) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(1) @@ -426,10 +419,9 @@ class ProducerSpec(_system: ActorSystem) } val committer = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .via(testTransactionProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val txMsg = toTxMessage(input, committer.mock) @@ -453,10 +445,9 @@ class ProducerSpec(_system: ActorSystem) } val committer = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .via(testTransactionProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val txMsg = toTxMessage(input, committer.mock) @@ -481,11 +472,10 @@ class ProducerSpec(_system: ActorSystem) } val committer = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .map(msg => ProducerMessage.passThrough[K, V, PartitionOffset](msg.passThrough)) .via(testTransactionProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val txMsg = toTxMessage(input, committer.mock) @@ -509,10 +499,9 @@ class ProducerSpec(_system: ActorSystem) } val committedMarker = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .via(testTransactionProducerFlow(client)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val txMsg: TxMsg = toTxMessage(input, committedMarker.mock) @@ -538,8 +527,7 @@ class ProducerSpec(_system: ActorSystem) } val committedMarker = new CommittedMarkerMock - val (source, sink) = TestSource - .probe[TxMsg] + val (source, sink) = TestSource[TxMsg]() .via(testTransactionProducerFlow(client)) .toMat(Sink.lastOption)(Keep.both) .run() diff --git a/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala index 17085c481..5f44273b0 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/CommittingSpec.scala @@ -59,7 +59,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { elem.record.value } } - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe1 @@ -72,7 +72,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe2 = Consumer .committableSource(consumerSettings, Subscriptions.topics(topic1)) .map(_.record.value) - .runWith(TestSink.probe) + .runWith(TestSink()) // Note that due to buffers and mapAsync(10) the committed offset is more // than 26, and that is not wrong @@ -90,7 +90,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe3 = Consumer .committableSource(consumerSettings.withGroupId(group2), Subscriptions.topics(topic1)) .map(_.record.value) - .runWith(TestSink.probe) + .runWith(TestSink()) probe3 .request(100) @@ -122,7 +122,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val subscription1 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref) val (control1, probe1) = Consumer .committableSource(consumerSettings, subscription1) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // Await initial partition assignment @@ -141,7 +141,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val subscription2 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref) val (control2, probe2) = Consumer .committableSource(consumerSettings, subscription2) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // Await an assignment to the new rebalance listener @@ -207,7 +207,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val subscription1 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor1.ref) val (control1, probe1) = Consumer .committableSource(consumerSettings, subscription1) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // Await initial partition assignment @@ -226,7 +226,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val subscription2 = Subscriptions.topics(topic1).withRebalanceListener(rebalanceActor2.ref) val (control2, probe2) = Consumer .committableSource(consumerSettings, subscription2) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // Rebalance happens @@ -274,7 +274,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { val (control, probe1) = Consumer .committableSource(consumerDefaults.withGroupId(group), Subscriptions.topics(topic)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // request one, only @@ -315,7 +315,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { .map(_.committableOffset) .batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_)) .mapAsync(1)(_.commitInternal()) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val (control, probe) = consumeAndBatchCommit(topic) @@ -572,7 +572,7 @@ class CommittingSpec extends SpecBase with TestcontainersKafkaLike with Inside { Committer .batchFlow(committerDefaults.withDelivery(CommitDelivery.SendAndForget).withMaxBatch(commitBatchSize)) ) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val (control, probe) = consumeAndBatchCommit(topic) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/ConnectionCheckerSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/ConnectionCheckerSpec.scala index 20f28d30a..0a7859828 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/ConnectionCheckerSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/ConnectionCheckerSpec.scala @@ -73,7 +73,7 @@ class ConnectionCheckerSpec extends SpecBase with TestcontainersKafkaPerClassLik val consumerSettings = noBrokerConsumerSettings.withBootstrapServers(bootstrapServers) val (control, probe) = - Consumer.plainSource(consumerSettings, Subscriptions.topics(topic)).toMat(TestSink.probe)(Keep.both).run() + Consumer.plainSource(consumerSettings, Subscriptions.topics(topic)).toMat(TestSink())(Keep.both).run() probe.ensureSubscription().requestNext().value() shouldBe msg diff --git a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala index fb83352c9..813eef1b6 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/IntegrationSpec.scala @@ -173,7 +173,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside .batch(max = 10, CommittableOffsetBatch.apply)(_.updated(_)) .mapAsync(producerDefaults.parallelism)(_.commitInternal()) - val probe = source.runWith(TestSink.probe) + val probe = source.runWith(TestSink()) probe.request(1).expectNext() @@ -295,7 +295,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside val probe = Consumer .plainExternalSource[Array[Byte], String](consumer, Subscriptions.assignment(partition0)) .map(_.value()) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(100) @@ -318,7 +318,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside val (control, probe) = Consumer .plainSource(consumerDefaults.withGroupId(group), Subscriptions.topics(topic)) .map(_.value()) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe @@ -346,7 +346,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside Subscriptions.topics(topic)) .flatMapMerge(1, _._2) .map(_.value()) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe @@ -369,7 +369,7 @@ class IntegrationSpec extends SpecBase with TestcontainersKafkaLike with Inside val control = Consumer .plainSource(consumerDefaults.withGroupId(group), Subscriptions.topics(topic)) .map(_.value()) - .to(TestSink.probe) + .to(TestSink()) .run() // Wait a tiny bit to avoid a race on "not yet initialized: only setHandler is allowed in GraphStageLogic constructor" diff --git a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala index 306ce46d6..17ffcfe3b 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/PartitionedSourcesSpec.scala @@ -45,7 +45,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with _ => Future.successful(Map.empty)) .flatMapMerge(1, _._2) .map(_.value()) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(100) @@ -66,7 +66,7 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with tp => Future.successful(tp.map(_ -> 51L).toMap)) .flatMapMerge(1, _._2) .map(_.value()) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(50) @@ -328,13 +328,13 @@ class PartitionedSourcesSpec extends SpecBase with TestcontainersKafkaLike with .flatMapMerge(1, _._2) .map(_.value()) - val (control1, firstConsumer) = source.toMat(TestSink.probe)(Keep.both).run() + val (control1, firstConsumer) = source.toMat(TestSink())(Keep.both).run() eventually { assert(partitionsAssigned, "first consumer should get asked for offsets") } - val secondConsumer = source.runWith(TestSink.probe) + val secondConsumer = source.runWith(TestSink()) eventually { revoked.value should have size partitions / 2L diff --git a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala index a5562db59..2dbfd229b 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/RebalanceSpec.scala @@ -66,7 +66,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe1subscription = Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref) val (control1, probe1) = Consumer .plainSource(consumerSettings.withClientId(consumerClientId1), probe1subscription) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() log.debug("Await initial partition assignment") @@ -90,7 +90,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe2subscription = Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref) val (control2, probe2) = Consumer .plainSource(consumerSettings.withClientId(consumerClientId2), probe2subscription) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() log.debug("Await a revoke to consumer 1") @@ -134,7 +134,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { .expectNextN(partitions.toLong) .map { case (tp, subSource) => - (tp, subSource.toMat(TestSink.probe)(Keep.right).run()) + (tp, subSource.toMat(TestSink())(Keep.right).run()) } def runForSubSource( @@ -170,7 +170,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe1subscription = Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref) val (control1, probe1) = Consumer .plainPartitionedSource(consumerSettings.withClientId(consumerClientId1), probe1subscription) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() log.debug("Await initial partition assignment") @@ -200,7 +200,7 @@ class RebalanceSpec extends SpecBase with TestcontainersKafkaLike with Inside { val probe2subscription = Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref) val (control2, probe2) = Consumer .plainPartitionedSource(consumerSettings.withClientId(consumerClientId2), probe2subscription) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe2.request(1) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/RetentionPeriodSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/RetentionPeriodSpec.scala index 04f41b7ca..5b4df0ce8 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/RetentionPeriodSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/RetentionPeriodSpec.scala @@ -68,7 +68,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike // val probe1subscription = Subscriptions.topics(topic1).withRebalanceListener(probe1rebalanceActor.ref) // val (control1, probe1) = Consumer // .committableSource(consumerSettings.withClientId(consumerClientId1), probe1subscription) -// .toMat(TestSink.probe)(Keep.both) +// .toMat(TestSink())(Keep.both) // .run() // // log.debug("Await initial partition assignment") @@ -87,7 +87,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike // val probe2subscription = Subscriptions.topics(topic1).withRebalanceListener(probe2rebalanceActor.ref) // val (control2, probe2) = Consumer // .committableSource(consumerSettings.withClientId(consumerClientId2), probe2subscription) -// .toMat(TestSink.probe)(Keep.both) +// .toMat(TestSink())(Keep.both) // .run() // // log.debug("Await a revoke to consumer 1") @@ -120,7 +120,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike // val probe3subscription = Subscriptions.topics("__consumer_offsets") // val (control3, probe3) = Consumer // .plainSource(group2consumerSettings.withClientId(consumerClientId3), probe3subscription) -// .toMat(TestSink.probe)(Keep.both) +// .toMat(TestSink())(Keep.both) // .run() // val commits: Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = probe3.request(100).expectNextN(10) // @@ -190,7 +190,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike Done } } - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() probe1 @@ -207,7 +207,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike val probe2 = Consumer .committableSource(consumerSettings, Subscriptions.topics(topic1)) .map(_.record.value) - .runWith(TestSink.probe) + .runWith(TestSink()) // Note that due to buffers and mapAsync(10) the committed offset is more // than 26, and that is not wrong @@ -227,7 +227,7 @@ class RetentionPeriodSpec extends SpecBase with TestcontainersKafkaPerClassLike val probe3 = Consumer .committableSource(consumerSettings, Subscriptions.topics(topic1)) .map(_.record.value) - .runWith(TestSink.probe) + .runWith(TestSink()) probe3 .request(100) diff --git a/tests/src/test/scala/akka/kafka/scaladsl/TimestampSpec.scala b/tests/src/test/scala/akka/kafka/scaladsl/TimestampSpec.scala index 9c17c8702..bac1d3056 100644 --- a/tests/src/test/scala/akka/kafka/scaladsl/TimestampSpec.scala +++ b/tests/src/test/scala/akka/kafka/scaladsl/TimestampSpec.scala @@ -38,7 +38,7 @@ class TimestampSpec extends SpecBase with TestcontainersKafkaLike with Inside wi val probe = Consumer .plainSource(consumerSettings, topicsAndTs) .map(_.value()) - .runWith(TestSink.probe) + .runWith(TestSink()) probe .request(50) @@ -64,7 +64,7 @@ class TimestampSpec extends SpecBase with TestcontainersKafkaLike with Inside wi val probe = Consumer .plainSource(consumerSettings, topicsAndTs) - .runWith(TestSink.probe) + .runWith(TestSink()) probe.ensureSubscription() probe.expectNoMessage(200.millis) @@ -83,7 +83,7 @@ class TimestampSpec extends SpecBase with TestcontainersKafkaLike with Inside wi val probe = Consumer .plainSource(consumerSettings, topicsAndTs) - .runWith(TestSink.probe) + .runWith(TestSink()) probe.ensureSubscription() probe.expectNoMessage(200.millis) diff --git a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala index 1a7fb12b9..c822c185e 100644 --- a/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala +++ b/tests/src/test/scala/docs/scaladsl/SchemaRegistrySerializationSpec.scala @@ -138,21 +138,21 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa Consumer .plainExternalSource[String, SpecificRecord](consumerActor, Subscriptions.assignment(new TopicPartition(topic, 0))) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val (control2, probe2) = Consumer .plainExternalSource[String, SpecificRecord](consumerActor, Subscriptions.assignment(new TopicPartition(topic, 1))) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() val (thisStreamStaysAlive, probe3) = Consumer .plainExternalSource[String, SpecificRecord](consumerActor, Subscriptions.assignment(new TopicPartition(topic, 2))) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() // request from 2 streams @@ -185,12 +185,12 @@ class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKa val (control1, partitionedProbe) = Consumer .plainPartitionedSource(specificRecordConsumerSettings(group), Subscriptions.topics(topic)) - .toMat(TestSink.probe)(Keep.both) + .toMat(TestSink())(Keep.both) .run() partitionedProbe.request(1L) val (_, subSource) = partitionedProbe.expectNext() - val subStream = subSource.runWith(TestSink.probe) + val subStream = subSource.runWith(TestSink()) subStream.request(1L)