Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support error handling for KafkaActorSubscriber #29

Closed
He-Pin opened this issue Aug 21, 2015 · 6 comments
Closed

support error handling for KafkaActorSubscriber #29

He-Pin opened this issue Aug 21, 2015 · 6 comments
Milestone

Comments

@He-Pin
Copy link
Member

He-Pin commented Aug 21, 2015

  private def processElement(element: T) = {
    producer.send(props.encoder.toBytes(element), props.partitionizer(element))
  }

I here the encoder may cause an error,and I don't want the actor subscriber to fail.I can't find anyway to handle it.

the akka doc said :

ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet.
@He-Pin
Copy link
Member Author

He-Pin commented Aug 21, 2015

like add a invoker to props
and then :

  private def processElement(element: T) = {
props.invoker.invoke{
    producer.send(props.encoder.toBytes(element), props.partitionizer(element))
}
  }

@kciesielski
Copy link
Contributor

You can supervise a KafkaActorSubscriber and prevent it from restarting using code like this:
https://github.com/softwaremill/reactive-kafka/blob/master/src/test/scala/examples/examples.scala#L40
It's not perfect and I'm hoping get this refined but I guess that it can should work for now.

@He-Pin
Copy link
Member Author

He-Pin commented Aug 24, 2015

yes I just found the refactory here
#23
will close it for now.

@He-Pin He-Pin closed this as completed Aug 24, 2015
@kciesielski
Copy link
Contributor

There's another way to handle subscriber errors:

   val sinkDecider: Supervision.Decider = {
      case _ => Supervision.Resume // Your error handling
    }

    Source(publisher)
      .map(_.message().toUpperCase)
      .to(Sink(subscriber).withAttributes(ActorAttributes.supervisionStrategy(sinkDecider)))
      .run()

@kciesielski kciesielski added this to the 0.8.0 milestone Aug 26, 2015
@He-Pin
Copy link
Member Author

He-Pin commented Aug 26, 2015

@kciesielski once I realized that I could return the props ,I just attach it as a child actor and the override the supervison strategy.
but from the doc,said that:

ZipWith, FlexiMerge, FlexiRoute junction, ActorPublisher source and ActorSubscriber sink components do not honour the supervision strategy attribute yet.

will the solution you described here works?

@kciesielski
Copy link
Contributor

@hepin1989 Thanks for finding this :) Indeed, I wrote some tests and looks like this decider will never be called. So it looks like the only way to catch exceptions now is to use something similar to the method described in the ReactiveKafkaIntegrationSpec:"close producer, kill actor and log error when in trouble" test.
I'll update the examples and documentation accordingly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants