From de8956836898a1fa035b47ad0c7794c1413ad349 Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 11:31:46 +0100 Subject: [PATCH 01/12] Model `ClientPool` to register disconnection listeners --- frontend/src/main/scala/bloop/Cli.scala | 15 ++++--- .../scala/bloop/bsp/BloopBspServices.scala | 3 +- .../main/scala/bloop/engine/ClientPool.scala | 40 +++++++++++++++++ .../main/scala/bloop/engine/Interpreter.scala | 13 +++++- .../src/main/scala/bloop/engine/State.scala | 44 +++++++++++++------ 5 files changed, 93 insertions(+), 22 deletions(-) create mode 100644 frontend/src/main/scala/bloop/engine/ClientPool.scala diff --git a/frontend/src/main/scala/bloop/Cli.scala b/frontend/src/main/scala/bloop/Cli.scala index 912e15467f..46c8806771 100644 --- a/frontend/src/main/scala/bloop/Cli.scala +++ b/frontend/src/main/scala/bloop/Cli.scala @@ -3,22 +3,22 @@ package bloop import bloop.bsp.BspServer import bloop.cli.validation.Validate import bloop.cli.{CliOptions, CliParsers, Commands, CommonOptions, ExitStatus} -import bloop.engine.{Action, Exit, Interpreter, Print, Run, State} +import bloop.engine.{Action, ClientPool, Exit, Interpreter, NailgunPool, NoPool, Print, Run, State} import bloop.io.Paths import bloop.logging.{BloopLogger, Logger} import caseapp.core.{DefaultBaseCommand, Messages} -import com.martiansoftware.nailgun +import com.martiansoftware.nailgun.NGContext class Cli object Cli { def main(args: Array[String]): Unit = { State.setUpShutdownHoook() val action = parse(args, CommonOptions.default) - val exitStatus = run(action) + val exitStatus = run(action, NoPool) sys.exit(exitStatus.code) } - def nailMain(ngContext: nailgun.NGContext): Unit = { + def nailMain(ngContext: NGContext): Unit = { val nailgunOptions = CommonOptions( in = ngContext.in, out = ngContext.out, @@ -36,7 +36,8 @@ object Cli { printErrorAndExit(helpAsked, nailgunOptions) else parse(args, nailgunOptions) } - val exitStatus = run(cmd) + + val exitStatus = run(cmd, NailgunPool(ngContext)) ngContext.exit(exitStatus.code) } @@ -188,7 +189,7 @@ object Cli { } } - def run(action: Action): ExitStatus = { + def run(action: Action, pool: ClientPool): ExitStatus = { import bloop.io.AbsolutePath def getConfigDir(cliOptions: CliOptions): AbsolutePath = { cliOptions.configDir @@ -205,7 +206,7 @@ object Cli { val commonOpts = cliOptions.common val configDirectory = getConfigDir(cliOptions) val logger = BloopLogger.at(configDirectory.syntax, commonOpts.out, commonOpts.err) - val state = State.loadActiveStateFor(configDirectory, cliOptions.common, logger) + val state = State.loadActiveStateFor(configDirectory, pool, cliOptions.common, logger) val newState = Interpreter.execute(action, state) State.stateCache.updateBuild(newState.copy(status = ExitStatus.Ok)) newState.status diff --git a/frontend/src/main/scala/bloop/bsp/BloopBspServices.scala b/frontend/src/main/scala/bloop/bsp/BloopBspServices.scala index aa0632f0ea..5fef3b42bc 100644 --- a/frontend/src/main/scala/bloop/bsp/BloopBspServices.scala +++ b/frontend/src/main/scala/bloop/bsp/BloopBspServices.scala @@ -71,7 +71,8 @@ class BloopBspServices(callSiteState: State, client: JsonRpcClient, bspLogger: L private final val defaultOpts = callSiteState.commonOptions def reloadState(uri: String): State = { - val state0 = State.loadActiveStateFor(AbsolutePath(uri), defaultOpts, bspForwarderLogger) + val pool = callSiteState.pool + val state0 = State.loadActiveStateFor(AbsolutePath(uri), pool, defaultOpts, bspForwarderLogger) state0.copy(logger = bspForwarderLogger, commonOptions = latestState.commonOptions) } diff --git a/frontend/src/main/scala/bloop/engine/ClientPool.scala b/frontend/src/main/scala/bloop/engine/ClientPool.scala new file mode 100644 index 0000000000..79915598b4 --- /dev/null +++ b/frontend/src/main/scala/bloop/engine/ClientPool.scala @@ -0,0 +1,40 @@ +package bloop.engine + +import com.martiansoftware.nailgun.{NGClientDisconnectReason, NGClientListener, NGContext} + +sealed trait CloseEvent +case object Heartbeat extends CloseEvent +case object SocketError extends CloseEvent +case object SocketTimeout extends CloseEvent +case object SessionShutdown extends CloseEvent +case object InternalError extends CloseEvent + +sealed trait ClientPool { + def addListener(f: CloseEvent => Unit): Unit + def removeAllListeners(): Unit +} + +case object NoPool extends ClientPool { + override def addListener(f: CloseEvent => Unit): Unit = () + override def removeAllListeners(): Unit = () +} + +case class NailgunPool(context: NGContext) extends ClientPool { + override def addListener(f: CloseEvent => Unit): Unit = { + val nailgunListener = new NGClientListener { + override def clientDisconnected(reason: NGClientDisconnectReason): Unit = { + f(reason match { + case NGClientDisconnectReason.HEARTBEAT => Heartbeat + case NGClientDisconnectReason.SOCKET_ERROR => SocketError + case NGClientDisconnectReason.SOCKET_TIMEOUT => SocketTimeout + case NGClientDisconnectReason.SESSION_SHUTDOWN => SessionShutdown + case NGClientDisconnectReason.INTERNAL_ERROR => InternalError + }) + } + } + context.addClientListener(nailgunListener) + nailgunListener + } + + override def removeAllListeners(): Unit = context.removeAllClientListeners() +} diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index a271ca19e7..2618fb6289 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -11,6 +11,7 @@ import bloop.testing.TestInternals import bloop.engine.tasks.Tasks import bloop.Project import monix.eval.Task +import monix.execution.Cancelable import monix.execution.misc.NonFatal import scala.concurrent.Await @@ -92,7 +93,9 @@ object Interpreter { val allSourceDirs = reachable.iterator.flatMap(_.sourceDirectories.toList).map(_.underlying) val watcher = new SourceWatcher(allSourceDirs.toList, state.logger) // Make file watching cancel tasks if lots of different changes happen in less than 100ms - val watchFn: State => State = { state => waitAndLog(state, f(state)) } + val watchFn: State => State = { state => + waitAndLog(state, f(state)) + } val firstOp = watchFn(state) watcher.watch(firstOp, watchFn) } @@ -253,7 +256,15 @@ object Interpreter { } private def waitAndLog(previousState: State, newState: Task[State]): State = { + val pool = previousState.pool try { + val handle = newState.runAsync(previousState.scheduler) + pool.addListener { + case e: CloseEvent => + previousState.logger.info(s"Client has disconnected with a $e event. Cancelling tasks...") + handle.cancel() + pool.removeAllListeners() + } // Duration has to be infinity, we cannot predict how much time compilation takes Await.result(newState.runAsync(previousState.scheduler), Duration.Inf) } catch { diff --git a/frontend/src/main/scala/bloop/engine/State.scala b/frontend/src/main/scala/bloop/engine/State.scala index b5a854d53d..5af00100ff 100644 --- a/frontend/src/main/scala/bloop/engine/State.scala +++ b/frontend/src/main/scala/bloop/engine/State.scala @@ -26,6 +26,7 @@ final case class State private ( build: Build, results: ResultsCache, compilerCache: CompilerCache, + pool: ClientPool, commonOptions: CommonOptions, status: ExitStatus, logger: Logger @@ -51,22 +52,24 @@ object State { } private[bloop] def forTests(build: Build, compilerCache: CompilerCache, logger: Logger): State = { - val initializedResults = build.projects.foldLeft(ResultsCache.getEmpty(logger)) { + val results0 = build.projects.foldLeft(ResultsCache.getEmpty(logger)) { case (results, project) => results.initializeResult(project) } - State(build, initializedResults, compilerCache, CommonOptions.default, ExitStatus.Ok, logger) + State(build, results0, compilerCache, NoPool, CommonOptions.default, ExitStatus.Ok, logger) } - // Improve the caching by using file metadata - def apply(build: Build, options: CommonOptions, logger: Logger): State = { + def apply(build: Build, pool: ClientPool, opts: CommonOptions, logger: Logger): State = { val results = build.projects.foldLeft(ResultsCache.getEmpty(logger)) { case (results, project) => results.initializeResult(project) } val compilerCache = getCompilerCache(logger) - State(build, results, compilerCache, options, ExitStatus.Ok, logger) + State(build, results, compilerCache, pool, opts, ExitStatus.Ok, logger) } + def apply(build: Build, options: CommonOptions, logger: Logger): State = + apply(build, NoPool, options, logger) + def setUpShutdownHoook(): Unit = { Runtime .getRuntime() @@ -100,13 +103,28 @@ object State { import bloop.Project import bloop.io.AbsolutePath - def loadActiveStateFor(configDir: AbsolutePath, opts: CommonOptions, logger: Logger): State = { - State.stateCache - .addIfMissing(configDir, path => { - val projects = Project.fromDir(configDir, logger) - val build: Build = Build(configDir, projects) - State(build, opts, logger) - }) - .copy(logger = logger) + + /** + * Loads an state active for the given configuration directory. + * + * @param configDir The configuration directory to load a state for. + * @param pool The pool of listeners that are connected to this client. + * @param opts The common options associated with the state. + * @param logger The logger to be used to instantiate the state. + * @return An state (cached or not) associated with the configuration directory. + */ + def loadActiveStateFor( + configDir: AbsolutePath, + pool: ClientPool, + opts: CommonOptions, + logger: Logger + ): State = { + val cached = State.stateCache.addIfMissing(configDir, path => { + val projects = Project.fromDir(configDir, logger) + val build: Build = Build(configDir, projects) + State(build, pool, opts, logger) + }) + + cached.copy(logger = logger) } } From 9d6f20e457ce48af2b596fc7357c26c02712e63e Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 11:34:25 +0100 Subject: [PATCH 02/12] Copy pool and common options on cached state It is important that we propagate these variables from the use site even if the state is cached because they may be different for different clients. The common options are also crucial (because we want to enable future modifications of `System.in`, `System.out`, etc). --- frontend/src/main/scala/bloop/engine/State.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/frontend/src/main/scala/bloop/engine/State.scala b/frontend/src/main/scala/bloop/engine/State.scala index 5af00100ff..06094e1274 100644 --- a/frontend/src/main/scala/bloop/engine/State.scala +++ b/frontend/src/main/scala/bloop/engine/State.scala @@ -125,6 +125,6 @@ object State { State(build, pool, opts, logger) }) - cached.copy(logger = logger) + cached.copy(pool = pool, commonOptions = opts, logger = logger) } } From 9a43068ad1c7914cbc995dbb06fc316bc712cf14 Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 11:38:17 +0100 Subject: [PATCH 03/12] Add a little bit of debugging to `System.out` --- frontend/src/main/scala/bloop/engine/Interpreter.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 2618fb6289..60bf389d9b 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -262,6 +262,7 @@ object Interpreter { pool.addListener { case e: CloseEvent => previousState.logger.info(s"Client has disconnected with a $e event. Cancelling tasks...") + System.out.println(s"Client has disconnected with a $e event. Cancelling tasks...") handle.cancel() pool.removeAllListeners() } From f6849af8ee79c1476ef4f872d7ca0922e0bdd8fa Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 12:25:37 +0100 Subject: [PATCH 04/12] Avoid repetitive `runAsync` and remove API entrypoint --- frontend/src/main/scala/bloop/Bloop.scala | 7 +++---- frontend/src/main/scala/bloop/engine/Interpreter.scala | 6 ++---- frontend/src/main/scala/bloop/engine/State.scala | 3 --- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/frontend/src/main/scala/bloop/Bloop.scala b/frontend/src/main/scala/bloop/Bloop.scala index c213c47c6e..7229709e85 100644 --- a/frontend/src/main/scala/bloop/Bloop.scala +++ b/frontend/src/main/scala/bloop/Bloop.scala @@ -1,15 +1,14 @@ package bloop import bloop.cli.{CliOptions, Commands, ExitStatus} -import bloop.cli.CliParsers.{inputStreamRead, printStreamRead, OptionsParser, pathParser} -import bloop.engine.{Build, Exit, Interpreter, Run, State} +import bloop.cli.CliParsers.{OptionsParser, inputStreamRead, pathParser, printStreamRead} +import bloop.engine.{Build, Exit, Interpreter, NoPool, Run, State} import bloop.engine.tasks.Tasks import bloop.io.AbsolutePath import bloop.io.Timer.timed import bloop.logging.BloopLogger import jline.console.ConsoleReader import caseapp.{CaseApp, RemainingArgs} - import jline.console.ConsoleReader import scala.annotation.tailrec @@ -27,7 +26,7 @@ object Bloop extends CaseApp[CliOptions] { logger.verboseIf(options.verbose) { val projects = Project.fromDir(configDirectory, logger) val build = Build(configDirectory, projects) - val state = State(build, options.common, logger) + val state = State(build, NoPool, options.common, logger) run(state) } } diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 60bf389d9b..5260351bf2 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -261,13 +261,11 @@ object Interpreter { val handle = newState.runAsync(previousState.scheduler) pool.addListener { case e: CloseEvent => - previousState.logger.info(s"Client has disconnected with a $e event. Cancelling tasks...") - System.out.println(s"Client has disconnected with a $e event. Cancelling tasks...") + System.out.println(s"Client has disconnected with a '$e' event. Cancelling tasks...") handle.cancel() - pool.removeAllListeners() } // Duration has to be infinity, we cannot predict how much time compilation takes - Await.result(newState.runAsync(previousState.scheduler), Duration.Inf) + Await.result(handle, Duration.Inf) } catch { case NonFatal(t) => previousState.logger.error(t.getMessage) diff --git a/frontend/src/main/scala/bloop/engine/State.scala b/frontend/src/main/scala/bloop/engine/State.scala index 06094e1274..08f2182dfe 100644 --- a/frontend/src/main/scala/bloop/engine/State.scala +++ b/frontend/src/main/scala/bloop/engine/State.scala @@ -67,9 +67,6 @@ object State { State(build, results, compilerCache, pool, opts, ExitStatus.Ok, logger) } - def apply(build: Build, options: CommonOptions, logger: Logger): State = - apply(build, NoPool, options, logger) - def setUpShutdownHoook(): Unit = { Runtime .getRuntime() From ceee8bbb71bf309ca6a577eef73166d99af38e7e Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 12:29:29 +0100 Subject: [PATCH 05/12] Add `.devbloop` to `.gitignore` This folder is meant to be used to test bloop binaries in development. --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index ec4be308fd..6e2c5fcf63 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,9 @@ bin/.scalafmt* .zinc/ .nailgun/ +# Directory in which to install locally bloop binaries to test them +.devbloop/ + # zinc uses this local cache for publishing stuff .ivy2/ From b3e7bf7f7a565850cae85498a56eb71a859bf5c1 Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 16:13:09 +0100 Subject: [PATCH 06/12] Implement a decent file watching task with monix The previous file watching logic was a monstruosity because: 1. It relied on `Await.result` to get the state for every of the actions run on file events. 2. It was difficult to reason about. 3. It was not possible to cancel it. This new implementation uses the nifty `Observer` and `Observable` interfaces to implement file watching in a reliable way, closing the watcher when the implementation is cancelled or whenever an exception happens in the watcher. This issue fixes https://github.com/scalacenter/bloop/issues/257, which was caused because file watchers of previous clients (connections) were still running in the background and acting on events. --- .../main/scala/bloop/engine/ClientPool.scala | 6 +- .../scala/bloop/engine/ExecutionContext.scala | 1 + .../main/scala/bloop/engine/Interpreter.scala | 24 +++--- .../main/scala/bloop/io/SourceWatcher.scala | 73 ++++++++++++++++++- 4 files changed, 88 insertions(+), 16 deletions(-) diff --git a/frontend/src/main/scala/bloop/engine/ClientPool.scala b/frontend/src/main/scala/bloop/engine/ClientPool.scala index 79915598b4..dbd17cdb3d 100644 --- a/frontend/src/main/scala/bloop/engine/ClientPool.scala +++ b/frontend/src/main/scala/bloop/engine/ClientPool.scala @@ -21,7 +21,7 @@ case object NoPool extends ClientPool { case class NailgunPool(context: NGContext) extends ClientPool { override def addListener(f: CloseEvent => Unit): Unit = { - val nailgunListener = new NGClientListener { + context.addClientListener(new NGClientListener { override def clientDisconnected(reason: NGClientDisconnectReason): Unit = { f(reason match { case NGClientDisconnectReason.HEARTBEAT => Heartbeat @@ -31,9 +31,7 @@ case class NailgunPool(context: NGContext) extends ClientPool { case NGClientDisconnectReason.INTERNAL_ERROR => InternalError }) } - } - context.addClientListener(nailgunListener) - nailgunListener + }) } override def removeAllListeners(): Unit = context.removeAllClientListeners() diff --git a/frontend/src/main/scala/bloop/engine/ExecutionContext.scala b/frontend/src/main/scala/bloop/engine/ExecutionContext.scala index 411a18b23d..059b7581ba 100644 --- a/frontend/src/main/scala/bloop/engine/ExecutionContext.scala +++ b/frontend/src/main/scala/bloop/engine/ExecutionContext.scala @@ -17,4 +17,5 @@ object ExecutionContext { } implicit val scheduler: Scheduler = Scheduler(executor) + implicit val ioScheduler: Scheduler = Scheduler.io("bloop-io") } diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 5260351bf2..99bc90f281 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -87,17 +87,11 @@ object Interpreter { BspServer.run(cmd, state, scheduler) } - private def watch(project: Project, state: State, f: State => Task[State]): Task[State] = Task { - // TODO(jvican): The implementation here could be improved, do so. + private def watch(project: Project, state: State, f: State => Task[State]): Task[State] = { val reachable = Dag.dfs(state.build.getDagFor(project)) val allSourceDirs = reachable.iterator.flatMap(_.sourceDirectories.toList).map(_.underlying) val watcher = new SourceWatcher(allSourceDirs.toList, state.logger) - // Make file watching cancel tasks if lots of different changes happen in less than 100ms - val watchFn: State => State = { state => - waitAndLog(state, f(state)) - } - val firstOp = watchFn(state) - watcher.watch(firstOp, watchFn) + watcher.watch(state, f) } private def compile(cmd: Commands.Compile, state: State): Task[State] = { @@ -258,12 +252,20 @@ object Interpreter { private def waitAndLog(previousState: State, newState: Task[State]): State = { val pool = previousState.pool try { - val handle = newState.runAsync(previousState.scheduler) + val handle = newState + .executeWithOptions(_.enableAutoCancelableRunLoops) + .runAsync(previousState.scheduler) + + // Let's cancel tasks (if supported by the underlying implementation) when clients disconnect pool.addListener { case e: CloseEvent => - System.out.println(s"Client has disconnected with a '$e' event. Cancelling tasks...") - handle.cancel() + if (!handle.isCompleted) { + System.out.println( + s"Client in ${previousState.build.origin.syntax} has disconnected with a '$e' event. Cancelling tasks...") + handle.cancel() + } } + // Duration has to be infinity, we cannot predict how much time compilation takes Await.result(handle, Duration.Inf) } catch { diff --git a/frontend/src/main/scala/bloop/io/SourceWatcher.scala b/frontend/src/main/scala/bloop/io/SourceWatcher.scala index 1260019b73..42dd8cd3f2 100644 --- a/frontend/src/main/scala/bloop/io/SourceWatcher.scala +++ b/frontend/src/main/scala/bloop/io/SourceWatcher.scala @@ -3,12 +3,14 @@ package bloop.io import java.nio.file.Path import bloop.cli.ExitStatus -import bloop.engine.State +import bloop.engine.{ExecutionContext, State} import bloop.logging.Logger import scala.collection.JavaConverters._ import io.methvin.watcher.DirectoryChangeEvent.EventType import io.methvin.watcher.{DirectoryChangeEvent, DirectoryChangeListener, DirectoryWatcher} +import monix.eval.Task +import monix.reactive.{MulticastStrategy, Observable} final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { private val dirs = dirs0.distinct @@ -51,4 +53,73 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { result.mergeStatus(ExitStatus.UnexpectedError) } } + + private final val fakePath = java.nio.file.Paths.get("$$$bloop-monix-trigger-first-event$$$") + private final val triggerEventAtFirst = + new DirectoryChangeEvent(DirectoryChangeEvent.EventType.OVERFLOW, fakePath, -1) + + def watch(state0: State, action: State => Task[State]): Task[State] = { + logger.debug(s"Watching the following directories: ${dirs.mkString(", ")}") + + def runAction(state: State, event: DirectoryChangeEvent): Task[State] = { + Task(logger.debug(s"A ${event.eventType()} in ${event.path()} has triggered an event.")) + .flatMap(_ => action(state)) + .executeOn(state0.scheduler) + } + + val (observer, observable) = + Observable.multicast[DirectoryChangeEvent](MulticastStrategy.publish)( + ExecutionContext.ioScheduler) + + val compilationTask = observable + .foldLeftL(action(state0)) { + case (stateTask, e) => + e.eventType match { + case EventType.CREATE => stateTask.flatMap(s => runAction(s, e)) + case EventType.MODIFY => stateTask.flatMap(s => runAction(s, e)) + case EventType.OVERFLOW => stateTask.flatMap(s => runAction(s, e)) + case EventType.DELETE => stateTask + } + } + .flatten + + val watcher = DirectoryWatcher.create( + dirsAsJava, + new DirectoryChangeListener { + @volatile var stop: Boolean = false + override def onEvent(event: DirectoryChangeEvent): Unit = { + val targetFile = event.path() + val targetPath = targetFile.toFile.getAbsolutePath() + if (Files.isRegularFile(targetFile) && + (targetPath.endsWith(".scala") || targetPath.endsWith(".java"))) { + val ack = observer.onNext(event) + stop = ack.isCompleted + } + } + } + ) + + import scala.util.{Try, Success, Failure} + val watchingTask = Task { + Try { + try watcher.watch() + finally watcher.close() + } + }.doOnCancel(Task(watcher.close())) + + val firstEventTriggerTask = Task(observer.onNext(triggerEventAtFirst)) + val aggregated = Task.zip3( + firstEventTriggerTask.executeOn(ExecutionContext.ioScheduler), + watchingTask.executeOn(ExecutionContext.ioScheduler), + compilationTask.executeOn(state0.scheduler) + ) + + aggregated.map { + case (_, Success(_), state) => state + case (_, Failure(t), state) => + state.logger.error("Unexpected file watching error") + state.logger.trace(t) + state.mergeStatus(ExitStatus.UnexpectedError) + } + } } From 4ac97bbef077e0685109043270905f88b23b5a55 Mon Sep 17 00:00:00 2001 From: jvican Date: Tue, 13 Mar 2018 16:42:20 +0100 Subject: [PATCH 07/12] Refactor to trigger compilation in the first place --- .../main/scala/bloop/io/SourceWatcher.scala | 78 ++++++------------- 1 file changed, 23 insertions(+), 55 deletions(-) diff --git a/frontend/src/main/scala/bloop/io/SourceWatcher.scala b/frontend/src/main/scala/bloop/io/SourceWatcher.scala index 42dd8cd3f2..fcb0270af6 100644 --- a/frontend/src/main/scala/bloop/io/SourceWatcher.scala +++ b/frontend/src/main/scala/bloop/io/SourceWatcher.scala @@ -20,51 +20,18 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { import java.nio.file.Files dirs.foreach(p => if (!Files.exists(p)) Files.createDirectories(p) else ()) - def watch(initialState: State, action: State => State): State = { - logger.debug(s"Watching the following directories: ${dirs.mkString(", ")}") - var result: State = initialState - def runAction(event: DirectoryChangeEvent): Unit = { - logger.debug(s"A ${event.eventType()} in ${event.path()} has triggered an event.") - result = action(result) - } - - val watcher = DirectoryWatcher.create( - dirsAsJava, - new DirectoryChangeListener { - override def onEvent(event: DirectoryChangeEvent): Unit = { - val targetFile = event.path() - val targetPath = targetFile.toFile.getAbsolutePath() - if (Files.isRegularFile(targetFile) && - (targetPath.endsWith(".scala") || targetPath.endsWith(".java"))) { - event.eventType() match { - case EventType.CREATE => runAction(event) - case EventType.MODIFY => runAction(event) - case EventType.OVERFLOW => runAction(event) - case EventType.DELETE => () // We don't do anything when a file is deleted - } - } - } - } - ) - try { watcher.watch(); result } catch { - case t: Throwable => - logger.error("Unexpected error happened when file watching.") - logger.trace(t) - result.mergeStatus(ExitStatus.UnexpectedError) - } - } - private final val fakePath = java.nio.file.Paths.get("$$$bloop-monix-trigger-first-event$$$") - private final val triggerEventAtFirst = + private final val triggerCompilationEvent = new DirectoryChangeEvent(DirectoryChangeEvent.EventType.OVERFLOW, fakePath, -1) def watch(state0: State, action: State => Task[State]): Task[State] = { - logger.debug(s"Watching the following directories: ${dirs.mkString(", ")}") + logger.info(s"Watching the following directories: ${dirs.mkString(", ")}") + var lastState: State = state0 def runAction(state: State, event: DirectoryChangeEvent): Task[State] = { - Task(logger.debug(s"A ${event.eventType()} in ${event.path()} has triggered an event.")) - .flatMap(_ => action(state)) - .executeOn(state0.scheduler) + Task(logger.info(s"A ${event.eventType()} in ${event.path()} has triggered an event.")) + .flatMap((_: Unit) => lastState = action(state)) + .map(state => { lastState = state; state }) } val (observer, observable) = @@ -72,12 +39,12 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { ExecutionContext.ioScheduler) val compilationTask = observable - .foldLeftL(action(state0)) { - case (stateTask, e) => + .foldLeftL(runAction(lastState, triggerCompilationEvent)) { + case (state, e) => e.eventType match { - case EventType.CREATE => stateTask.flatMap(s => runAction(s, e)) - case EventType.MODIFY => stateTask.flatMap(s => runAction(s, e)) - case EventType.OVERFLOW => stateTask.flatMap(s => runAction(s, e)) + case EventType.CREATE => runAction(state, e) + case EventType.MODIFY => runAction(state, e) + case EventType.OVERFLOW => runAction(state, e) case EventType.DELETE => stateTask } } @@ -105,21 +72,22 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { try watcher.watch() finally watcher.close() } - }.doOnCancel(Task(watcher.close())) + } - val firstEventTriggerTask = Task(observer.onNext(triggerEventAtFirst)) - val aggregated = Task.zip3( - firstEventTriggerTask.executeOn(ExecutionContext.ioScheduler), + val aggregated = Task.zip2( watchingTask.executeOn(ExecutionContext.ioScheduler), compilationTask.executeOn(state0.scheduler) ) - aggregated.map { - case (_, Success(_), state) => state - case (_, Failure(t), state) => - state.logger.error("Unexpected file watching error") - state.logger.trace(t) - state.mergeStatus(ExitStatus.UnexpectedError) - } + aggregated + .map { + case (Success(_), state) => state + case (Failure(t), state) => + state.logger.error("Unexpected file watching error") + state.logger.trace(t) + state.mergeStatus(ExitStatus.UnexpectedError) + } + .doOnCancel(Task(watcher.close())) + .doOnFinish(Task(watcher.close())) } } From d7d68db8eca9a70fd09c800a397c56553d23aabf Mon Sep 17 00:00:00 2001 From: jvican Date: Wed, 14 Mar 2018 00:43:04 +0100 Subject: [PATCH 08/12] Use a monix implementation that actually works The previous one didn't work because `Observable` returns a list of tasks, and that task cannot be processed and flattened until the stream of file events has finished (never). This is bad, it means we cannot fold left values produced by observables and then pass them in to functions that return `Task`s. For that, I've discovered you need `Consumer`, and there is one awesome consumer that does exactly what we want: the one created with `Consumer.foldLeftAsync`. --- .../main/scala/bloop/io/SourceWatcher.scala | 66 +++++++------------ 1 file changed, 23 insertions(+), 43 deletions(-) diff --git a/frontend/src/main/scala/bloop/io/SourceWatcher.scala b/frontend/src/main/scala/bloop/io/SourceWatcher.scala index fcb0270af6..7ca3a2602c 100644 --- a/frontend/src/main/scala/bloop/io/SourceWatcher.scala +++ b/frontend/src/main/scala/bloop/io/SourceWatcher.scala @@ -10,7 +10,7 @@ import scala.collection.JavaConverters._ import io.methvin.watcher.DirectoryChangeEvent.EventType import io.methvin.watcher.{DirectoryChangeEvent, DirectoryChangeListener, DirectoryWatcher} import monix.eval.Task -import monix.reactive.{MulticastStrategy, Observable} +import monix.reactive.{Consumer, MulticastStrategy, Observable} final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { private val dirs = dirs0.distinct @@ -20,36 +20,26 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { import java.nio.file.Files dirs.foreach(p => if (!Files.exists(p)) Files.createDirectories(p) else ()) - private final val fakePath = java.nio.file.Paths.get("$$$bloop-monix-trigger-first-event$$$") - private final val triggerCompilationEvent = - new DirectoryChangeEvent(DirectoryChangeEvent.EventType.OVERFLOW, fakePath, -1) - def watch(state0: State, action: State => Task[State]): Task[State] = { - logger.info(s"Watching the following directories: ${dirs.mkString(", ")}") - - var lastState: State = state0 def runAction(state: State, event: DirectoryChangeEvent): Task[State] = { Task(logger.info(s"A ${event.eventType()} in ${event.path()} has triggered an event.")) - .flatMap((_: Unit) => lastState = action(state)) - .map(state => { lastState = state; state }) + .flatMap(_ => action(state)) + } + + val fileEventConsumer = Consumer.foldLeftAsync[State, DirectoryChangeEvent](state0) { + case (state, event) => + event.eventType match { + case EventType.CREATE => runAction(state, event) + case EventType.MODIFY => runAction(state, event) + case EventType.OVERFLOW => runAction(state, event) + case EventType.DELETE => Task.now(state) + } } val (observer, observable) = Observable.multicast[DirectoryChangeEvent](MulticastStrategy.publish)( ExecutionContext.ioScheduler) - val compilationTask = observable - .foldLeftL(runAction(lastState, triggerCompilationEvent)) { - case (state, e) => - e.eventType match { - case EventType.CREATE => runAction(state, e) - case EventType.MODIFY => runAction(state, e) - case EventType.OVERFLOW => runAction(state, e) - case EventType.DELETE => stateTask - } - } - .flatten - val watcher = DirectoryWatcher.create( dirsAsJava, new DirectoryChangeListener { @@ -57,7 +47,7 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { override def onEvent(event: DirectoryChangeEvent): Unit = { val targetFile = event.path() val targetPath = targetFile.toFile.getAbsolutePath() - if (Files.isRegularFile(targetFile) && + if (!stop && Files.isRegularFile(targetFile) && (targetPath.endsWith(".scala") || targetPath.endsWith(".java"))) { val ack = observer.onNext(event) stop = ack.isCompleted @@ -66,28 +56,18 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { } ) - import scala.util.{Try, Success, Failure} val watchingTask = Task { - Try { - try watcher.watch() - finally watcher.close() - } - } + logger.info(s"Watching the following directories: ${dirs.mkString(", ")}") + try watcher.watch() + finally watcher.close() + }.doOnCancel(Task(watcher.close())) - val aggregated = Task.zip2( - watchingTask.executeOn(ExecutionContext.ioScheduler), - compilationTask.executeOn(state0.scheduler) - ) + val watchHandle = watchingTask.materialize.runAsync(ExecutionContext.ioScheduler) - aggregated - .map { - case (Success(_), state) => state - case (Failure(t), state) => - state.logger.error("Unexpected file watching error") - state.logger.trace(t) - state.mergeStatus(ExitStatus.UnexpectedError) - } - .doOnCancel(Task(watcher.close())) - .doOnFinish(Task(watcher.close())) + observable + .consumeWith(fileEventConsumer) + .executeOn(state0.scheduler) + .doOnCancel(Task(watchHandle.cancel())) + .doOnFinish(_ => Task(watchHandle.cancel())) } } From 8e72cfe80d59fe3d1d4b30eb025845a6efd68c26 Mon Sep 17 00:00:00 2001 From: jvican Date: Wed, 14 Mar 2018 01:17:00 +0100 Subject: [PATCH 09/12] Finish the redesign of file watching Fixes https://github.com/scalacenter/bloop/issues/257 which was caused because the cancellation of tasks for file watching did not work correctly when `CTRL-C` was used. --- .../main/scala/bloop/engine/Interpreter.scala | 9 ++++-- .../main/scala/bloop/io/SourceWatcher.scala | 29 ++++++++++++------- 2 files changed, 24 insertions(+), 14 deletions(-) diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 99bc90f281..360fb9b7ff 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -90,8 +90,9 @@ object Interpreter { private def watch(project: Project, state: State, f: State => Task[State]): Task[State] = { val reachable = Dag.dfs(state.build.getDagFor(project)) val allSourceDirs = reachable.iterator.flatMap(_.sourceDirectories.toList).map(_.underlying) - val watcher = new SourceWatcher(allSourceDirs.toList, state.logger) - watcher.watch(state, f) + val watcher = new SourceWatcher(project, allSourceDirs.toList, state.logger) + // Force the first execution before relying on the file watching task + f(state).flatMap(newState => watcher.watch(newState, f)) } private def compile(cmd: Commands.Compile, state: State): Task[State] = { @@ -267,7 +268,9 @@ object Interpreter { } // Duration has to be infinity, we cannot predict how much time compilation takes - Await.result(handle, Duration.Inf) + val result = Await.result(handle, Duration.Inf) + System.out.println(s"Result is $result") + result } catch { case NonFatal(t) => previousState.logger.error(t.getMessage) diff --git a/frontend/src/main/scala/bloop/io/SourceWatcher.scala b/frontend/src/main/scala/bloop/io/SourceWatcher.scala index 7ca3a2602c..773bd5f0f8 100644 --- a/frontend/src/main/scala/bloop/io/SourceWatcher.scala +++ b/frontend/src/main/scala/bloop/io/SourceWatcher.scala @@ -2,7 +2,7 @@ package bloop.io import java.nio.file.Path -import bloop.cli.ExitStatus +import bloop.Project import bloop.engine.{ExecutionContext, State} import bloop.logging.Logger @@ -10,9 +10,10 @@ import scala.collection.JavaConverters._ import io.methvin.watcher.DirectoryChangeEvent.EventType import io.methvin.watcher.{DirectoryChangeEvent, DirectoryChangeListener, DirectoryWatcher} import monix.eval.Task +import monix.execution.Cancelable import monix.reactive.{Consumer, MulticastStrategy, Observable} -final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { +final class SourceWatcher(project: Project, dirs0: Seq[Path], logger: Logger) { private val dirs = dirs0.distinct private val dirsAsJava: java.util.List[Path] = dirs.asJava @@ -22,8 +23,8 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { def watch(state0: State, action: State => Task[State]): Task[State] = { def runAction(state: State, event: DirectoryChangeEvent): Task[State] = { - Task(logger.info(s"A ${event.eventType()} in ${event.path()} has triggered an event.")) - .flatMap(_ => action(state)) + logger.info(s"A ${event.eventType()} in ${event.path()} has triggered an event.") + action(state) } val fileEventConsumer = Consumer.foldLeftAsync[State, DirectoryChangeEvent](state0) { @@ -43,14 +44,13 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { val watcher = DirectoryWatcher.create( dirsAsJava, new DirectoryChangeListener { - @volatile var stop: Boolean = false override def onEvent(event: DirectoryChangeEvent): Unit = { val targetFile = event.path() val targetPath = targetFile.toFile.getAbsolutePath() - if (!stop && Files.isRegularFile(targetFile) && + if (Files.isRegularFile(targetFile) && (targetPath.endsWith(".scala") || targetPath.endsWith(".java"))) { - val ack = observer.onNext(event) - stop = ack.isCompleted + observer.onNext(event) + () } } } @@ -60,14 +60,21 @@ final class SourceWatcher(dirs0: Seq[Path], logger: Logger) { logger.info(s"Watching the following directories: ${dirs.mkString(", ")}") try watcher.watch() finally watcher.close() - }.doOnCancel(Task(watcher.close())) + }.doOnCancel(Task{ + System.out.println("Running cancellation for watch task") + observer.onComplete() + watcher.close() + System.out.println(s"File watching of '${project.name}' has been successfully cancelled.") + }) val watchHandle = watchingTask.materialize.runAsync(ExecutionContext.ioScheduler) observable .consumeWith(fileEventConsumer) - .executeOn(state0.scheduler) - .doOnCancel(Task(watchHandle.cancel())) + .doOnCancel(Task{ + System.out.println("RUnning cancellation in observable") + Cancelable.cancelAll(List(watchHandle)) + }) .doOnFinish(_ => Task(watchHandle.cancel())) } } From 0d8b83f9587e21e9d602bde71d43660c4e69ea37 Mon Sep 17 00:00:00 2001 From: jvican Date: Wed, 14 Mar 2018 11:51:53 +0100 Subject: [PATCH 10/12] Redirect cancellation logs to the out in nailgun --- backend/src/main/scala/bloop/Compiler.scala | 1 + .../main/scala/bloop/logging/BloopLogger.scala | 5 ++--- frontend/src/main/scala/bloop/Cli.scala | 6 ++++-- .../main/scala/bloop/cli/CommonOptions.scala | 2 ++ .../main/scala/bloop/engine/Interpreter.scala | 18 ++++++++++++------ .../main/scala/bloop/io/SourceWatcher.scala | 18 ++++++++---------- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/backend/src/main/scala/bloop/Compiler.scala b/backend/src/main/scala/bloop/Compiler.scala index 1347fe53d4..940a15a5d3 100644 --- a/backend/src/main/scala/bloop/Compiler.scala +++ b/backend/src/main/scala/bloop/Compiler.scala @@ -10,6 +10,7 @@ import bloop.logging.Logger import sbt.internal.inc.{FreshCompilerCache, Locate, LoggedReporter, ZincUtil} case class CompileInputs( + scalaInstance: ScalaInstance, compilerCache: CompilerCache, sourceDirectories: Array[AbsolutePath], diff --git a/backend/src/main/scala/bloop/logging/BloopLogger.scala b/backend/src/main/scala/bloop/logging/BloopLogger.scala index a61196c8db..e0859741a7 100644 --- a/backend/src/main/scala/bloop/logging/BloopLogger.scala +++ b/backend/src/main/scala/bloop/logging/BloopLogger.scala @@ -1,11 +1,10 @@ package bloop.logging -import java.io.{FileOutputStream, OutputStream, PrintStream} +import java.io.PrintStream import java.util.concurrent.atomic.AtomicInteger import scala.Console.{CYAN, GREEN, RED, RESET, YELLOW} - -import bloop.io.AbsolutePath +import com.martiansoftware.nailgun.NGCommunicator /** * Creates a logger that writes to the given streams. diff --git a/frontend/src/main/scala/bloop/Cli.scala b/frontend/src/main/scala/bloop/Cli.scala index 46c8806771..fd8140b238 100644 --- a/frontend/src/main/scala/bloop/Cli.scala +++ b/frontend/src/main/scala/bloop/Cli.scala @@ -4,8 +4,7 @@ import bloop.bsp.BspServer import bloop.cli.validation.Validate import bloop.cli.{CliOptions, CliParsers, Commands, CommonOptions, ExitStatus} import bloop.engine.{Action, ClientPool, Exit, Interpreter, NailgunPool, NoPool, Print, Run, State} -import bloop.io.Paths -import bloop.logging.{BloopLogger, Logger} +import bloop.logging.BloopLogger import caseapp.core.{DefaultBaseCommand, Messages} import com.martiansoftware.nailgun.NGContext @@ -19,10 +18,13 @@ object Cli { } def nailMain(ngContext: NGContext): Unit = { + val server = ngContext.getNGServer val nailgunOptions = CommonOptions( in = ngContext.in, out = ngContext.out, err = ngContext.err, + ngout = server.out, + ngerr = server.err, workingDirectory = ngContext.getWorkingDirectory, ) val command = ngContext.getCommand diff --git a/frontend/src/main/scala/bloop/cli/CommonOptions.scala b/frontend/src/main/scala/bloop/cli/CommonOptions.scala index d4a830ed99..02dc31b72b 100644 --- a/frontend/src/main/scala/bloop/cli/CommonOptions.scala +++ b/frontend/src/main/scala/bloop/cli/CommonOptions.scala @@ -20,6 +20,8 @@ case class CommonOptions( @Hidden out: PrintStream = System.out, @Hidden in: InputStream = System.in, @Hidden err: PrintStream = System.err, + @Hidden ngout: PrintStream = System.out, + @Hidden ngerr: PrintStream = System.err, threads: Int = ExecutionContext.nCPUs ) { def workingPath: AbsolutePath = AbsolutePath(workingDirectory) diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 360fb9b7ff..6c86d37d9b 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -11,7 +11,6 @@ import bloop.testing.TestInternals import bloop.engine.tasks.Tasks import bloop.Project import monix.eval.Task -import monix.execution.Cancelable import monix.execution.misc.NonFatal import scala.concurrent.Await @@ -20,8 +19,10 @@ import scala.concurrent.duration.Duration object Interpreter { @tailrec def execute(action: Action, state: State): State = { - def logAndTime(cliOptions: CliOptions, action: Task[State]): State = { - state.logger.verboseIf(cliOptions.verbose)(timed(state.logger)(waitAndLog(state, action))) + def logAndTime(cliOptions: CliOptions, task: Task[State]): State = { + state.logger.verboseIf(cliOptions.verbose)( + timed(state.logger)(waitAndLog(action, cliOptions, state, task)) + ) } action match { @@ -250,8 +251,14 @@ object Interpreter { state.mergeStatus(ExitStatus.InvalidCommandLineOption) } - private def waitAndLog(previousState: State, newState: Task[State]): State = { + private def waitAndLog( + action: Action, + cliOptions: CliOptions, + previousState: State, + newState: Task[State] + ): State = { val pool = previousState.pool + val ngout = cliOptions.common.ngout try { val handle = newState .executeWithOptions(_.enableAutoCancelableRunLoops) @@ -267,9 +274,8 @@ object Interpreter { } } - // Duration has to be infinity, we cannot predict how much time compilation takes val result = Await.result(handle, Duration.Inf) - System.out.println(s"Result is $result") + ngout.println(s"The task for $action finished.") result } catch { case NonFatal(t) => diff --git a/frontend/src/main/scala/bloop/io/SourceWatcher.scala b/frontend/src/main/scala/bloop/io/SourceWatcher.scala index 773bd5f0f8..f39bb2072a 100644 --- a/frontend/src/main/scala/bloop/io/SourceWatcher.scala +++ b/frontend/src/main/scala/bloop/io/SourceWatcher.scala @@ -10,11 +10,11 @@ import scala.collection.JavaConverters._ import io.methvin.watcher.DirectoryChangeEvent.EventType import io.methvin.watcher.{DirectoryChangeEvent, DirectoryChangeListener, DirectoryWatcher} import monix.eval.Task -import monix.execution.Cancelable import monix.reactive.{Consumer, MulticastStrategy, Observable} final class SourceWatcher(project: Project, dirs0: Seq[Path], logger: Logger) { private val dirs = dirs0.distinct + private val dirsCount = dirs.size private val dirsAsJava: java.util.List[Path] = dirs.asJava // Create source directories if they don't exist, otherwise the watcher fails. @@ -22,8 +22,9 @@ final class SourceWatcher(project: Project, dirs0: Seq[Path], logger: Logger) { dirs.foreach(p => if (!Files.exists(p)) Files.createDirectories(p) else ()) def watch(state0: State, action: State => Task[State]): Task[State] = { + val ngout = state0.commonOptions.ngout def runAction(state: State, event: DirectoryChangeEvent): Task[State] = { - logger.info(s"A ${event.eventType()} in ${event.path()} has triggered an event.") + logger.debug(s"A ${event.eventType()} in ${event.path()} has triggered an event.") action(state) } @@ -57,24 +58,21 @@ final class SourceWatcher(project: Project, dirs0: Seq[Path], logger: Logger) { ) val watchingTask = Task { - logger.info(s"Watching the following directories: ${dirs.mkString(", ")}") + logger.info(s"File watching $dirsCount directories...") try watcher.watch() finally watcher.close() - }.doOnCancel(Task{ - System.out.println("Running cancellation for watch task") + }.doOnCancel(Task { observer.onComplete() watcher.close() - System.out.println(s"File watching of '${project.name}' has been successfully cancelled.") + ngout.println( + s"File watching on '${project.name}' and dependent projects has been successfully cancelled.") }) val watchHandle = watchingTask.materialize.runAsync(ExecutionContext.ioScheduler) observable .consumeWith(fileEventConsumer) - .doOnCancel(Task{ - System.out.println("RUnning cancellation in observable") - Cancelable.cancelAll(List(watchHandle)) - }) .doOnFinish(_ => Task(watchHandle.cancel())) + .doOnCancel(Task(watchHandle.cancel())) } } From b85f2249e56ca95c81cee99de9e7f38ac3771973 Mon Sep 17 00:00:00 2001 From: jvican Date: Wed, 14 Mar 2018 16:07:54 +0100 Subject: [PATCH 11/12] Add test for file watching in `BasicNailgunSpec` --- frontend/src/main/scala/bloop/Server.scala | 6 ++- .../main/scala/bloop/engine/Interpreter.scala | 2 +- .../scala/bloop/logging/RecordingLogger.scala | 2 + .../bloop/nailgun/BasicNailgunSpec.scala | 41 ++++++++++++++-- .../scala/bloop/nailgun/NailgunTest.scala | 49 +++++++++++++++---- 5 files changed, 85 insertions(+), 15 deletions(-) diff --git a/frontend/src/main/scala/bloop/Server.scala b/frontend/src/main/scala/bloop/Server.scala index f4ff65619b..f2c7fdc039 100644 --- a/frontend/src/main/scala/bloop/Server.scala +++ b/frontend/src/main/scala/bloop/Server.scala @@ -10,11 +10,15 @@ class Server object Server { private val defaultPort: Int = 8212 // 8100 + 'p' def main(args: Array[String]): Unit = { + run(mainTest(args)) + } + + private[bloop] def mainTest(args: Array[String]): NGServer = { val port = Try(args(0).toInt).getOrElse(Server.defaultPort) val addr = InetAddress.getLoopbackAddress val server = new NGServer(addr, port) registerAliases(server) - run(server) + server } def nailMain(ngContext: NGContext): Unit = { diff --git a/frontend/src/main/scala/bloop/engine/Interpreter.scala b/frontend/src/main/scala/bloop/engine/Interpreter.scala index 6c86d37d9b..8ee7898801 100644 --- a/frontend/src/main/scala/bloop/engine/Interpreter.scala +++ b/frontend/src/main/scala/bloop/engine/Interpreter.scala @@ -268,7 +268,7 @@ object Interpreter { pool.addListener { case e: CloseEvent => if (!handle.isCompleted) { - System.out.println( + ngout.println( s"Client in ${previousState.build.origin.syntax} has disconnected with a '$e' event. Cancelling tasks...") handle.cancel() } diff --git a/frontend/src/test/scala/bloop/logging/RecordingLogger.scala b/frontend/src/test/scala/bloop/logging/RecordingLogger.scala index f473825db3..9bcda19425 100644 --- a/frontend/src/test/scala/bloop/logging/RecordingLogger.scala +++ b/frontend/src/test/scala/bloop/logging/RecordingLogger.scala @@ -15,7 +15,9 @@ class RecordingLogger extends AbstractLogger { override def verbose[T](op: => T): T = op override def debug(msg: String): Unit = { messages.add(("debug", msg)); () } override def info(msg: String): Unit = { messages.add(("info", msg)); () } + def serverInfo(msg: String): Unit = { messages.add(("server-info", msg)); () } override def error(msg: String): Unit = { messages.add(("error", msg)); () } + def serverError(msg: String): Unit = { messages.add(("server-error", msg)); () } override def warn(msg: String): Unit = { messages.add(("warn", msg)); () } private def trace(msg: String): Unit = { messages.add(("trace", msg)); () } override def trace(ex: Throwable): Unit = { diff --git a/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala b/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala index ccf7f39088..29762ebb8a 100644 --- a/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala +++ b/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala @@ -1,14 +1,17 @@ package bloop.nailgun -import scala.Console.{GREEN, RESET} +import java.util.concurrent.TimeUnit -import org.junit.{Ignore, Test} +import scala.Console.{GREEN, RESET} +import org.junit.Test import org.junit.Assert.{assertEquals, assertTrue} - import bloop.logging.RecordingLogger class BasicNailgunSpec extends NailgunTest { + val out = System.out + val err = System.err + @Test def unknownCommandTest(): Unit = { withServerInProject("with-resources") { (logger, client) => @@ -80,6 +83,38 @@ class BasicNailgunSpec extends NailgunTest { } } + @Test + def testWatchCompileCommand(): Unit = { + var rlogger: RecordingLogger = null + withServerInProject("with-resources") { (logger, client) => + client.success("clean", "with-resources") + val fileWatchingProcess = client.issueAsProcess("compile", "-w", "with-resources") + fileWatchingProcess.waitFor(4, TimeUnit.SECONDS) + fileWatchingProcess.destroyForcibly() + + // Repeat the whole process again. + client.success("clean", "with-resources") + val fileWatchingProcess2 = client.issueAsProcess("compile", "-w", "with-resources") + fileWatchingProcess2.waitFor(4, TimeUnit.SECONDS) + fileWatchingProcess2.destroyForcibly() + + // Ugly, but necessary for the sake of testing. + rlogger = logger + } + + // We check here the logs because until 'exit' the server doesn't flush everything + val serverInfos = rlogger.getMessages().filter(_._1 == "server-info").map(_._2) + val cancellingTasks = serverInfos.filter(_.contains("Cancelling tasks...")) + val fileWatchingCancellations = serverInfos.filter( + _ == "File watching on 'with-resources' and dependent projects has been successfully cancelled.") + + val timesHappened = fileWatchingCancellations.size + assertTrue(s"The file watching cancellation happened $timesHappened only!", timesHappened == 2) + + val timesCancelling = cancellingTasks.size + assertTrue(s"Bloop registered task cancellation only $timesCancelling", timesCancelling == 2) + } + @Test def testRunCommand(): Unit = { withServerInProject("with-resources") { (logger, client) => diff --git a/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala b/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala index f775a448a3..56aaf54f66 100644 --- a/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala +++ b/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala @@ -1,12 +1,14 @@ package bloop.nailgun -import org.junit.Assert.{assertEquals, assertNotEquals} +import java.io.{File, PrintStream} +import org.junit.Assert.{assertEquals, assertNotEquals} import java.nio.file.{Files, Path, Paths} import bloop.Server import bloop.logging.{ProcessLogger, RecordingLogger} import bloop.tasks.ProjectHelpers +import com.martiansoftware.nailgun.NGServer /** * Base class for writing test for the nailgun integration. @@ -25,16 +27,28 @@ abstract class NailgunTest { * @return The result of executing `op` on the client. */ def withServer[T](log: RecordingLogger, base: Path)(op: Client => T): T = { + val oldOut = System.out + val oldErr = System.err + val outStream = new PrintStream(ProcessLogger.toOutputStream(log.serverInfo)) + val errStream = new PrintStream(ProcessLogger.toOutputStream(log.serverError)) + val serverThread = new Thread { override def run(): Unit = { - val outStream = ProcessLogger.toOutputStream(log.info) - val errStream = ProcessLogger.toOutputStream(log.error) - Console.withOut(outStream) { - Console.withErr(errStream) { - Server.main(Array(TEST_PORT.toString)) - } + var optServer: Option[NGServer] = None + + // Trick nailgun into thinking these are the real streams + System.setOut(outStream) + System.setErr(errStream) + try { + optServer = Some(Server.mainTest(Array(TEST_PORT.toString))) + } finally { + System.setOut(oldOut) + System.setErr(oldErr) } + + val server = optServer.getOrElse(sys.error("The nailgun server failed to initialize!")) + server.run() } } @@ -43,7 +57,11 @@ abstract class NailgunTest { Thread.sleep(500) val client = new Client(TEST_PORT, log, base) try op(client) - finally client.success("exit") + finally { + client.success("exit") + outStream.flush() + errStream.flush() + } } /** @@ -90,17 +108,28 @@ abstract class NailgunTest { } /** - * Executes a command `cmd` on the server, and return the exit code. + * Execute a command `cmd` on the server and return the exit code. * * @param cmd The command to execute * @return The exit code of the operation. */ def issue(cmd: String*): Int = { + issueAsProcess(cmd: _*).waitFor() + } + + /** + * Execute a command `cmd` on the server and return the process + * executing the specified command. + * + * @param cmd The command to execute + * @return The exit code of the operation. + */ + def issueAsProcess(cmd: String*): Process = { val builder = processBuilder(cmd) val process = builder.start() val processLogger = new ProcessLogger(log, process) processLogger.start() - process.waitFor() + process } /** From 492179282358f9053856eba307060bc3168f7caa Mon Sep 17 00:00:00 2001 From: jvican Date: Wed, 14 Mar 2018 16:33:38 +0100 Subject: [PATCH 12/12] Address Martin's feedback on naming and nailgun test --- frontend/src/main/scala/bloop/Server.scala | 4 ++-- .../src/test/scala/bloop/nailgun/BasicNailgunSpec.scala | 9 +++------ frontend/src/test/scala/bloop/nailgun/NailgunTest.scala | 2 +- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/frontend/src/main/scala/bloop/Server.scala b/frontend/src/main/scala/bloop/Server.scala index f2c7fdc039..58d9fa2a00 100644 --- a/frontend/src/main/scala/bloop/Server.scala +++ b/frontend/src/main/scala/bloop/Server.scala @@ -10,10 +10,10 @@ class Server object Server { private val defaultPort: Int = 8212 // 8100 + 'p' def main(args: Array[String]): Unit = { - run(mainTest(args)) + run(instantiateServer(args)) } - private[bloop] def mainTest(args: Array[String]): NGServer = { + private[bloop] def instantiateServer(args: Array[String]): NGServer = { val port = Try(args(0).toInt).getOrElse(Server.defaultPort) val addr = InetAddress.getLoopbackAddress val server = new NGServer(addr, port) diff --git a/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala b/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala index 29762ebb8a..404ba92dab 100644 --- a/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala +++ b/frontend/src/test/scala/bloop/nailgun/BasicNailgunSpec.scala @@ -104,14 +104,11 @@ class BasicNailgunSpec extends NailgunTest { // We check here the logs because until 'exit' the server doesn't flush everything val serverInfos = rlogger.getMessages().filter(_._1 == "server-info").map(_._2) - val cancellingTasks = serverInfos.filter(_.contains("Cancelling tasks...")) - val fileWatchingCancellations = serverInfos.filter( + val timesCancelling = serverInfos.count(_.contains("Cancelling tasks...")) + val timesCancelled = serverInfos.count( _ == "File watching on 'with-resources' and dependent projects has been successfully cancelled.") - val timesHappened = fileWatchingCancellations.size - assertTrue(s"The file watching cancellation happened $timesHappened only!", timesHappened == 2) - - val timesCancelling = cancellingTasks.size + assertTrue(s"File watching cancellation happened $timesCancelled only!", timesCancelled == 2) assertTrue(s"Bloop registered task cancellation only $timesCancelling", timesCancelling == 2) } diff --git a/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala b/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala index 56aaf54f66..2961c1674a 100644 --- a/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala +++ b/frontend/src/test/scala/bloop/nailgun/NailgunTest.scala @@ -41,7 +41,7 @@ abstract class NailgunTest { System.setOut(outStream) System.setErr(errStream) try { - optServer = Some(Server.mainTest(Array(TEST_PORT.toString))) + optServer = Some(Server.instantiateServer(Array(TEST_PORT.toString))) } finally { System.setOut(oldOut) System.setErr(oldErr)