diff --git a/README.md b/README.md index 927a200..76d804e 100644 --- a/README.md +++ b/README.md @@ -54,23 +54,27 @@ use the type `Flow[C, R, Any]` where `C` is the command type and `R` is the resu In the demo subproject "streamee-demo" one simple process is defined in the `DemoProcess` object: ``` scala -def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] = - Flow[String] - .mapAsync(1)(step("step1", 2.seconds, scheduler)) - .mapAsync(1)(step("step2", 2.seconds, scheduler)) +def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] = + Flow[Request] + .mapAsync(1) { + case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42))) + } + .mapAsync(1) { + case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n))) + } ``` Next we have to create the actual processor, i.e. the running stream into which the process is embedded, by calling `Processor.apply` thereby giving the process, processor settings and the reference to `CoordinatedShutdown`. -In the demo subproject "streamee-demo" this happens in `Main`: +In the demo subproject "streamee-demo" this happens in `Api`: ``` scala val demoProcessor = - Processor(DemoLogic(scheduler)(untypedSystem.dispatcher), - ProcessorSettings(context.system), - CoordinatedShutdown(context.system.toUntyped)) + Processor(DemoProcess(scheduler)(untypedSystem.dispatcher), + ProcessorSettings(untypedSystem), + CoordinatedShutdown(untypedSystem)) ``` Commands offered via the returned queue are emitted into the given process. Once results are @@ -84,18 +88,14 @@ using an `ExpiringPromise` with the given timeout. In the demo subproject "streamee-demo" this happens in `Api`: ``` scala -pathPrefix("accounts") { - post { - entity(as[Entity]) { - case Entity(s) => - onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) { - case s if s.isEmpty => - complete(StatusCodes.BadRequest -> "Empty entity!") - case s if s.startsWith("taxi") => - complete(StatusCodes.Conflict -> "We don't like taxis ;-)") - case s => - complete(StatusCodes.Created -> s) - } +post { + entity(as[DemoProcess.Request]) { request => + onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) { + case DemoProcess.Response(_, n) if n == 42 => + complete(StatusCodes.BadRequest -> "Request must not have n == 1!") + + case DemoProcess.Response(_, n) => + complete(StatusCodes.Created -> n) } } } @@ -112,16 +112,15 @@ This code is open source software licensed under the [Apache 2.0 License](http:/ ## Publishing -To publish a release to Maven central follow these steps: +To publish a release to Maven Central follow these steps: -1. Create a release via GitHub -2. Publish artifact to OSS Sonatype stage repository: - ``` - sbt publishSigned - ``` - Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`. +1. Create a tag/release on GitHub +2. Publish the artifact to the OSS Sonatype stage repository: + ``` + sbt publishSigned + ``` + Note that your Sonatype credentials needs to be configured on your machine and you need to have access writes to publish artifacts to the group id `io.moia`. 3. Release artifact to Maven Central with: - ``` - sbt sonatypeRelease - ``` - + ``` + sbt sonatypeRelease + ``` diff --git a/streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala b/streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala index f26a793..4410a13 100644 --- a/streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala +++ b/streamee-demo/src/main/scala/io/moia/streamee/demo/Api.scala @@ -17,18 +17,17 @@ package io.moia.streamee package demo -import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason } import akka.actor.{ ActorSystem, CoordinatedShutdown, Scheduler } +import akka.actor.CoordinatedShutdown.{ PhaseServiceUnbind, Reason } import akka.http.scaladsl.Http import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.StatusCodes.OK import akka.http.scaladsl.server.{ Directives, Route } import akka.stream.Materializer -import akka.stream.scaladsl.SourceQueue import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport import org.apache.logging.log4j.scala.Logging import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ ExecutionContext, Promise } +import scala.concurrent.ExecutionContext import scala.util.{ Failure, Success } /** @@ -42,15 +41,17 @@ object Api extends Logging { private final object BindFailure extends Reason - def apply( - config: Config, - demoProcessor: Processor[String, String] - )(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = { + def apply(config: Config)(implicit untypedSystem: ActorSystem, mat: Materializer): Unit = { import config._ import untypedSystem.dispatcher implicit val scheduler: Scheduler = untypedSystem.scheduler +val demoProcessor = + Processor(DemoProcess(scheduler)(untypedSystem.dispatcher), + ProcessorSettings(untypedSystem), + CoordinatedShutdown(untypedSystem)) + Http() .bindAndHandle( route(demoProcessor, demoProcessorTimeout), @@ -71,7 +72,7 @@ object Api extends Logging { } def route( - demoProcessor: SourceQueue[(String, Promise[String])], + demoProcessor: Processor[DemoProcess.Request, DemoProcess.Response], demoProcessorTimeout: FiniteDuration )(implicit ec: ExecutionContext, scheduler: Scheduler): Route = { import Directives._ @@ -86,16 +87,14 @@ object Api extends Logging { } } ~ post { - entity(as[Entity]) { - case Entity(s) => - onProcessorSuccess(s, demoProcessor, demoProcessorTimeout, scheduler) { - case s if s.isEmpty => - complete(StatusCodes.BadRequest -> "Empty entity!") - case s if s.startsWith("taxi") => - complete(StatusCodes.Conflict -> "We don't like taxis ;-)") - case s => - complete(StatusCodes.Created -> s) - } + entity(as[DemoProcess.Request]) { request => + onProcessorSuccess(request, demoProcessor, demoProcessorTimeout, scheduler) { + case DemoProcess.Response(_, n) if n == 42 => + complete(StatusCodes.BadRequest -> "Request must not have n == 1!") + + case DemoProcess.Response(_, n) => + complete(StatusCodes.Created -> n) + } } } } diff --git a/streamee-demo/src/main/scala/io/moia/streamee/demo/DemoProcess.scala b/streamee-demo/src/main/scala/io/moia/streamee/demo/DemoProcess.scala index 4759f07..31607ae 100644 --- a/streamee-demo/src/main/scala/io/moia/streamee/demo/DemoProcess.scala +++ b/streamee-demo/src/main/scala/io/moia/streamee/demo/DemoProcess.scala @@ -21,22 +21,13 @@ import akka.actor.Scheduler import akka.pattern.after import akka.stream.scaladsl.Flow import org.apache.logging.log4j.scala.Logging -import scala.concurrent.duration.{ DurationInt, FiniteDuration } -import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.DurationInt object DemoProcess extends Logging { - private def step(name: String, duration: FiniteDuration, scheduler: Scheduler)( - s: String - )(implicit ec: ExecutionContext) = { - logger.debug(s"Before $name") - val p = Promise[String]() - p.tryCompleteWith(after(duration, scheduler) { - logger.debug(s"After $name") - Future.successful(s) - }) - p.future - } + final case class Request(id: String, n: Int) + final case class Response(id: String, n: Int) /** * Simple domain logic process for demo purposes. @@ -50,8 +41,12 @@ object DemoProcess extends Logging { * allows for easily showing the effect of backpressure. For real-world applications usually a * higher value would be suitable. */ - def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[String, String, NotUsed] = - Flow[String] - .mapAsync(1)(step("step1", 2.seconds, scheduler)) - .mapAsync(1)(step("step2", 2.seconds, scheduler)) +def apply(scheduler: Scheduler)(implicit ec: ExecutionContext): Flow[Request, Response, NotUsed] = + Flow[Request] + .mapAsync(1) { + case Request(id, n) => after(2.seconds, scheduler)(Future.successful((id, n * 42))) + } + .mapAsync(1) { + case (id, n) => after(2.seconds, scheduler)(Future.successful(Response(id, n))) + } } diff --git a/streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala b/streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala index de655b3..336b2ed 100644 --- a/streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala +++ b/streamee-demo/src/main/scala/io/moia/streamee/demo/Main.scala @@ -17,11 +17,11 @@ package io.moia.streamee package demo +import akka.actor.{ ActorSystem => UntypedSystem } import akka.actor.CoordinatedShutdown.Reason -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } -import akka.actor.typed.{ ActorSystem, Behavior, Terminated } -import akka.actor.{ ActorSystem => UntypedSystem, CoordinatedShutdown, Scheduler } -import akka.cluster.typed.{ Cluster, SelfUp, Subscribe } +import akka.actor.typed.{ ActorSystem, Behavior } +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.typed.{ Cluster, SelfUp, Subscribe, Unsubscribe } import akka.management.AkkaManagement import akka.management.cluster.bootstrap.ClusterBootstrap import akka.stream.Materializer @@ -48,37 +48,25 @@ object Main extends Logging { AkkaManagement(system.toUntyped).start() ClusterBootstrap(system.toUntyped).start() - - logger.info(s"${system.name} started and ready to join cluster") } def apply(config: Config): Behavior[SelfUp] = Behaviors.setup { context => + context.log.info("{} started and ready to join cluster", context.system.name) + Cluster(context.system).subscriptions ! Subscribe(context.self, classOf[SelfUp]) - Behaviors - .receiveMessage[SelfUp] { _ => - logger.info(s"${context.system.name} joined cluster and is up") - onSelfUp(config, context) - Behaviors.empty - } - .receiveSignal { - case (_, Terminated(actor)) => - logger.error(s"Shutting down, because $actor terminated!") - CoordinatedShutdown(context.system.toUntyped).run(TopLevelActorTerminated) - Behaviors.same - } - } + Behaviors.receive { (context, _) => + context.log.info("{} joined cluster and is up", context.system.name) - private def onSelfUp(config: Config, context: ActorContext[SelfUp]) = { - implicit val untypedSystem: UntypedSystem = context.system.toUntyped - implicit val mat: Materializer = ActorMaterializer()(context.system) - implicit val scheduler: Scheduler = context.system.scheduler + Cluster(context.system).subscriptions ! Unsubscribe(context.self) - val demoProcessor = - Processor(DemoProcess(scheduler)(untypedSystem.dispatcher), - ProcessorSettings(context.system), - CoordinatedShutdown(context.system.toUntyped)) - Api(config.api, demoProcessor) - } + implicit val untypedSystem: UntypedSystem = context.system.toUntyped + implicit val mat: Materializer = ActorMaterializer()(context.system) + + Api(config.api) + + Behaviors.empty + } + } } diff --git a/streamee/src/main/resources/reference.conf b/streamee/src/main/resources/reference.conf index 60f3bb9..4d44d61 100644 --- a/streamee/src/main/resources/reference.conf +++ b/streamee/src/main/resources/reference.conf @@ -9,11 +9,11 @@ streamee { # ATTENTNION: Currently must be 1, see https://github.com/akka/akka/issues/25349! buffer-size = 1 - # The maximum number of commands which can be in-flight in the wrapped domain logic process. - # Large values should not be an issue, because for each command in-flight there is just a + # The maximum number of requests which can be in-flight in the wrapped domain logic process. + # Large values should not be an issue, because for each request in-flight there is just a # buffered promise (which is rather lightweight). # # Must be positive! - max-nr-of-in-flight-commands = 8192 + max-nr-of-in-flight-requests = 8192 } } diff --git a/streamee/src/main/scala/io/moia/streamee/ExpiringPromise.scala b/streamee/src/main/scala/io/moia/streamee/ExpiringPromise.scala index 90b9bc8..1852f9d 100644 --- a/streamee/src/main/scala/io/moia/streamee/ExpiringPromise.scala +++ b/streamee/src/main/scala/io/moia/streamee/ExpiringPromise.scala @@ -36,13 +36,13 @@ object ExpiringPromise { * @param timeout maximum duration for the promise to be completed successfully * @param scheduler Akka scheduler needed for timeout handling * @param ec Scala execution context for timeout handling - * @tparam R result type + * @tparam R response type */ def apply[R](timeout: FiniteDuration, scheduler: Scheduler)(implicit ec: ExecutionContext): Promise[R] = { - val promisedR = Promise[R]() - val resultTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout))) - promisedR.tryCompleteWith(resultTimeout) + val promisedR = Promise[R]() + val responseTimeout = after(timeout, scheduler)(Future.failed(PromiseExpired(timeout))) + promisedR.tryCompleteWith(responseTimeout) promisedR } } diff --git a/streamee/src/main/scala/io/moia/streamee/Processor.scala b/streamee/src/main/scala/io/moia/streamee/Processor.scala index 9704cdc..b9a72db 100644 --- a/streamee/src/main/scala/io/moia/streamee/Processor.scala +++ b/streamee/src/main/scala/io/moia/streamee/Processor.scala @@ -24,66 +24,74 @@ import org.apache.logging.log4j.scala.Logging import scala.concurrent.Promise /** - * Runs a domain logic process (Akka Streams flow) for processing commands into results. See + * Runs a domain logic process (Akka Streams flow) accepting requests and producing responses. See * [[Processor.apply]] for details. */ object Processor extends Logging { - final case class NotCorrelated[C, R](c: C, r: R) - extends Exception(s"Command $c and result $r are not correlated!") + final case class NotCorrelated[A, B](a: A, b: B) + extends Exception(s"Request $a and response $b are not correlated!") /** - * Runs a domain logic process (Akka Streams flow) for processing commands into results. - * Commands offered via the returned queue are pushed into the given `process`. Once results are - * available the promise given together with the command is completed with success. If the - * process back-pressures, offered commands are dropped. + * Runs a domain logic process (Akka Streams flow) accepting requests and producing responses. + * Requests offered via the returned queue are pushed into the given `process`. Once responses + * are available, the promise given together with the request is completed with success. If the + * process back-pressures, offered requests are dropped (fail fast). * * A task is registered with Akka Coordinated Shutdown in the "service-requests-done" phase to - * ensure that no more commands are accepted and all in-flight commands have been processed + * ensure that no more requests are accepted and all in-flight requests have been processed * before continuing with the shutdown. * - * '''Attention''': the given domain logic process must emit exactly one result for every - * command and the sequence of the elements in the process must be maintained! + * '''Attention''': the given domain logic process must emit exactly one response for every + * request and the sequence of the elements in the process must be maintained! Give a specific + * `correlated` function to verify this. * - * @param process domain logic process from command to result + * @param process domain logic process from request to response * @param settings settings for processors (from application.conf or reference.conf) * @param shutdown Akka Coordinated Shutdown - * @param correlated correlation between command and result, by default always true + * @param correlated correlation between request and response, by default always true * @param mat materializer to run the stream - * @tparam C command type - * @tparam R result type - * @return queue for command-promise pairs + * @tparam A request type + * @tparam B response type + * @return queue for request-promise pairs */ - def apply[C, R]( - process: Flow[C, R, Any], + def apply[A, B]( + process: Flow[A, B, Any], settings: ProcessorSettings, shutdown: CoordinatedShutdown, - correlated: (C, R) => Boolean = (_: C, _: R) => true - )(implicit mat: Materializer): Processor[C, R] = { + correlated: (A, B) => Boolean = (_: A, _: B) => true + )(implicit mat: Materializer): Processor[A, B] = { val (sourceQueueWithComplete, done) = Source - .queue[(C, Promise[R])](settings.bufferSize, OverflowStrategy.dropNew) // Must be 1: for 0 offers could "hang", for larger values completing would not work, see: https://github.com/akka/akka/issues/25349 - .via(embed(process, settings.maxNrOfInFlightCommands)) + .queue[(A, Promise[B])](settings.bufferSize, OverflowStrategy.dropNew) // Must be 1: for 0 offers could "hang", for larger values completing would not work, see: https://github.com/akka/akka/issues/25349 + .via(embed(process, settings.maxNrOfInFlightRequests)) .toMat(Sink.foreach { - case (r, (c, p)) if !correlated(c, r) => p.tryFailure(NotCorrelated(c, r)) - case (r, (_, p)) => p.trySuccess(r) + case (response, (request, promisedResponse)) if !correlated(request, response) => + promisedResponse.tryFailure(NotCorrelated(request, response)) + case (response, (_, promisedResponse)) => + promisedResponse.trySuccess(response) })(Keep.both) .withAttributes(ActorAttributes.supervisionStrategy(_ => Supervision.Resume)) // The stream must not die! .run() + shutdown.addTask(PhaseServiceRequestsDone, "processor") { () => sourceQueueWithComplete.complete() done // `sourceQueueWithComplete.watchCompletion` seems broken, hence use `done` } + sourceQueueWithComplete } - private def embed[C, R](process: Flow[C, R, Any], bufferSize: Int) = + private def embed[A, B](process: Flow[A, B, Any], bufferSize: Int) = Flow.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ - val nest = builder.add(Flow[(C, Promise[R])].map { case (c, p) => (c, (c, p)) }) - val unzip = builder.add(Unzip[C, (C, Promise[R])]()) - val zip = builder.add(Zip[R, (C, Promise[R])]()) - val buf = builder.add(Flow[(C, Promise[R])].buffer(bufferSize, OverflowStrategy.backpressure)) + val nest = + builder.add(Flow[(A, Promise[B])].map { + case (request, promisedResponse) => (request, (request, promisedResponse)) + }) + val unzip = builder.add(Unzip[A, (A, Promise[B])]()) + val zip = builder.add(Zip[B, (A, Promise[B])]()) + val buf = builder.add(Flow[(A, Promise[B])].buffer(bufferSize, OverflowStrategy.backpressure)) // format: OFF nest ~> unzip.in diff --git a/streamee/src/main/scala/io/moia/streamee/ProcessorDirectives.scala b/streamee/src/main/scala/io/moia/streamee/ProcessorDirectives.scala index af4d4db..f5a98e0 100644 --- a/streamee/src/main/scala/io/moia/streamee/ProcessorDirectives.scala +++ b/streamee/src/main/scala/io/moia/streamee/ProcessorDirectives.scala @@ -32,34 +32,34 @@ import scala.concurrent.duration.FiniteDuration object ProcessorDirectives extends Logging { /** - * Offers the given command to the given processor thereby using an [[ExpiringPromise]] with the - * given `timeout` and handles the returned `OfferQueueResult` (from Akka Streams - * `SourceQueue.offer`): if `Enqueued` (happiest path) dispatches the associated result to the + * Offers the given request to the given processor thereby using an [[ExpiringPromise]] with the + * given `timeout` and handles the returned `OfferQueueResponse` (from Akka Streams + * `SourceQueue.offer`): if `Enqueued` (happiest path) dispatches the associated response to the * inner route via `onSuccess`, if `Dropped` (not so happy path) completes the HTTP request with * `ServiceUnavailable` and else (failure case, should not happen) completes the HTTP request * with `InternalServerError`. * - * @param command command to be processed + * @param request request to be processed * @param processor the processor to work with - * @param timeout maximum duration for the command to be processed, i.e. the related promise to be completed + * @param timeout maximum duration for the request to be processed, i.e. the related promise to be completed * @param scheduler Akka scheduler needed for timeout handling - * @tparam C command type - * @tparam R result type + * @tparam C request type + * @tparam R response type */ - def onProcessorSuccess[C, R](command: C, + def onProcessorSuccess[C, R](request: C, processor: SourceQueue[(C, Promise[R])], timeout: FiniteDuration, scheduler: Scheduler): Directive1[R] = extractExecutionContext.flatMap { implicit ec => - val result = ExpiringPromise[R](timeout, scheduler) + val response = ExpiringPromise[R](timeout, scheduler) - onSuccess(processor.offer((command, result))).flatMap { + onSuccess(processor.offer((request, response))).flatMap { case QueueOfferResult.Enqueued => - logger.debug(s"Successfully enqueued command $command!") - onSuccess(result.future) + logger.debug(s"Successfully enqueued request $request!") + onSuccess(response.future) case QueueOfferResult.Dropped => - logger.warn(s"Processor dropped command $command!") + logger.warn(s"Processor dropped request $request!") complete(StatusCodes.ServiceUnavailable) case QueueOfferResult.QueueClosed => diff --git a/streamee/src/main/scala/io/moia/streamee/ProcessorSettings.scala b/streamee/src/main/scala/io/moia/streamee/ProcessorSettings.scala index 88dd4c2..a84057d 100644 --- a/streamee/src/main/scala/io/moia/streamee/ProcessorSettings.scala +++ b/streamee/src/main/scala/io/moia/streamee/ProcessorSettings.scala @@ -62,21 +62,21 @@ sealed trait ProcessorSettings extends Extension { } /** - * The maximum number of commands which can be in-flight in the wrapped domain logic process. + * The maximum number of requests which can be in-flight in the wrapped domain logic process. * - * Large values should not be an issue, because for each command in-flight there is just a + * Large values should not be an issue, because for each request in-flight there is just a * buffered promise (which is rather lightweight). * * Must be positive! */ - final val maxNrOfInFlightCommands: Int = { - val maxNrOfInFlightCommands = - system.settings.config.getInt("streamee.processor.max-nr-of-in-flight-commands") - if (maxNrOfInFlightCommands <= 0) + final val maxNrOfInFlightRequests: Int = { + val maxNrOfInFlightRequests = + system.settings.config.getInt("streamee.processor.max-nr-of-in-flight-requests") + if (maxNrOfInFlightRequests <= 0) throw new IllegalArgumentException( - "streamee.processor.max-nr-of-in-flight-commands must be positive!" + "streamee.processor.max-nr-of-in-flight-requests must be positive!" ) - maxNrOfInFlightCommands + maxNrOfInFlightRequests } } diff --git a/streamee/src/test/scala/io/moia/streamee/ProcessorDirectivesTests.scala b/streamee/src/test/scala/io/moia/streamee/ProcessorDirectivesTests.scala index 053b0f6..65d3eec 100644 --- a/streamee/src/test/scala/io/moia/streamee/ProcessorDirectivesTests.scala +++ b/streamee/src/test/scala/io/moia/streamee/ProcessorDirectivesTests.scala @@ -78,10 +78,10 @@ object ProcessorDirectivesTests extends TestSuite with RouteTest with TestFramew private def route(processor: SourceQueue[(String, Promise[Boolean])]) = post { - entity(as[String]) { command => - onProcessorSuccess(command, processor, 100.milliseconds.dilated, system.scheduler) { - result => - if (result) + entity(as[String]) { request => + onProcessorSuccess(request, processor, 100.milliseconds.dilated, system.scheduler) { + response => + if (response) complete(StatusCodes.Created) else complete(StatusCodes.BadRequest) diff --git a/streamee/src/test/scala/io/moia/streamee/ProcessorSettingsTests.scala b/streamee/src/test/scala/io/moia/streamee/ProcessorSettingsTests.scala index 93af742..17c1064 100644 --- a/streamee/src/test/scala/io/moia/streamee/ProcessorSettingsTests.scala +++ b/streamee/src/test/scala/io/moia/streamee/ProcessorSettingsTests.scala @@ -57,25 +57,25 @@ object ProcessorSettingsTests extends TestSuite { } finally system.terminate() } - 'throwNonPositiveMaxNrOfInFlightCommands - { + 'throwNonPositiveMaxNrOfInFlightRequests - { val config = ConfigFactory - .parseString("streamee.processor.max-nr-of-in-flight-commands=0") + .parseString("streamee.processor.max-nr-of-in-flight-requests=0") .withFallback(ConfigFactory.load()) val system = ActorSystem[Nothing](Behaviors.empty, getClass.getSimpleName.init, config) try intercept[IllegalArgumentException](ProcessorSettings(system)) finally system.terminate() } - 'maxNrOfInFlightCommands - { + 'maxNrOfInFlightRequests - { val config = ConfigFactory - .parseString("streamee.processor.max-nr-of-in-flight-commands=1024") + .parseString("streamee.processor.max-nr-of-in-flight-requests=1024") .withFallback(ConfigFactory.load()) val system = ActorSystem[Nothing](Behaviors.empty, getClass.getSimpleName.init, config) try { - val maxNrOfInFlightCommands = ProcessorSettings(system).maxNrOfInFlightCommands - assert(maxNrOfInFlightCommands == 1024) + val maxNrOfInFlightRequests = ProcessorSettings(system).maxNrOfInFlightRequests + assert(maxNrOfInFlightRequests == 1024) } finally system.terminate() } }