Skip to content
This repository has been archived by the owner on Apr 29, 2022. It is now read-only.

Improve naming and demo #12

Merged
merged 1 commit into from
Sep 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 30 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,27 @@ use the type `Flow[C, R, Any]` where `C` is the command type and `R` is the resu
In the demo subproject "streamee-demo" one simple process is defined in the `DemoProcess` object:

``` scala
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] =
Flow[String]
.mapAsync(1)(step("step1", 2.seconds, scheduler))
.mapAsync(1)(step("step2", 2.seconds, scheduler))
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(1) {
case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42)))
}
.mapAsync(1) {
case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n)))
}
```

Next we have to create the actual processor, i.e. the running stream into which the process is
embedded, by calling `Processor.apply` thereby giving the process, processor settings and the
reference to `CoordinatedShutdown`.

In the demo subproject "streamee-demo" this happens in `Main`:
In the demo subproject "streamee-demo" this happens in `Api`:

``` scala
val demoProcessor =
Processor(DemoLogic(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(context.system),
CoordinatedShutdown(context.system.toUntyped))
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(untypedSystem),
CoordinatedShutdown(untypedSystem))
```

Commands offered via the returned queue are emitted into the given process. Once results are
Expand All @@ -84,18 +88,14 @@ using an `ExpiringPromise` with the given timeout.
In the demo subproject "streamee-demo" this happens in `Api`:

``` scala
pathPrefix("accounts") {
post {
entity(as[Entity]) {
case Entity(s) =>
onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) {
case s if s.isEmpty =>
complete(StatusCodes.BadRequest -> "Empty entity!")
case s if s.startsWith("taxi") =>
complete(StatusCodes.Conflict -> "We don't like taxis ;-)")
case s =>
complete(StatusCodes.Created -> s)
}
post {
entity(as[DemoProcess.Request]) { request =>
onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) {
case DemoProcess.Response(_, n) if n == 42 =>
complete(StatusCodes.BadRequest -> "Request must not have n == 1!")

case DemoProcess.Response(_, n) =>
complete(StatusCodes.Created -> n)
}
}
}
Expand All @@ -112,16 +112,15 @@ This code is open source software licensed under the [Apache 2.0 License](http:/

## Publishing

To publish a release to Maven central follow these steps:
To publish a release to Maven Central follow these steps:

1. Create a release via GitHub
2. Publish artifact to OSS Sonatype stage repository:
```
sbt publishSigned
```
Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`.
1. Create a tag/release on GitHub
2. Publish the artifact to the OSS Sonatype stage repository:
```
sbt publishSigned
```
Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`.
3. Release artifact to Maven Central with:
```
sbt sonatypeRelease
```

```
sbt sonatypeRelease
```
35 changes: 17 additions & 18 deletions streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
package io.moia.streamee
package demo

import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason }
import akka.actor.{ ActorSystem, CoordinatedShutdown, Scheduler }
import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.{ Directives, Route }
import akka.stream.Materializer
import akka.stream.scaladsl.SourceQueue
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }

/**
Expand All @@ -42,15 +41,17 @@ object Api extends Logging {

private final object BindFailure extends Reason

def apply(
config: Config,
demoProcessor: Processor[String, String]
)(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = {
def apply(config: Config)(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = {
import config._
import untypedSystem.dispatcher

implicit val scheduler: Scheduler = untypedSystem.scheduler

val demoProcessor =
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(untypedSystem),
CoordinatedShutdown(untypedSystem))

Http()
.bindAndHandle(
route(demoProcessor, demoProcessorTimeout),
Expand All @@ -71,7 +72,7 @@ object Api extends Logging {
}

def route(
demoProcessor: SourceQueue[(String, Promise[String])],
demoProcessor: Processor[DemoProcess.Request, DemoProcess.Response],
demoProcessorTimeout: FiniteDuration
)(implicit ec: ExecutionContext, scheduler: Scheduler): Route = {
import Directives._
Expand All @@ -86,16 +87,14 @@ object Api extends Logging {
}
} ~
post {
entity(as[Entity]) {
case Entity(s) =>
onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) {
case s if s.isEmpty =>
complete(StatusCodes.BadRequest -> "Empty entity!")
case s if s.startsWith("taxi") =>
complete(StatusCodes.Conflict -> "We don't like taxis ;-)")
case s =>
complete(StatusCodes.Created -> s)
}
entity(as[DemoProcess.Request]) { request =>
onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) {
case DemoProcess.Response(_, n) if n == 42 =>
complete(StatusCodes.BadRequest -> "Request must not have n == 1!")

case DemoProcess.Response(_, n) =>
complete(StatusCodes.Created -> n)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,13 @@ import akka.actor.Scheduler
import akka.pattern.after
import akka.stream.scaladsl.Flow
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.DurationInt

object DemoProcess extends Logging {

private def step(name: String, duration: FiniteDuration, scheduler: Scheduler)(
s: String
)(implicit ec: ExecutionContext) = {
logger.debug(s"Before $name")
val p = Promise[String]()
p.tryCompleteWith(after(duration, scheduler) {
logger.debug(s"After $name")
Future.successful(s)
})
p.future
}
final case class Request(id: String, n: Int)
final case class Response(id: String, n: Int)

/**
* Simple domain logic process for demo purposes.
Expand All @@ -50,8 +41,12 @@ object DemoProcess extends Logging {
* allows for easily showing the effect of backpressure. For real-world applications usually a
* higher value would be suitable.
*/
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] =
Flow[String]
.mapAsync(1)(step("step1", 2.seconds, scheduler))
.mapAsync(1)(step("step2", 2.seconds, scheduler))
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(1) {
case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42)))
}
.mapAsync(1) {
case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n)))
}
}
46 changes: 17 additions & 29 deletions streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package io.moia.streamee
package demo

import akka.actor.{ ActorSystem => UntypedSystem }
import akka.actor.CoordinatedShutdown.Reason
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorSystem, Behavior, Terminated }
import akka.actor.{ ActorSystem => UntypedSystem, CoordinatedShutdown, Scheduler }
import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{ Cluster, SelfUp, Subscribe, Unsubscribe }
import akka.management.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.Materializer
Expand All @@ -48,37 +48,25 @@ object Main extends Logging {

AkkaManagement(system.toUntyped).start()
ClusterBootstrap(system.toUntyped).start()

logger.info(s"${system.name} started and ready to join cluster")
}

def apply(config: Config): Behavior[SelfUp] =
Behaviors.setup { context =>
context.log.info("{} started and ready to join cluster", context.system.name)

Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[SelfUp])

Behaviors
.receiveMessage[SelfUp] { _ =>
logger.info(s"${context.system.name} joined cluster and is up")
onSelfUp(config, context)
Behaviors.empty
}
.receiveSignal {
case (_, Terminated(actor)) =>
logger.error(s"Shutting down, because $actor terminated!")
CoordinatedShutdown(context.system.toUntyped).run(TopLevelActorTerminated)
Behaviors.same
}
}
Behaviors.receive { (context, _) =>
context.log.info("{} joined cluster and is up", context.system.name)

private def onSelfUp(config: Config, context: ActorContext[SelfUp]) = {
implicit val untypedSystem: UntypedSystem = context.system.toUntyped
implicit val mat: Materializer = ActorMaterializer()(context.system)
implicit val scheduler: Scheduler = context.system.scheduler
Cluster(context.system).subscriptions ! Unsubscribe(context.self)

val demoProcessor =
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(context.system),
CoordinatedShutdown(context.system.toUntyped))
Api(config.api, demoProcessor)
}
implicit val untypedSystem: UntypedSystem = context.system.toUntyped
implicit val mat: Materializer = ActorMaterializer()(context.system)

Api(config.api)

Behaviors.empty
}
}
}
6 changes: 3 additions & 3 deletions streamee/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ streamee {
# ATTENTNION: Currently must be 1, see https://github.com/akka/akka/issues/25349!
buffer-size = 1

# The maximum number of commands which can be in-flight in the wrapped domain logic process.
# Large values should not be an issue, because for each command in-flight there is just a
# The maximum number of requests which can be in-flight in the wrapped domain logic process.
# Large values should not be an issue, because for each request in-flight there is just a
# buffered promise (which is rather lightweight).
#
# Must be positive!
max-nr-of-in-flight-commands = 8192
max-nr-of-in-flight-requests = 8192
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ object ExpiringPromise {
* @param timeout maximum duration for the promise to be completed successfully
* @param scheduler Akka scheduler needed for timeout handling
* @param ec Scala execution context for timeout handling
* @tparam R result type
* @tparam R response type
*/
def apply[R](timeout: FiniteDuration,
scheduler: Scheduler)(implicit ec: ExecutionContext): Promise[R] = {
val promisedR = Promise[R]()
val resultTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout)))
promisedR.tryCompleteWith(resultTimeout)
val promisedR = Promise[R]()
val responseTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout)))
promisedR.tryCompleteWith(responseTimeout)
promisedR
}
}
Loading