-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New api #23
New api #23
Changes from 9 commits
b04b5a3
42aa47d
3a9a361
f5a1bbe
d834cd6
6bafe4c
ba48493
3f91b9f
01c9a0a
b241930
c7edcee
93fa6dc
dea92ec
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
package kafka.consumer | ||
package com.softwaremill.react.kafka | ||
|
||
import java.util.Properties | ||
import java.util.UUID | ||
|
||
import kafka.consumer.ConsumerConfig | ||
import kafka.serializer.Decoder | ||
|
||
object ConsumerProps { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. I'll update that. Thanks :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I'd definitely second that |
||
|
||
|
@@ -33,7 +35,13 @@ object ConsumerProps { | |
* group id multiple processes indicate that they are all part of the same consumer group. | ||
* | ||
*/ | ||
def apply(brokerList: String, zooKeeperHost: String, topic: String, groupId: String = UUID.randomUUID().toString): ConsumerProps = { | ||
def apply[T]( | ||
brokerList: String, | ||
zooKeeperHost: String, | ||
topic: String, | ||
groupId: String, | ||
decoder: Decoder[T] | ||
): ConsumerProps[T] = { | ||
val props = Map[String, String]( | ||
"metadata.broker.list" -> brokerList, | ||
"group.id" -> groupId, | ||
|
@@ -45,19 +53,23 @@ object ConsumerProps { | |
"offsets.storage" -> "zookeeper" | ||
) | ||
|
||
new ConsumerProps(props, topic, groupId) | ||
new ConsumerProps(props, topic, groupId, decoder) | ||
} | ||
} | ||
|
||
case class ConsumerProps(private val params: Map[String, String], topic: String, groupId: String) { | ||
case class ConsumerProps[T]( | ||
params: Map[String, String], | ||
topic: String, | ||
groupId: String, | ||
decoder: Decoder[T] | ||
) { | ||
|
||
/** | ||
* Consumer Timeout | ||
* Throw a timeout exception to the consumer if no message is available for consumption after the specified interval | ||
*/ | ||
def consumerTimeoutMs(timeInMs: Long): ConsumerProps = { | ||
ConsumerProps(params + ("consumer.timeout.ms" -> timeInMs.toString), topic, groupId) | ||
} | ||
def consumerTimeoutMs(timeInMs: Long): ConsumerProps[T] = | ||
copy(params = params + ("consumer.timeout.ms" -> timeInMs.toString)) | ||
|
||
/** | ||
* What to do when there is no initial offset in Zookeeper or if an offset is out of range: | ||
|
@@ -80,27 +92,25 @@ case class ConsumerProps(private val params: Map[String, String], topic: String, | |
* *************************************************************************************** | ||
* | ||
*/ | ||
def readFromEndOfStream(): ConsumerProps = { | ||
ConsumerProps(params + ("auto.offset.reset" -> "largest"), topic, groupId) | ||
} | ||
def readFromEndOfStream(): ConsumerProps[T] = copy(params = params + ("auto.offset.reset" -> "largest")) | ||
|
||
/** | ||
* Store offsets in Kafka and/or ZooKeeper. NOTE: Server instance must be 8.2 or higher | ||
* | ||
* dualCommit = true means store in both ZooKeeper(legacy) and Kafka(new) places. | ||
*/ | ||
def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps = { | ||
def kafkaOffsetsStorage(dualCommit: Boolean): ConsumerProps[T] = { | ||
val p = params + ( | ||
"offsets.storage" -> "kafka", | ||
"dual.commit.enabled" -> dualCommit.toString | ||
) | ||
ConsumerProps(p, topic, groupId) | ||
copy(params = p) | ||
} | ||
/** | ||
* Set any additional properties as needed | ||
*/ | ||
def setProperty(key: String, value: String): ConsumerProps = ConsumerProps(params + (key -> value), topic, groupId) | ||
def setProperties(values: (String, String)*): ConsumerProps = ConsumerProps(params ++ values, topic, groupId) | ||
def setProperty(key: String, value: String): ConsumerProps[T] = copy(params = params + (key -> value)) | ||
def setProperties(values: (String, String)*): ConsumerProps[T] = copy(params = params ++ values) | ||
|
||
/** | ||
* Generate the Kafka ConsumerConfig object | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,17 +1,16 @@ | ||
package com.softwaremill.react.kafka | ||
|
||
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, WatermarkRequestStrategy} | ||
import akka.stream.actor.{ActorSubscriber, ActorSubscriberMessage, RequestStrategy} | ||
import kafka.producer.KafkaProducer | ||
import kafka.serializer.Encoder | ||
|
||
private[kafka] class KafkaActorSubscriber[T]( | ||
val producer: KafkaProducer, | ||
val encoder: Encoder[T], | ||
partitionizer: T => Option[Array[Byte]] = (_: T) => None | ||
val producer: KafkaProducer[T], | ||
props: ProducerProps[T], | ||
requestStrategyProvider: () => RequestStrategy | ||
) | ||
extends ActorSubscriber { | ||
|
||
protected def requestStrategy = WatermarkRequestStrategy(10) | ||
protected def requestStrategy = requestStrategyProvider() | ||
|
||
override def postStop(): Unit = { | ||
cleanupResources() | ||
|
@@ -23,17 +22,12 @@ private[kafka] class KafkaActorSubscriber[T]( | |
processElement(element.asInstanceOf[T]) | ||
case ActorSubscriberMessage.OnError(ex) => | ||
handleError(ex) | ||
case ActorSubscriberMessage.OnComplete => | ||
case ActorSubscriberMessage.OnComplete | "Stop" => | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be honest: for tests only. I know that's pretty smelly but I couldn't find a better way to implement an important test ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't need to block this PR, but maybe it's worth discussing it with the Akka guys? Seems like something that shouldn't be necessary. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's not an issue of Akka - it's "by design" according to reactive streams specs. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, yeah with Akka I meant akka-streams/the reactive stream spec. And I meant if implementations really have to add this kind of (indeed smelly) things to their production code then it is something worth thinking about/solving in akka-streams (or even the spec). |
||
cleanupResources() | ||
} | ||
|
||
private def processElement(element: T) = { | ||
try { | ||
producer.send(encoder.toBytes(element), partitionizer(element)) | ||
} | ||
catch { | ||
case e: Exception => handleError(e) | ||
} | ||
producer.send(props.encoder.toBytes(element), props.partitionizer(element)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. No exception handling anymore? This means exceptions get silently lost now, I think? Were there any negative effects of
Maybe a solution is to make it configurable what to do depending on the type of exception? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Right, using a supervisor sounds good. I'm only wondering if it is possible/easy to set it up with the current setup. May be worth an example (doesn't need to be in this PR though) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's included in this PR. Check README.md for the "Working with actors" section, it also describes error handling. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. ok! There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The problem is that if the producer is closed, this change will cause the actor to restart again and again, rethrowing the ProducerAlreadyClosed exception and restarting repeatedly until the PoisonPill. |
||
} | ||
|
||
private def handleError(ex: Throwable) = { | ||
|
@@ -44,4 +38,4 @@ private[kafka] class KafkaActorSubscriber[T]( | |
def cleanupResources() = { | ||
producer.close() | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just curious, what's the reason for this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original intent is to guard a certain level of quality. This comes from a commercial project I've been working on, where such a set of flags does a good job. Do you see any concerns regarding this flag?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed that's a good intent. It's just that you are deleting the flag here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I wanted to keep
@deprecated
method calls in tests which wouldn't compile with this flag. Ultimately I resigned from this idea so I can bring it back, thanks for pointing this out.