Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling #41

Closed
z4f1r0v opened this issue Oct 2, 2015 · 10 comments
Closed

Error handling #41

z4f1r0v opened this issue Oct 2, 2015 · 10 comments

Comments

@z4f1r0v
Copy link

z4f1r0v commented Oct 2, 2015

We streaming Twitter data from a service called GNIP suddenly I get a

ERROR [lt-dispatcher-2] c.s.r.k.KafkaActorSubscriber Stopping Kafka subscriber due to fatal error. WARNING arguments left: 1

As I read in order to understand what is happening I need to implement error handling with actors.
I would like some help on the example. Can someone show me how I can create a sink from the actor that supervises the KafkaActorSubscriber child?

@z4f1r0v z4f1r0v closed this as completed Oct 5, 2015
@z4f1r0v z4f1r0v reopened this Oct 5, 2015
@kciesielski
Copy link
Contributor

There's an example in our Activator Template: https://github.com/softwaremill/activator-reactive-kafka-scala/blob/master/src/main/scala/sample/reactivekafka/KafkaWriterCoordinator.scala
Let me know if you need any further explanation on how to plug in your error handling.

@z4f1r0v
Copy link
Author

z4f1r0v commented Oct 5, 2015

Made some progress on the matter and ended up with this. Unfortunately I'm still getting the above mentioned error. I would like to know what is this "arguments left: 1"

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{ Props, Actor, ActorLogging, OneForOneStrategy, SupervisorStrategy }
import com.softwaremill.react.kafka.{ ProducerProperties, ReactiveKafka }
import com.typesafe.config.Config
import kafka.serializer.StringEncoder

import scala.collection.JavaConversions._

class ActorSupervisor(config: Config)(implicit val materializer: akka.stream.Materializer) extends Actor with ActorLogging {
  val subscriber: String = "Subscriber"

  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e =>
      log.error("Exception: {}", e)
      Resume
  }

  private def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    val subscriberProperties = ProducerProperties(
      brokerList = config.getStringList("kafka.brokers").toList.mkString(","),
      topic = config.getString("kafka.topic"),
      clientId = config.getString("kafka.clientId"),
      encoder = new StringEncoder())

    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps, subscriber)
  }

  override def aroundPreStart() = {
    createSupervisedSubscriberActor()
  }

  override def receive = {
    case msg => context.child(subscriber) match {
      case Some(ca) => ca ! msg
      case None =>
    }
  }
}

And the sink is created like this

val kafkaSink = Sink.actorSubscriber(Props(new ActorSupervisor(config)))

@z4f1r0v
Copy link
Author

z4f1r0v commented Oct 7, 2015

According to this you are missing a {} in KafkaActorSubscriber.
Should I make a PR?

@kciesielski
Copy link
Contributor

Sure, a PR will be most welcome :)

@z4f1r0v
Copy link
Author

z4f1r0v commented Oct 7, 2015

Ok, I see that someone already changed the logging. I guess this will be part of 0.8.2.? @kciesielski any estimates on when you will put that up?

@kciesielski kciesielski added the bug label Oct 8, 2015
@kciesielski
Copy link
Contributor

@alexanderzafirov I need to verify this one as well: #43, looks like a serious issue. As soon as it's resolved I'll release 0.8.2.

@z4f1r0v
Copy link
Author

z4f1r0v commented Oct 8, 2015

@kciesielski thanks!

@kciesielski kciesielski added this to the 0.9.0 milestone Oct 12, 2015
@z4f1r0v
Copy link
Author

z4f1r0v commented Oct 13, 2015

@kciesielski Just to clarify - by labeling the issue as a bug are you referring to the fact that we cannot catch certain exceptions with the ActorSupervisor?

@kciesielski
Copy link
Contributor

@alexanderzafirov Yes, because we cannot see the cause.

@kciesielski kciesielski removed this from the 0.9.0 milestone Oct 20, 2015
@kciesielski
Copy link
Contributor

Fixed by 35f7a49

@ennru ennru added this to the invalid milestone Jun 7, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants