Skip to content
This repository has been archived by the owner on Apr 29, 2022. It is now read-only.

Commit

Permalink
Improve naming and demo (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
hseeberger committed Sep 13, 2018
1 parent 587c68c commit febb272
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 161 deletions.
61 changes: 30 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,27 @@ use the type `Flow[C, R, Any]` where `C` is the command type and `R` is the resu
In the demo subproject "streamee-demo" one simple process is defined in the `DemoProcess` object:

``` scala
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] =
Flow[String]
.mapAsync(1)(step("step1", 2.seconds, scheduler))
.mapAsync(1)(step("step2", 2.seconds, scheduler))
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(1) {
case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42)))
}
.mapAsync(1) {
case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n)))
}
```

Next we have to create the actual processor, i.e. the running stream into which the process is
embedded, by calling `Processor.apply` thereby giving the process, processor settings and the
reference to `CoordinatedShutdown`.

In the demo subproject "streamee-demo" this happens in `Main`:
In the demo subproject "streamee-demo" this happens in `Api`:

``` scala
val demoProcessor =
Processor(DemoLogic(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(context.system),
CoordinatedShutdown(context.system.toUntyped))
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(untypedSystem),
CoordinatedShutdown(untypedSystem))
```

Commands offered via the returned queue are emitted into the given process. Once results are
Expand All @@ -84,18 +88,14 @@ using an `ExpiringPromise` with the given timeout.
In the demo subproject "streamee-demo" this happens in `Api`:

``` scala
pathPrefix("accounts") {
post {
entity(as[Entity]) {
case Entity(s) =>
onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) {
case s if s.isEmpty =>
complete(StatusCodes.BadRequest -> "Empty entity!")
case s if s.startsWith("taxi") =>
complete(StatusCodes.Conflict -> "We don't like taxis ;-)")
case s =>
complete(StatusCodes.Created -> s)
}
post {
entity(as[DemoProcess.Request]) { request =>
onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) {
case DemoProcess.Response(_, n) if n == 42 =>
complete(StatusCodes.BadRequest -> "Request must not have n == 1!")

case DemoProcess.Response(_, n) =>
complete(StatusCodes.Created -> n)
}
}
}
Expand All @@ -112,16 +112,15 @@ This code is open source software licensed under the [Apache 2.0 License](http:/

## Publishing

To publish a release to Maven central follow these steps:
To publish a release to Maven Central follow these steps:

1. Create a release via GitHub
2. Publish artifact to OSS Sonatype stage repository:
```
sbt publishSigned
```
Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`.
1. Create a tag/release on GitHub
2. Publish the artifact to the OSS Sonatype stage repository:
```
sbt publishSigned
```
Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`.
3. Release artifact to Maven Central with:
```
sbt sonatypeRelease
```

```
sbt sonatypeRelease
```
35 changes: 17 additions & 18 deletions streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@
package io.moia.streamee
package demo

import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason }
import akka.actor.{ ActorSystem, CoordinatedShutdown, Scheduler }
import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.StatusCodes.OK
import akka.http.scaladsl.server.{ Directives, Route }
import akka.stream.Materializer
import akka.stream.scaladsl.SourceQueue
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Promise }
import scala.concurrent.ExecutionContext
import scala.util.{ Failure, Success }

/**
Expand All @@ -42,15 +41,17 @@ object Api extends Logging {

private final object BindFailure extends Reason

def apply(
config: Config,
demoProcessor: Processor[String, String]
)(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = {
def apply(config: Config)(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = {
import config._
import untypedSystem.dispatcher

implicit val scheduler: Scheduler = untypedSystem.scheduler

val demoProcessor =
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(untypedSystem),
CoordinatedShutdown(untypedSystem))

Http()
.bindAndHandle(
route(demoProcessor, demoProcessorTimeout),
Expand All @@ -71,7 +72,7 @@ object Api extends Logging {
}

def route(
demoProcessor: SourceQueue[(String, Promise[String])],
demoProcessor: Processor[DemoProcess.Request, DemoProcess.Response],
demoProcessorTimeout: FiniteDuration
)(implicit ec: ExecutionContext, scheduler: Scheduler): Route = {
import Directives._
Expand All @@ -86,16 +87,14 @@ object Api extends Logging {
}
} ~
post {
entity(as[Entity]) {
case Entity(s) =>
onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) {
case s if s.isEmpty =>
complete(StatusCodes.BadRequest -> "Empty entity!")
case s if s.startsWith("taxi") =>
complete(StatusCodes.Conflict -> "We don't like taxis ;-)")
case s =>
complete(StatusCodes.Created -> s)
}
entity(as[DemoProcess.Request]) { request =>
onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) {
case DemoProcess.Response(_, n) if n == 42 =>
complete(StatusCodes.BadRequest -> "Request must not have n == 1!")

case DemoProcess.Response(_, n) =>
complete(StatusCodes.Created -> n)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,13 @@ import akka.actor.Scheduler
import akka.pattern.after
import akka.stream.scaladsl.Flow
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.DurationInt

object DemoProcess extends Logging {

private def step(name: String, duration: FiniteDuration, scheduler: Scheduler)(
s: String
)(implicit ec: ExecutionContext) = {
logger.debug(s"Before $name")
val p = Promise[String]()
p.tryCompleteWith(after(duration, scheduler) {
logger.debug(s"After $name")
Future.successful(s)
})
p.future
}
final case class Request(id: String, n: Int)
final case class Response(id: String, n: Int)

/**
* Simple domain logic process for demo purposes.
Expand All @@ -50,8 +41,12 @@ object DemoProcess extends Logging {
* allows for easily showing the effect of backpressure. For real-world applications usually a
* higher value would be suitable.
*/
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] =
Flow[String]
.mapAsync(1)(step("step1", 2.seconds, scheduler))
.mapAsync(1)(step("step2", 2.seconds, scheduler))
def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] =
Flow[Request]
.mapAsync(1) {
case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42)))
}
.mapAsync(1) {
case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n)))
}
}
46 changes: 17 additions & 29 deletions streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package io.moia.streamee
package demo

import akka.actor.{ ActorSystem => UntypedSystem }
import akka.actor.CoordinatedShutdown.Reason
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorSystem, Behavior, Terminated }
import akka.actor.{ ActorSystem => UntypedSystem, CoordinatedShutdown, Scheduler }
import akka.cluster.typed.{ Cluster, SelfUp, Subscribe }
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed.{ Cluster, SelfUp, Subscribe, Unsubscribe }
import akka.management.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.stream.Materializer
Expand All @@ -48,37 +48,25 @@ object Main extends Logging {

AkkaManagement(system.toUntyped).start()
ClusterBootstrap(system.toUntyped).start()

logger.info(s"${system.name} started and ready to join cluster")
}

def apply(config: Config): Behavior[SelfUp] =
Behaviors.setup { context =>
context.log.info("{} started and ready to join cluster", context.system.name)

Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[SelfUp])

Behaviors
.receiveMessage[SelfUp] { _ =>
logger.info(s"${context.system.name} joined cluster and is up")
onSelfUp(config, context)
Behaviors.empty
}
.receiveSignal {
case (_, Terminated(actor)) =>
logger.error(s"Shutting down, because $actor terminated!")
CoordinatedShutdown(context.system.toUntyped).run(TopLevelActorTerminated)
Behaviors.same
}
}
Behaviors.receive { (context, _) =>
context.log.info("{} joined cluster and is up", context.system.name)

private def onSelfUp(config: Config, context: ActorContext[SelfUp]) = {
implicit val untypedSystem: UntypedSystem = context.system.toUntyped
implicit val mat: Materializer = ActorMaterializer()(context.system)
implicit val scheduler: Scheduler = context.system.scheduler
Cluster(context.system).subscriptions ! Unsubscribe(context.self)

val demoProcessor =
Processor(DemoProcess(scheduler)(untypedSystem.dispatcher),
ProcessorSettings(context.system),
CoordinatedShutdown(context.system.toUntyped))
Api(config.api, demoProcessor)
}
implicit val untypedSystem: UntypedSystem = context.system.toUntyped
implicit val mat: Materializer = ActorMaterializer()(context.system)

Api(config.api)

Behaviors.empty
}
}
}
6 changes: 3 additions & 3 deletions streamee/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ streamee {
# ATTENTNION: Currently must be 1, see https://github.com/akka/akka/issues/25349!
buffer-size = 1

# The maximum number of commands which can be in-flight in the wrapped domain logic process.
# Large values should not be an issue, because for each command in-flight there is just a
# The maximum number of requests which can be in-flight in the wrapped domain logic process.
# Large values should not be an issue, because for each request in-flight there is just a
# buffered promise (which is rather lightweight).
#
# Must be positive!
max-nr-of-in-flight-commands = 8192
max-nr-of-in-flight-requests = 8192
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ object ExpiringPromise {
* @param timeout maximum duration for the promise to be completed successfully
* @param scheduler Akka scheduler needed for timeout handling
* @param ec Scala execution context for timeout handling
* @tparam R result type
* @tparam R response type
*/
def apply[R](timeout: FiniteDuration,
scheduler: Scheduler)(implicit ec: ExecutionContext): Promise[R] = {
val promisedR = Promise[R]()
val resultTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout)))
promisedR.tryCompleteWith(resultTimeout)
val promisedR = Promise[R]()
val responseTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout)))
promisedR.tryCompleteWith(responseTimeout)
promisedR
}
}
Loading

0 comments on commit febb272

Please sign in to comment.