New api #23

Merged · 13 commits · Jul 23, 2015

51 changes: 47 additions & 4 deletions README.md
@@ -28,15 +28,16 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.ReactiveKafka
import com.softwaremill.react.kafka.ProducerProps
import com.softwaremill.react.kafka.ConsumerProps
import kafka.serializer.{StringDecoder, StringEncoder}

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka(host = "localhost:9092", zooKeeperHost = "localhost:2181")
val publisher = kafka.consume("lowercaseStrings", "groupName", new StringDecoder())
val subscriber = kafka.publish("uppercaseStrings", "groupName", new StringEncoder())

val kafka = new ReactiveKafka()
val publisher = kafka.consume(ConsumerProps("localhost:9092", "localhost:2181", "lowercaseStrings", "groupName", new StringDecoder()))
val subscriber = kafka.publish(ProducerProps("localhost:9092", "uppercaseStrings", "groupName", new StringEncoder()))

Source(publisher).map(_.toUpperCase).to(Sink(subscriber)).run()
```
@@ -50,6 +51,48 @@ you can use an alternative way to create a consumer:
val publisher = kafka.consumeFromEnd(topic, groupId, new StringDecoder())
```

Working with actors
----
Since this library is built on akka-stream, the best way to handle errors is to leverage Akka's error handling and lifecycle
management capabilities. Producers and consumers are in fact actors.

#### Obtaining actor references
`ReactiveKafka` comes with a few methods for working at the actor level. You can let it create `Props` so that your
own supervisor creates these actors as children, or you can create the actors directly at the top level of supervision.
Here are some examples:

```Scala
// inside an Actor:
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
// publisher
val publisherProps = ConsumerProps("localhost:9092", "localhost:2181", "lowercaseStrings", "groupName", new StringDecoder())
val publisherActorProps: Props = kafka.consumerActorProps(publisherProps)
val publisherActor: ActorRef = context.actorOf(publisherActorProps)
// or:
val topLevelPublisherActor: ActorRef = kafka.consumerActor(publisherProps)

// subscriber
val subscriberProps = ProducerProps("localhost:9092", "uppercaseStrings", "groupName", new StringEncoder())
val subscriberActorProps: Props = kafka.producerActorProps(subscriberProps)
val subscriberActor: ActorRef = context.actorOf(subscriberActorProps)
// or:
val topLevelSubscriberActor: ActorRef = kafka.producerActor(subscriberProps)
```

#### Handling errors
When a publisher (consumer) fails to load more elements from Kafka, it calls `onError()` on all of its subscribers.
How the error is handled depends on the subscriber implementation.
When a subscriber (producer) fails to get more elements from upstream due to an error, it is no longer usable:
it will throw an exception and close all underlying resources (effectively, the Kafka connection).
If there's a problem with putting elements into Kafka, only an exception is thrown and the subscriber remains usable.
This mechanism allows custom handling while keeping the subscriber working.
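
For example, when the producer actor is a child of your own actor, a standard Akka supervision strategy decides what
happens when sending to Kafka throws. The sketch below is illustrative only (the actor name and the
restart-on-any-exception strategy are assumptions for the example, not something the library prescribes):

```Scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy.Restart
import akka.stream.ActorMaterializer
import com.softwaremill.react.kafka.{ProducerProps, ReactiveKafka}
import kafka.serializer.StringEncoder

class KafkaWriterSupervisor extends Actor {
  implicit val materializer = ActorMaterializer()

  // restart the producer actor when a send to Kafka throws; tune this decider to your needs
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3) {
    case _: Exception => Restart
  }

  val kafka = new ReactiveKafka()
  val props = ProducerProps("localhost:9092", "uppercaseStrings", "groupName", new StringEncoder())
  val subscriberActor: ActorRef = context.actorOf(kafka.producerActorProps(props))

  def receive = Actor.emptyBehavior
}
```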

#### Cleaning up
If you want to manually stop a publisher or a subscriber, you can just kill the actor using `context.stop()` or a
`PoisonPill`. Underlying Kafka resources will be cleaned up.
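
For example (a minimal sketch reusing the actor references from the example above):

```Scala
import akka.actor.PoisonPill

// from the supervising actor:
context.stop(publisherActor)

// or from anywhere that holds the reference:
topLevelSubscriberActor ! PoisonPill
```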

Tuning
----

1 change: 0 additions & 1 deletion build.sbt
@@ -25,7 +25,6 @@ scalacOptions ++= Seq(
"-encoding", "UTF-8", // yes, this is 2 args
"-feature",
"-unchecked",
"-Xfatal-warnings",
"-Xlint",


just curious, what's the reason for this change?

Contributor Author:

The original intent is to guard a certain level of quality. This comes from a commercial project I've been working on, where such a set of flags does a good job. Do you see any concerns regarding this flag?


Indeed that's a good intent. It's just that you are deleting the flag here.

Contributor Author:

Right. I wanted to keep @deprecated method calls in tests, which wouldn't compile with this flag. Ultimately I abandoned that idea, so I can bring the flag back - thanks for pointing this out.

"-Yno-adapted-args",
"-Ywarn-dead-code",
@@ -1,7 +1,9 @@
package kafka.consumer
package com.softwaremill.react.kafka

import java.util.Properties
import java.util.UUID

import kafka.consumer.ConsumerConfig
import kafka.serializer.Decoder

object ConsumerProps {


Using Props may be a bit confusing here (because of Akka's Props). Maybe rename it to ConsumerProperties? (Similarly for ProducerProps.)

Contributor Author:

Good point. I'll update that. Thanks :)

Contributor:

Yeah, I'd definitely second that.


@@ -33,7 +35,13 @@ object ConsumerProps {
* group id multiple processes indicate that they are all part of the same consumer group.
*
*/
def apply(brokerList: String, zooKeeperHost: String, topic: String, groupId: String = UUID.randomUUID().toString): ConsumerProps = {
def apply[T](
brokerList: String,
zooKeeperHost: String,
topic: String,
groupId: String,
decoder: Decoder[T]
): ConsumerProps[T] = {
val props = Map[String, String](
"metadata.broker.list" -> brokerList,
"group.id" -> groupId,
@@ -45,19 +53,23 @@ object ConsumerProps {
"offsets.storage" -> "zookeeper"
)

new ConsumerProps(props, topic, groupId)
new ConsumerProps(props, topic, groupId, decoder)
}
}

case class ConsumerProps(private val params: Map[String, String], topic: String, groupId: String) {
case class ConsumerProps[T](
params: Map[String, String],
topic: String,
groupId: String,
decoder: Decoder[T]
) {

/**
* Consumer Timeout
* Throw a timeout exception to the consumer if no message is available for consumption after the specified interval
*/
def consumerTimeoutMs(timeInMs: Long): ConsumerProps = {
ConsumerProps(params + ("consumer.timeout.ms" -> timeInMs.toString), topic, groupId)
}
def consumerTimeoutMs(timeInMs: Long): ConsumerProps[T] =
copy(params = params + ("consumer.timeout.ms" -> timeInMs.toString))

/**
* What to do when there is no initial offset in Zookeeper or if an offset is out of range:
@@ -80,27 +92,25 @@ case class ConsumerProps(private val params: Map[String, String], topic: String,
* ***************************************************************************************
*
*/
def readFromEndOfStream(): ConsumerProps = {
ConsumerProps(params + ("auto.offset.reset" -> "largest"), topic, groupId)
}
def readFromEndOfStream(): ConsumerProps[T] = copy(params = params + ("auto.offset.reset" -> "largest"))

/**
* Store offsets in Kafka and/or ZooKeeper. NOTE: Server instance must be 8.2 or higher
*
* dualCommit = true means store in both ZooKeeper(legacy) and Kafka(new) places.
*/
def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps = {
def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps[T] = {
val p = params + (
"offsets.storage" -> "kafka",
"dual.commit.enabled" -> dualCommit.toString
)
ConsumerProps(p, topic, groupId)
copy(params = p)
}
/**
* Set any additional properties as needed
*/
def setProperty(key: String, value: String): ConsumerProps = ConsumerProps(params + (key -> value), topic, groupId)
def setProperties(values: (String, String)*): ConsumerProps = ConsumerProps(params ++ values, topic, groupId)
def setProperty(key: String, value: String): ConsumerProps[T] = copy(params = params + (key -> value))
def setProperties(values: (String, String)*): ConsumerProps[T] = copy(params = params ++ values)

/**
* Generate the Kafka ConsumerConfig object
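
To make the fluent API above concrete, here is an illustrative usage sketch (not part of the diff) chaining the setters defined in this file:

```Scala
import com.softwaremill.react.kafka.ConsumerProps
import kafka.serializer.StringDecoder

// example settings only
val consumerProps = ConsumerProps("localhost:9092", "localhost:2181", "lowercaseStrings", "groupName", new StringDecoder())
  .readFromEndOfStream()                   // auto.offset.reset = largest
  .consumerTimeoutMs(5000)                 // stop waiting after 5 s of silence
  .kafkaOffsetsStorage(dualCommit = false) // requires brokers 0.8.2+
  .setProperty("rebalance.max.retries", "10")
```
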
@@ -3,12 +3,12 @@ package com.softwaremill.react.kafka
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
import com.softwaremill.react.kafka.KafkaActorPublisher.Poll
import kafka.consumer.{ConsumerTimeoutException, KafkaConsumer}
import kafka.serializer.Decoder

import scala.annotation.tailrec
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: Decoder[T]) extends ActorPublisher[T] {

private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer[T]) extends ActorPublisher[T] {

val iterator = consumer.iterator()

@@ -29,7 +29,7 @@ private[kafka] class KafkaActorPublisher[T](consumer: KafkaConsumer, decoder: De
private def tryReadingSingleElement(): Try[Option[T]] = {
Try {
val bytes = if (iterator.hasNext() && demand_?) Option(iterator.next().message()) else None
bytes.map(decoder.fromBytes)
bytes.map(consumer.props.decoder.fromBytes)
} recover {
// We handle timeout exceptions as normal 'end of the queue' cases
case _: ConsumerTimeoutException => None
@@ -1,17 +1,16 @@
package com.softwaremill.react.kafka

import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, WatermarkRequestStrategy}
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, RequestStrategy}
import kafka.producer.KafkaProducer
import kafka.serializer.Encoder

private[kafka] class KafkaActorSubscriber[T](
val producer: KafkaProducer,
val encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None
val producer: KafkaProducer[T],
props: ProducerProps[T],
requestStrategyProvider: () => RequestStrategy
)
extends ActorSubscriber {

protected def requestStrategy = WatermarkRequestStrategy(10)
protected def requestStrategy = requestStrategyProvider()

override def postStop(): Unit = {
cleanupResources()
@@ -23,17 +22,12 @@ private[kafka] class KafkaActorSubscriber[T](
processElement(element.asInstanceOf[T])
case ActorSubscriberMessage.OnError(ex) =>
handleError(ex)
case ActorSubscriberMessage.OnComplete =>
case ActorSubscriberMessage.OnComplete | "Stop" =>


Why is Stop added?

Contributor Author:

To be honest: for tests only. I know that's pretty smelly, but I couldn't find a better way to implement an important test (ReactiveKafkaIntegrationSpec, "close producer, kill actor and log error when in trouble"). Using TestActorRef.underlyingActor wouldn't work there because I already had an instance of ActorRef which needs to be supervised. Sending ActorSubscriberMessage.OnComplete triggers some internal Akka stuff that I didn't want to get executed.


Doesn't need to block this PR, but maybe it's worth discussing it with the Akka guys? Seems like something that shouldn't be necessary.

Contributor Author:

That's not an issue of Akka - it's "by design" according to the Reactive Streams spec. OnComplete is meant to be an internal message of Akka Streams. Once it's sent to a subscriber, one can't send any more elements (and onError will not be called). This is part of the spec, so I needed something custom to be able to close the internal resources in order to simulate sending an element when Kafka is not available.
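
For illustration (the `subscriberActor` name is an assumption), a test can close the Kafka producer without completing the stream and then push another element to exercise the failure path:

```Scala
subscriberActor ! "Stop"   // closes the underlying producer; the stream is not completed
```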


Sorry, yeah, by Akka I meant akka-streams/the Reactive Streams spec. And I meant that if implementations really have to add this kind of (indeed smelly) thing to their production code, then it's something worth thinking about/solving in akka-streams (or even the spec).

cleanupResources()
}

private def processElement(element: T) = {
try {
producer.send(encoder.toBytes(element), partitionizer(element))
}
catch {
case e: Exception => handleError(e)
}
producer.send(props.encoder.toBytes(element), props.partitionizer(element))


No exception handling anymore? This means exceptions get silently lost now, I think?

Were there any negative effects of

-    catch {        
-      case e: Exception => handleError(e)      
-    }      

Maybe a solution is to make it configurable what to do depending on the type of exception?

Contributor Author:

handleError(), in the semantics of the Reactive Streams API, is meant to be called only on fatal issues, where you cannot recover and you need to close your resources. It cannot be called twice. This case means that something bad happened when we tried to put an element into Kafka, which may be recoverable.
I decided to leave error handling to Akka's standard mechanism: supervision. A thrown exception should be handled by the actor's supervisor. If there's no supervisor, Akka's default behavior is to write to the logs and restart the actor (which has no effect on the internal Kafka reference), letting it keep working.


Right, using a supervisor sounds good. I'm only wondering if it's possible/easy to set one up with the current setup. Maybe worth an example (doesn't need to be in this PR though).

Contributor Author:

It's included in this PR. Check README.md for the "Working with actors" section; it also describes error handling.


ok!

Member:

The problem is: if the producer is closed, this change will cause the actor to restart again and again, rethrowing the ProducerAlreadyClosed exception and restarting, until it gets a PoisonPill.

}

private def handleError(ex: Throwable) = {
@@ -44,4 +38,4 @@ private[kafka] class KafkaActorSubscriber[T](
def cleanupResources() = {
producer.close()
}
}
}
@@ -1,11 +1,10 @@
package kafka.producer
package com.softwaremill.react.kafka

import java.util.Properties
import java.util.UUID
import java.util.{Properties, UUID}

import kafka.message.DefaultCompressionCodec
import kafka.message.NoCompressionCodec
import kafka.message.SnappyCompressionCodec
import kafka.message.{DefaultCompressionCodec, NoCompressionCodec, SnappyCompressionCodec}
import kafka.producer.ProducerConfig
import kafka.serializer.Encoder

object ProducerProps {

@@ -29,7 +28,13 @@ object ProducerProps {
* the application making the request.
*
*/
def apply(brokerList: String, topic: String, clientId: String = UUID.randomUUID().toString): ProducerProps = {
def apply[T](
brokerList: String,
topic: String,
clientId: String,
encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]]
): ProducerProps[T] = {
val props = Map[String, String](
"metadata.broker.list" -> brokerList,

@@ -41,40 +46,62 @@ object ProducerProps {
"producer.type" -> "sync"
)

new ProducerProps(props, topic, clientId)
new ProducerProps(props, topic, clientId, encoder, partitionizer)
}

def apply[T](brokerList: String, topic: String, clientId: String, encoder: Encoder[T]): ProducerProps[T] = {
val props = Map[String, String](
"metadata.broker.list" -> brokerList,

// defaults
"compression.codec" -> DefaultCompressionCodec.codec.toString,
"client.id" -> clientId,
"message.send.max.retries" -> 3.toString,
"request.required.acks" -> -1.toString,
"producer.type" -> "sync"
)

new ProducerProps(props, topic, clientId, encoder, (_: T) => None)
}

}

case class ProducerProps(private val params: Map[String, String], topic: String, clientId: String) {
case class ProducerProps[T](
private val params: Map[String, String],
topic: String,
clientId: String = UUID.randomUUID().toString,
encoder: Encoder[T],
partitionizer: T => Option[Array[Byte]] = (_: T) => None
) {

/**
* Asynchronous Mode
* The number of messages to send in one batch when using async mode.
* The producer will wait until either this number of messages are ready
* to send or bufferMaxMs timeout is reached.
*/
def asynchronous(batchSize: Int = 200, bufferMaxMs: Int = 500): ProducerProps = {
def asynchronous(batchSize: Int = 200, bufferMaxMs: Int = 500): ProducerProps[T] = {
val p = params + (
"producer.type" -> "async",
"batch.num.messages" -> batchSize.toString,
"queue.buffering.max.ms" -> bufferMaxMs.toString
)
ProducerProps(p, topic, clientId)
copy(params = p)
}

/**
* No Compression
* Allows you to turn off the compression codec for all data generated by this producer.
*/
def noCompression(): ProducerProps = {
ProducerProps(params + ("compression.codec" -> NoCompressionCodec.codec.toString), topic, clientId)
def noCompression(): ProducerProps[T] = {
copy(params = params + ("compression.codec" -> NoCompressionCodec.codec.toString))
}

/**
* Use Snappy Compression instead of the default compression
*/
def useSnappyCompression(): ProducerProps = {
ProducerProps(params + ("compression.codec" -> SnappyCompressionCodec.codec.toString), topic, clientId)
def useSnappyCompression(): ProducerProps[T] = {
copy(params = params + ("compression.codec" -> SnappyCompressionCodec.codec.toString))
}

/**
@@ -84,8 +111,8 @@ case class ProducerProps(private val params: Map[String, String], topic: String,
* setting a non-zero value here can lead to duplicates in the case of network errors
* that cause a message to be sent but the acknowledgment to be lost.
*/
def messageSendMaxRetries(num: Int): ProducerProps = {
ProducerProps(params + ("message.send.max.retries" -> num.toString), topic, clientId)
def messageSendMaxRetries(num: Int): ProducerProps[T] = {
copy(params = params + ("message.send.max.retries" -> num.toString))
}

/**
@@ -98,15 +125,15 @@ case class ProducerProps(private val params: Map[String, String], topic: String,
* -1) which means that the producer gets an acknowledgment after all in-sync replicas have received the data. This option
* provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains.
*/
def requestRequiredAcks(value: Int): ProducerProps = {
ProducerProps(params + ("request.required.acks" -> value.toString), topic, clientId)
def requestRequiredAcks(value: Int): ProducerProps[T] = {
copy(params = params + ("request.required.acks" -> value.toString))
}

/**
* Set any additional properties as needed
*/
def setProperty(key: String, value: String): ProducerProps = ProducerProps(params + (key -> value), topic, clientId)
def setProperties(values: (String, String)*): ProducerProps = ProducerProps(params ++ values, topic, clientId)
def setProperty(key: String, value: String): ProducerProps[T] = copy(params = params + (key -> value))
def setProperties(values: (String, String)*): ProducerProps[T] = copy(params = params ++ values)

/**
* Generate the Kafka ProducerConfig object
@@ -120,5 +147,4 @@ case class ProducerProps(private val params: Map[String, String], topic: String,
* Dump current props for debugging
*/
def dump: String = params.map { e => f"${e._1}%-20s : ${e._2.toString}" }.mkString("\n")

}
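
Similarly, an illustrative usage sketch (not part of the diff) chaining the producer setters defined above:

```Scala
import com.softwaremill.react.kafka.ProducerProps
import kafka.serializer.StringEncoder

// example settings only
val producerProps = ProducerProps("localhost:9092", "uppercaseStrings", "clientId", new StringEncoder())
  .asynchronous(batchSize = 100, bufferMaxMs = 1000) // batch sends instead of the default sync mode
  .useSnappyCompression()
  .messageSendMaxRetries(5)
  .requestRequiredAcks(-1)                           // wait for all in-sync replicas
```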