Skip to content

Commit

Permalink
Rely on Akka's error handling instead of custom logging
Browse files Browse the repository at this point in the history
- Also allow gentle closing of resources by just killing the actors
- This fixes #21
  • Loading branch information
kciesielski committed Jul 20, 2015
1 parent 3e71c95 commit c714f9f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,19 @@ private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: De

override def receive = {
case ActorPublisherMessage.Request(_) | Poll => readDemandedItems()
case ActorPublisherMessage.Cancel | ActorPublisherMessage.SubscriptionTimeoutExceeded => cleanupResources()
case ActorPublisherMessage.Cancel | ActorPublisherMessage.SubscriptionTimeoutExceeded =>
cleanupResources()
context.stop(self)
}

private def demand_? : Boolean = totalDemand > 0

private def tryReadingSingleElement(): Try[Option[T]] = {
override def postStop(): Unit = {
cleanupResources()
super.postStop()
}

private def tryReadingSingleElement(): Try[Option[T]] = {
Try {
val bytes = if (iterator.hasNext() && demand_?) Option(iterator.next().message()) else None
bytes.map(decoder.fromBytes)
Expand All @@ -44,7 +50,6 @@ private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: De

private def cleanupResources(): Unit = {
consumer.close()
context.stop(self)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.softwaremill.react.kafka

import akka.actor.ActorLogging
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, WatermarkRequestStrategy}
import kafka.producer.KafkaProducer
import kafka.serializer.Encoder
Expand All @@ -10,17 +9,22 @@ private[kafka] class KafkaActorSubscriber[T](
val encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None
)
extends ActorSubscriber with ActorLogging {
extends ActorSubscriber {

protected def requestStrategy = WatermarkRequestStrategy(10)

override def postStop(): Unit = {
cleanupResources()
super.postStop()
}

def receive = {
case ActorSubscriberMessage.OnNext(element) =>
processElement(element.asInstanceOf[T])
case ActorSubscriberMessage.OnError(ex) =>
handleError(ex)
case ActorSubscriberMessage.OnComplete =>
streamFinished()
cleanupResources()
}

private def processElement(element: T) = {
Expand All @@ -33,12 +37,11 @@ private[kafka] class KafkaActorSubscriber[T](
}

private def handleError(ex: Throwable) = {
log.error("Stopping subscriber due to an error", ex)
producer.close()
context.stop(self)
cleanupResources()
throw ex
}

private def streamFinished() = {
def cleanupResources() = {
producer.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,27 @@ package com.softwaremill.react.kafka

import java.util.UUID

import akka.actor.{ActorSystem, Props}
import akka.actor.SupervisorStrategy.Stop
import akka.actor._
import akka.pattern.ask
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, WatermarkRequestStrategy}
import akka.testkit.{ImplicitSender, TestKit}
import akka.testkit.{EventFilter, ImplicitSender, TestActorRef, TestKit}
import akka.util.Timeout
import kafka.serializer.{StringEncoder, StringDecoder}
import com.typesafe.config.ConfigFactory
import kafka.producer.{KafkaProducer, ProducerClosedException, ProducerProps}
import kafka.serializer.{StringDecoder, StringEncoder}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

class ReactiveKafkaIntegrationSpec(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with WordSpecLike
with Matchers with BeforeAndAfterAll {

def this() = this(ActorSystem("ReactiveKafkaIntegrationSpec"))
class ReactiveKafkaIntegrationSpec
extends TestKit(ActorSystem(
"ReactiveKafkaIntegrationSpec",
ConfigFactory.parseString("""akka.loggers = ["akka.testkit.TestEventListener"]""")
))
with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {

def uuid() = UUID.randomUUID().toString
implicit val timeout = Timeout(1 second)
Expand Down Expand Up @@ -61,6 +66,33 @@ class ReactiveKafkaIntegrationSpec(_system: ActorSystem) extends TestKit(_system
shouldStartConsuming(fromEnd = true)
}

"close producer, kill actor and log error when in trouble" in {
// given
val topic = uuid()
val group = uuid()
val kafka = newKafka()
val encoder = new StringEncoder()
val publisher = kafka.consume(topic, group, new StringDecoder())
val props = ProducerProps(kafka.host, topic, group)
val producer = new KafkaProducer(props)
val supervisor = system.actorOf(Props(new TestHelperSupervisor(self)))
val subscriberProps = Props(new KafkaActorSubscriber(producer, encoder))
val kafkaSubscriberActor = TestActorRef(subscriberProps, supervisor, "subscriber")
.asInstanceOf[TestActorRef[KafkaActorSubscriber[String]]]
val kafkaSubscriber = ActorSubscriber[String](kafkaSubscriberActor)
val subscriberActor = system.actorOf(Props(new ReactiveTestSubscriber))
val testSubscriber = ActorSubscriber[String](subscriberActor)
watch(kafkaSubscriberActor)
publisher.subscribe(testSubscriber)

// then
EventFilter[ProducerClosedException](message = "producer already closed") intercept {
kafkaSubscriberActor.underlyingActor.cleanupResources()
kafkaSubscriber.onNext("foo")
}
expectMsgClass(classOf[Throwable]).getClass should equal(classOf[ProducerClosedException])
}

def shouldStartConsuming(fromEnd: Boolean): Unit = {
// given
val kafka = newKafka()
Expand All @@ -85,9 +117,10 @@ class ReactiveKafkaIntegrationSpec(_system: ActorSystem) extends TestKit(_system
// then
awaitCond {
val collectedStrings = Await.result(subscriberActor ? "get elements", atMost = 1 second).asInstanceOf[Seq[_]]
collectedStrings.length > 0 && collectedStrings.contains("one") == !fromEnd
collectedStrings.nonEmpty && collectedStrings.contains("one") == !fromEnd
}
}

}

def newKafka(): ReactiveKafka = {
Expand All @@ -106,4 +139,18 @@ class ReactiveTestSubscriber extends ActorSubscriber {
elements = elements :+ element.asInstanceOf[String]
case "get elements" => sender ! elements
}
}

class TestHelperSupervisor(parent: ActorRef) extends Actor {

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(
maxNrOfRetries = 10,
withinTimeRange = 1 minute
) {
case e: Throwable => parent ! e; Stop
}

override def receive = {
case _ =>
}
}

0 comments on commit c714f9f

Please sign in to comment.