From 93d8afc16728ddb173657acfe634e1b5cd617962 Mon Sep 17 00:00:00 2001 From: jvican Date: Fri, 20 Apr 2018 13:34:49 +0200 Subject: [PATCH] Revamp the test and run fork process infrastructure 1. Migrate to `NuProcess` and avoid the annoying Java and Scala process APIs. Nuprocess gives us a better foundation to deal with stdout and stderr, as well as processing standard input from the host process (Bloop). `NuProcess` will also be more efficient than our previous approach. 2. Migrate test server and the whole `runMain` machinery to `Task` so that we can have a finer control over it. Note that now these actions can be cancelled! --- .../scala/bloop/logging/ProcessLogger.scala | 22 +- build.sbt | 1 + .../main/scala/bloop/engine/tasks/Tasks.scala | 87 ++++---- .../main/scala/bloop/exec/ForkProcess.scala | 100 --------- .../src/main/scala/bloop/exec/Forker.scala | 203 ++++++++++++++++++ .../scala/bloop/testing/TestInternals.scala | 42 ++-- .../main/scala/bloop/testing/TestServer.scala | 186 ++++++++-------- ...ForkProcessSpec.scala => ForkerSpec.scala} | 52 ++++- .../test/scala/bloop/tasks/RunTasksSpec.scala | 54 ++++- .../test/scala/bloop/tasks/TestTaskTest.scala | 46 ++-- .../src/test/scala/bloop/tasks/TestUtil.scala | 15 +- project/Dependencies.scala | 2 + 12 files changed, 511 insertions(+), 299 deletions(-) delete mode 100644 frontend/src/main/scala/bloop/exec/ForkProcess.scala create mode 100644 frontend/src/main/scala/bloop/exec/Forker.scala rename frontend/src/test/scala/bloop/exec/{ForkProcessSpec.scala => ForkerSpec.scala} (63%) diff --git a/backend/src/main/scala/bloop/logging/ProcessLogger.scala b/backend/src/main/scala/bloop/logging/ProcessLogger.scala index cf0de103ec..ca0bc6f083 100644 --- a/backend/src/main/scala/bloop/logging/ProcessLogger.scala +++ b/backend/src/main/scala/bloop/logging/ProcessLogger.scala @@ -83,14 +83,32 @@ object ProcessLogger { * @param logFn The handler that receives data from the `stream` * @param stream The stream that produces the data. */ -private class StreamLogger(logFn: String => Unit, stream: InputStream) extends Thread { +private final class StreamLogger(logFn: String => Unit, stream: InputStream) extends Thread { private[this] val reader = new BufferedReader(new InputStreamReader(stream)) @tailrec - override final def run(): Unit = { + override def run(): Unit = { Option(reader.readLine()) match { case Some(line) => logFn(line); run() case None => () } } } + +private final class InputThread(in: InputStream, processIn: OutputStream) extends Thread { + override def run(): Unit = { + val buffer = new Array[Byte](256) + var read: Int = -1 + while (true) { + try { + read = in.read(buffer) + if (read == -1) return + else { + processIn.write(buffer, 0, read) + } + } catch { + case e: java.io.IOException => return + } + } + } +} diff --git a/build.sbt b/build.sbt index 8a807d04a8..114063393c 100644 --- a/build.sbt +++ b/build.sbt @@ -58,6 +58,7 @@ val jsonConfig = project Dependencies.metaconfigDocs, Dependencies.metaconfigConfig, Dependencies.circeDerivation, + Dependencies.nuprocess, Dependencies.scalacheck % Test, ) } else { diff --git a/frontend/src/main/scala/bloop/engine/tasks/Tasks.scala b/frontend/src/main/scala/bloop/engine/tasks/Tasks.scala index 3d5811a8ea..f850301a3a 100644 --- a/frontend/src/main/scala/bloop/engine/tasks/Tasks.scala +++ b/frontend/src/main/scala/bloop/engine/tasks/Tasks.scala @@ -4,7 +4,7 @@ import bloop.cli.ExitStatus import bloop.config.Config import bloop.engine.caches.ResultsCache import bloop.engine.{Dag, Leaf, Parent, State} -import bloop.exec.ForkProcess +import bloop.exec.Forker import bloop.io.AbsolutePath import bloop.logging.BspLogger import bloop.reporter.{BspReporter, LogReporter, Problem, Reporter, ReporterConfig} @@ -271,52 +271,54 @@ object Tasks { isolated: Boolean, frameworkSpecificRawArgs: List[String], testFilter: String => Boolean - ): Task[State] = Task { - // TODO(jvican): This method should cache the test loader always. + ): Task[State] = { import state.logger import bloop.util.JavaCompat.EnrichOptional + def foundFrameworks (frameworks: Array[Framework])= frameworks.map(_.name).mkString(", ") + + // Test arguments coming after `--` can only be used if only one mapping is found + def considerFrameworkArgs(frameworks: Array[Framework]) = { + if (frameworkSpecificRawArgs.isEmpty) Nil + else { + frameworks match { + case Array(oneFramework) => + val rawArgs = frameworkSpecificRawArgs.toArray + val cls = oneFramework.getClass.getName() + logger.debug(s"Test options '$rawArgs' assigned to the only found framework $cls'.") + List(Config.TestArgument(rawArgs, Some(Config.TestFramework(List(cls))))) + case _ => + val ignoredArgs = frameworkSpecificRawArgs.mkString(" ") + logger.warn( + s"Framework-specific test options '${ignoredArgs}' are ignored because several frameworks were found: ${foundFrameworks(frameworks)}") + Nil + } + } + } val projectsToTest = if (isolated) List(project) else Dag.dfs(state.build.getDagFor(project)) - projectsToTest.foreach { project => + val testTasks = projectsToTest.map { project => val projectName = project.name val projectTestArgs = project.testOptions.arguments - val forkProcess = ForkProcess(project.javaEnv, project.classpath) - val testLoader = forkProcess.toExecutionClassLoader(Some(TestInternals.filteredLoader)) + val forker = Forker(project.javaEnv, project.classpath) + val testLoader = forker.toExecutionClassLoader(Some(TestInternals.filteredLoader)) val frameworks = project.testFrameworks .flatMap(f => TestInternals.loadFramework(testLoader, f.names, logger)) - def foundFrameworks = frameworks.map(_.name).mkString(", ") - logger.debug(s"Found frameworks: $foundFrameworks") + logger.debug(s"Found frameworks: ${foundFrameworks(frameworks)}") - // Test arguments coming after `--` can only be used if only one mapping is found - val frameworkArgs = { - if (frameworkSpecificRawArgs.isEmpty) Nil - else { - frameworks match { - case Array(oneFramework) => - val rawArgs = frameworkSpecificRawArgs.toArray - val cls = oneFramework.getClass.getName() - logger.debug(s"Test options '$rawArgs' assigned to the only found framework $cls'.") - List(Config.TestArgument(rawArgs, Some(Config.TestFramework(List(cls))))) - case _ => - val ignoredArgs = frameworkSpecificRawArgs.mkString(" ") - logger.warn( - s"Framework-specific test options '${ignoredArgs}' are ignored because several frameworks were found: $foundFrameworks") - Nil - } - } - } - - val analysis = state.results.lastSuccessfulResult(project).analysis().toOption.getOrElse { + val frameworkArgs = considerFrameworkArgs(frameworks) + val lastCompileResult = state.results.lastSuccessfulResult(project) + val analysis = lastCompileResult.analysis().toOption.getOrElse { logger.warn(s"Test execution is triggered but no compilation detected for ${projectName}.") Analysis.empty } - val discoveredTests = { + val discovered = { val tests = discoverTests(analysis, frameworks) val excluded = project.testOptions.excludes.toSet val ungroupedTests = tests.toList.flatMap { case (framework, tasks) => tasks.map(t => (framework, t)) } + val (includedTests, excludedTests) = ungroupedTests.partition { case (_, task) => val fqn = task.fullyQualifiedName() @@ -335,13 +337,17 @@ object Tasks { DiscoveredTests(testLoader, includedTests.groupBy(_._1).mapValues(_.map(_._2))) } + val opts = state.commonOptions val args = project.testOptions.arguments ++ frameworkArgs - val env = state.commonOptions.env - TestInternals.executeTasks(cwd, forkProcess, discoveredTests, args, handler, logger, env) + TestInternals.execute(cwd, forker, discovered, args, handler, logger, opts) } - // Return the previous state, test execution doesn't modify it. - state.mergeStatus(ExitStatus.Ok) + // For now, test execution is only sequential. + Task.sequence(testTasks).map { exitCodes => + val isOk = exitCodes.forall(_ == 0) + if (isOk) state.mergeStatus(ExitStatus.Ok) + else state.copy(status = ExitStatus.TestExecutionError) + } } /** @@ -357,16 +363,15 @@ object Tasks { project: Project, cwd: AbsolutePath, fqn: String, - args: Array[String]): Task[State] = Task { + args: Array[String]): Task[State] = { val classpath = project.classpath - val processConfig = ForkProcess(project.javaEnv, classpath) - val exitCode = processConfig.runMain(cwd, fqn, args, state.logger, state.commonOptions.env) - val exitStatus = { - if (exitCode == ForkProcess.EXIT_OK) ExitStatus.Ok - else ExitStatus.UnexpectedError + val processConfig = Forker(project.javaEnv, classpath) + processConfig.runMain(cwd, fqn, args, state.logger, state.commonOptions).map { exitCode => + state.mergeStatus { + if (exitCode == Forker.EXIT_OK) ExitStatus.Ok + else ExitStatus.UnexpectedError + } } - - state.mergeStatus(exitStatus) } /** diff --git a/frontend/src/main/scala/bloop/exec/ForkProcess.scala b/frontend/src/main/scala/bloop/exec/ForkProcess.scala deleted file mode 100644 index 4849009dfc..0000000000 --- a/frontend/src/main/scala/bloop/exec/ForkProcess.scala +++ /dev/null @@ -1,100 +0,0 @@ -package bloop.exec - -import java.io.File.{pathSeparator, separator} -import java.lang.ClassLoader -import java.lang.ProcessBuilder.Redirect -import java.nio.file.Files -import java.net.URLClassLoader -import java.util.Properties -import java.util.concurrent.ConcurrentHashMap - -import scala.util.control.NonFatal -import bloop.io.AbsolutePath -import bloop.logging.{Logger, ProcessLogger} - -/** - * Configuration to start a new JVM to execute Java code. - * - * @param javaEnv The configuration describing how to start the new JVM. - * @param classpath The full classpath with which the code should be executed. - */ -final case class ForkProcess(javaEnv: JavaEnv, classpath: Array[AbsolutePath]) { - - /** - * Creates a `ClassLoader` from the classpath of this `ForkProcess`. - * - * @param parent A parent classloader - * @return A classloader constructed from the classpath of this `ForkProcess`. - */ - def toExecutionClassLoader(parent: Option[ClassLoader]): ClassLoader = { - def makeNew(parent: Option[ClassLoader]): ClassLoader = { - val classpathEntries = classpath.map(_.underlying.toUri.toURL) - new URLClassLoader(classpathEntries, parent.orNull) - } - ForkProcess.classLoaderCache.computeIfAbsent(parent, makeNew) - } - - /** - * Run the main function in class `className`, passing it `args`. - * - * @param cwd The directory in which to start the forked JVM. - * @param className The fully qualified name of the class to run. - * @param args The arguments to pass to the main method. - * @param logger Where to log the messages from execution. - * @param properties The environment properties to run the program with. - * @param extraClasspath Paths to append to the classpath before running. - * @return 0 if the execution exited successfully, a non-zero number otherwise. - */ - def runMain(cwd: AbsolutePath, - className: String, - args: Array[String], - logger: Logger, - env: Properties, - extraClasspath: Array[AbsolutePath] = Array.empty): Int = { - import scala.collection.JavaConverters.{propertiesAsScalaMap, mapAsJavaMapConverter} - val fullClasspath = classpath ++ extraClasspath - - val java = javaEnv.javaHome.resolve("bin").resolve("java") - val classpathOption = "-cp" :: fullClasspath.map(_.syntax).mkString(pathSeparator) :: Nil - val appOptions = className :: args.toList - val cmd = java.syntax :: javaEnv.javaOptions.toList ::: classpathOption ::: appOptions - - logger.debug(s"Running '$className' in a new JVM.") - logger.debug(s" java_home = '${javaEnv.javaHome}'") - logger.debug(s" javaOptions = '${javaEnv.javaOptions.mkString(" ")}'") - logger.debug(s" classpath = '${fullClasspath.map(_.syntax).mkString(pathSeparator)}'") - logger.debug(s" command = '${cmd.mkString(" ")}'") - logger.debug(s" cwd = '$cwd'") - - if (!Files.exists(cwd.underlying)) { - logger.error(s"Couldn't start the forked JVM because '$cwd' doesn't exist.") - ForkProcess.EXIT_ERROR - } else { - val processBuilder = new ProcessBuilder(cmd: _*) - processBuilder.redirectInput(Redirect.INHERIT) - processBuilder.directory(cwd.toFile) - val processEnv = processBuilder.environment() - processEnv.clear() - processEnv.putAll(propertiesAsScalaMap(env).asJava) - val process = processBuilder.start() - val processLogger = new ProcessLogger(logger, process) - processLogger.start() - val exitCode = process.waitFor() - logger.debug(s"Forked JVM exited with code: $exitCode") - - exitCode - } - } -} - -object ForkProcess { - - private val classLoaderCache: ConcurrentHashMap[Option[ClassLoader], ClassLoader] = - new ConcurrentHashMap - - /** The code returned after a successful execution. */ - final val EXIT_OK = 0 - - /** The code returned after the execution errored. */ - final val EXIT_ERROR = 1 -} diff --git a/frontend/src/main/scala/bloop/exec/Forker.scala b/frontend/src/main/scala/bloop/exec/Forker.scala new file mode 100644 index 0000000000..8dbcca6884 --- /dev/null +++ b/frontend/src/main/scala/bloop/exec/Forker.scala @@ -0,0 +1,203 @@ +package bloop.exec + +import java.io.File.pathSeparator +import java.nio.file.Files +import java.net.URLClassLoader +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import bloop.cli.CommonOptions +import bloop.engine.ExecutionContext +import bloop.io.AbsolutePath +import bloop.logging.Logger +import com.zaxxer.nuprocess.{NuAbstractProcessHandler, NuProcess} +import monix.eval.Task + +import scala.concurrent.duration.FiniteDuration + +/** + * Collects configuration to start a new program in a new process. + * + * The name comes from a similar utility https://github.com/sshtools/forker. + * + * @param javaEnv The configuration describing how to start the new JVM. + * @param classpath The full classpath with which the code should be executed. + */ +final case class Forker(javaEnv: JavaEnv, classpath: Array[AbsolutePath]) { + + /** + * Creates a `ClassLoader` from the classpath of this `ForkProcess`. + * + * @param parent A parent classloader + * @return A classloader constructed from the classpath of this `ForkProcess`. + */ + def toExecutionClassLoader(parent: Option[ClassLoader]): ClassLoader = { + def makeNew(parent: Option[ClassLoader]): ClassLoader = { + val classpathEntries = classpath.map(_.underlying.toUri.toURL) + new URLClassLoader(classpathEntries, parent.orNull) + } + Forker.classLoaderCache.computeIfAbsent(parent, makeNew) + } + + /** + * Run the main function in class `className`, passing it `args`. + * + * @param cwd The directory in which to start the forked JVM. + * @param mainClass The fully qualified name of the class to run. + * @param args The arguments to pass to the main method. + * @param logger Where to log the messages from execution. + * @param properties The environment properties to run the program with. + * @param extraClasspath Paths to append to the classpath before running. + * @return 0 if the execution exited successfully, a non-zero number otherwise. + */ + def runMain( + cwd: AbsolutePath, + mainClass: String, + args: Array[String], + logger: Logger, + opts: CommonOptions, + extraClasspath: Array[AbsolutePath] = Array.empty + ): Task[Int] = { + import scala.collection.JavaConverters.{propertiesAsScalaMap, mapAsJavaMapConverter} + val fullClasspath = (classpath ++ extraClasspath).map(_.syntax).mkString(pathSeparator) + val java = javaEnv.javaHome.resolve("bin").resolve("java") + val classpathOption = "-cp" :: fullClasspath :: Nil + val appOptions = mainClass :: args.toList + val cmd = java.syntax :: javaEnv.javaOptions.toList ::: classpathOption ::: appOptions + + if (!Files.exists(cwd.underlying)) { + Task { + logger.error(s"Couldn't start the forked JVM because '$cwd' doesn't exist.") + Forker.EXIT_ERROR + } + } else { + final class ProcessHandler extends NuAbstractProcessHandler { + override def onStart(nuProcess: NuProcess): Unit = { + if (logger.isVerbose) { + val debugOptions = + s""" + |Fork options: + | command = '${cmd.mkString(" ")}' + | cwd = '$cwd' + | classpath = '$fullClasspath' + | java_home = '${javaEnv.javaHome}' + | java_options = '${javaEnv.javaOptions.mkString(" ")}""".stripMargin + logger.debug(debugOptions) + } + } + + override def onExit(statusCode: Int): Unit = + logger.debug(s"Forked JVM exited with code $statusCode") + + val outBuilder = StringBuilder.newBuilder + override def onStdout(buffer: ByteBuffer, closed: Boolean): Unit = { + if (closed) { + val remaining = outBuilder.mkString + if (!remaining.isEmpty) + logger.info(remaining) + } else { + Forker.linesFrom(buffer, outBuilder).foreach(logger.info(_)) + } + } + + val errBuilder = StringBuilder.newBuilder + override def onStderr(buffer: ByteBuffer, closed: Boolean): Unit = { + if (closed) { + val remaining = errBuilder.mkString + if (!remaining.isEmpty) + logger.error(remaining) + } else { + Forker.linesFrom(buffer, errBuilder).foreach(logger.error(_)) + } + } + } + + Task(logger.debug(s"Running '$mainClass' in a new JVM.")).flatMap { _ => + import com.zaxxer.nuprocess.NuProcessBuilder + val handler = new ProcessHandler() + val builder = new NuProcessBuilder(handler, cmd: _*) + builder.setCwd(cwd.underlying) + val npEnv = builder.environment() + npEnv.clear() + npEnv.putAll(propertiesAsScalaMap(opts.env).asJava) + val process = builder.start() + + /* We need to gobble the input manually with a fixed delay because otherwise + * the remote process will not see it. Instead of using the `wantWrite` API + * we write directly to the process to avoid the extra level of indirection. + * + * The input gobble runs on a 50ms basis and it can process a maximum of 4096 + * bytes at a time. The rest that is not read will be read in the next 50ms. */ + val duration = FiniteDuration(50, TimeUnit.MILLISECONDS) + val gobbleInput = ExecutionContext.ioScheduler.scheduleWithFixedDelay(duration, duration) { + val buffer = new Array[Byte](4096) + val read = opts.in.read(buffer, 0, buffer.length) + if (read == -1) () + else process.writeStdin(ByteBuffer.wrap(buffer)) + } + + Task(process.waitFor(0, _root_.java.util.concurrent.TimeUnit.SECONDS)) + .doOnFinish(_ => Task(gobbleInput.cancel())) + .doOnCancel(Task { + gobbleInput.cancel() + try process.closeStdin(true) + finally process.destroy(true) + }) + } + } + } +} + +object Forker { + private val classLoaderCache: ConcurrentHashMap[Option[ClassLoader], ClassLoader] = + new ConcurrentHashMap + + /** The code returned after a successful execution. */ + final val EXIT_OK = 0 + + /** The code returned after the execution errored. */ + final val EXIT_ERROR = 1 + + private final val EmptyArray = Array.empty[String] + + /** + * Return an array of lines from a process buffer and a no lines buffer. + * + * The no lines buffer keeps track of previous messages that didn't contain + * a new line, it is therefore mutated. The buffer is the logs that we just + * received from our process. + * + * This method returns an array of new lines when the messages contain new + * lines at the end. If there are several new lines in a message but the last + * one doesn't, then we add the remaining to the string builder. + * + * @param buffer The buffer that we receive from NuProcess. + * @param remaining The string builder bookkeeping remaining msgs without new lines. + * @return An array of new lines. It can be empty. + */ + private[bloop] def linesFrom(buffer: ByteBuffer, remaining: StringBuilder): Array[String] = { + val bytes = new Array[Byte](buffer.remaining()) + buffer.get(bytes) + val msg = new String(bytes, StandardCharsets.UTF_8) + val newLines = msg.split(System.lineSeparator, Integer.MAX_VALUE) + newLines match { + case Array() => remaining.++=(msg); EmptyArray + case msgs => + val msgAtTheEnd = newLines.apply(newLines.length - 1) + val shouldBuffer = !msgAtTheEnd.isEmpty + if (shouldBuffer) + remaining.++=(msgAtTheEnd) + + if (msgs.length > 1) { + if (shouldBuffer) newLines.init + else { + val firstLine = newLines.apply(0) + newLines(0) = remaining.mkString ++ firstLine + remaining.clear() + newLines.init + } + } else EmptyArray + } + } +} diff --git a/frontend/src/main/scala/bloop/testing/TestInternals.scala b/frontend/src/main/scala/bloop/testing/TestInternals.scala index 9e9b03ac66..5c91153a23 100644 --- a/frontend/src/main/scala/bloop/testing/TestInternals.scala +++ b/frontend/src/main/scala/bloop/testing/TestInternals.scala @@ -4,10 +4,13 @@ import java.util.Properties import java.util.regex.Pattern import bloop.DependencyResolution +import bloop.cli.CommonOptions import bloop.config.Config -import bloop.exec.ForkProcess +import bloop.engine.ExecutionContext +import bloop.exec.Forker import bloop.io.AbsolutePath import bloop.logging.Logger +import monix.eval.Task import sbt.testing.{AnnotatedFingerprint, EventHandler, Fingerprint, SubclassFingerprint} import org.scalatools.testing.{Framework => OldFramework} import sbt.internal.inc.Analysis @@ -78,37 +81,40 @@ object TestInternals { * Execute the test tasks in a forked JVM. * * @param cwd The directory in which to start the forked JVM. - * @param fork Configuration for the forked JVM. - * @param discoveredTests The tests that were discovered. + * @param forker Configuration for the forked JVM. + * @param discovered The tests that were discovered. * @param args The test arguments to pass to the framework. - * @param eventHandler Handler that reacts on messages from the testing frameworks. + * @param handler Handler that reacts on messages from the testing frameworks. * @param logger Logger receiving test output. * @param env The environment properties to run the program with. */ - def executeTasks(cwd: AbsolutePath, - fork: ForkProcess, - discoveredTests: DiscoveredTests, - args: List[Config.TestArgument], - eventHandler: EventHandler, - logger: Logger, - env: Properties): Unit = { + def execute( + cwd: AbsolutePath, + forker: Forker, + discovered: DiscoveredTests, + args: List[Config.TestArgument], + handler: EventHandler, + logger: Logger, + opts: CommonOptions + ): Task[Int] = { logger.debug("Starting forked test execution.") // Make sure that we cache the resolution of the test agent jar and we don't repeat it every time val agentFiles = lazyTestAgents(logger) - val testLoader = fork.toExecutionClassLoader(Some(filteredLoader)) - val server = new TestServer(logger, eventHandler, discoveredTests, args) + val testLoader = forker.toExecutionClassLoader(Some(filteredLoader)) + val server = new TestServer(logger, handler, discovered, args, opts) val forkMain = classOf[sbt.ForkMain].getCanonicalName val arguments = Array(server.port.toString) val testAgentJars = agentFiles.filter(_.underlying.toString.endsWith(".jar")) logger.debug("Test agent jars: " + agentFiles.mkString(", ")) - val exitCode = server.whileRunning { - fork.runMain(cwd, forkMain, arguments, logger, env, testAgentJars) - } - - if (exitCode != 0) logger.error(s"Forked execution terminated with non-zero code: $exitCode") + val listener = server.listenToTests + val runner = forker.runMain(cwd, forkMain, arguments, logger, opts, testAgentJars) + val listenerHandle = listener.reporter.runAsync(ExecutionContext.ioScheduler) + runner.delayExecutionWith(listener.startServer) + .executeOn(ExecutionContext.ioScheduler) + .doOnCancel(Task(listenerHandle.cancel())) } def loadFramework(l: ClassLoader, fqns: List[String], logger: Logger): Option[Framework] = { diff --git a/frontend/src/main/scala/bloop/testing/TestServer.scala b/frontend/src/main/scala/bloop/testing/TestServer.scala index ba2664aef7..b4bb37afc8 100644 --- a/frontend/src/main/scala/bloop/testing/TestServer.scala +++ b/frontend/src/main/scala/bloop/testing/TestServer.scala @@ -1,82 +1,77 @@ package bloop.testing -import java.io.{ObjectInputStream, ObjectOutputStream, Serializable} -import java.net.{ServerSocket, SocketException} +import java.io.{ObjectInputStream, ObjectOutputStream} +import java.net.ServerSocket +import bloop.cli.CommonOptions import bloop.config.Config import scala.util.control.NonFatal import bloop.logging.Logger +import monix.eval.Task import sbt.{ForkConfiguration, ForkTags} -import sbt.testing.{AnnotatedFingerprint, Event, EventHandler, SubclassFingerprint, TaskDef} +import sbt.testing.{Event, EventHandler, TaskDef} + +import scala.concurrent.Promise /** - * A server that communicates with the test agent in a forked JVM to run the tests. - * Heavily inspired from sbt's `ForkTests.scala`. + * Implements the protocol that the forked remote JVM talks with the host process. + * + * This protocol is not formal and has been implemented after sbt's `ForkTests`. */ final class TestServer( logger: Logger, eventHandler: EventHandler, discoveredTests: DiscoveredTests, - args: List[Config.TestArgument] + args: List[Config.TestArgument], + opts: CommonOptions ) { private val server = new ServerSocket(0) - private val listener = new Thread(() => run()) private val frameworks = discoveredTests.tests.keys private val tasks = discoveredTests.tests.values.flatten - private val testLoader = discoveredTests.classLoader - /** The port on which this server is listening. */ + case class TestOrchestrator(startServer: Task[Unit], reporter: Task[Unit]) val port = server.getLocalPort + def listenToTests: TestOrchestrator = { + def forkFingerprint(td: TaskDef): TaskDef = { + val newFingerprint = sbt.SerializableFingerprints.forkFingerprint(td.fingerprint) + new TaskDef(td.fullyQualifiedName, newFingerprint, td.explicitlySpecified, td.selectors) + } - def whileRunning[T](op: => T): T = { - start() - try { - val result = op - listener.join() - result - } finally stop() - } - - private[this] def start(): Unit = { - logger.debug(s"Starting test server on port $port.") - listener.start() - } - - private[this] def stop(): Unit = { - logger.debug("Terminating test server.") - server.close() - } - - private def run(): Unit = { - logger.debug("Waiting for connection from remote JVM.") - val socket = { - try server.accept() - catch { - case ex: SocketException => - logger.error("Connection with remote JVM failed.") - logger.trace(ex) - server.close() - return + @annotation.tailrec + def receiveLogs(is: ObjectInputStream, os: ObjectOutputStream): Unit = { + is.readObject() match { + case ForkTags.`Done` => + os.writeObject(ForkTags.Done) + os.flush() + case Array(ForkTags.`Error`, s: String) => + logger.error(s) + receiveLogs(is, os) + case Array(ForkTags.`Warn`, s: String) => + logger.warn(s) + receiveLogs(is, os) + case Array(ForkTags.`Info`, s: String) => + logger.info(s) + receiveLogs(is, os) + case Array(ForkTags.`Debug`, s: String) => + logger.debug(s) + receiveLogs(is, os) + case t: Throwable => + logger.trace(t) + receiveLogs(is, os) + case Array(_: String, tEvents: Array[Event]) => + tEvents.foreach(eventHandler.handle) + receiveLogs(is, os) } } - logger.debug("Remote JVM connected.") - - val os = new ObjectOutputStream(socket.getOutputStream) - // Must flush the header that the constructor writes, otherwise the ObjectInputStream on the - // other end may block indefinitely - os.flush() - val is = new ObjectInputStream(socket.getInputStream) - try { - val config = new ForkConfiguration(logger.ansiCodesSupported, /* parallel = */ false) + def talk(is: ObjectInputStream, os: ObjectOutputStream, config: ForkConfiguration): Unit = { os.writeObject(config) - val taskDefs = tasks.map(forkFingerprint) os.writeObject(taskDefs.toArray) - os.writeInt(frameworks.size) + frameworks.foreach { framework => val frameworkClass = framework.getClass.getName() val fargs = args.filter { arg => @@ -86,70 +81,57 @@ final class TestServer( } } - val runner = TestInternals.getRunner(framework, fargs, testLoader) + val runner = TestInternals.getRunner(framework, fargs, discoveredTests.classLoader) os.writeObject(Array(framework.getClass.getCanonicalName)) os.writeObject(runner.args) os.writeObject(runner.remoteArgs) } + os.flush() + receiveLogs(is, os) + } - new React(is, os, logger, eventHandler).react() - } catch { - case NonFatal(e) => - logger.error("An error occurred during remote test execution.") - logger.trace(e) - } finally { - is.close() - os.close() - socket.close() + val serverStarted = Promise[Unit]() + val clientConnection = Task { + logger.debug(s"Firing up test server at $port. Waiting for client...") + serverStarted.trySuccess(()) + server.accept() } - } + val testListeningTask = clientConnection.flatMap { socket => + logger.debug("Test server established connection with remote JVM.") + val os = new ObjectOutputStream(socket.getOutputStream) + os.flush() + val is = new ObjectInputStream(socket.getInputStream) + val config = new ForkConfiguration(logger.ansiCodesSupported, /* parallel = */ false) - private[this] def forkFingerprint(td: TaskDef): TaskDef = { - val newFingerprint = sbt.SerializableFingerprints.forkFingerprint(td.fingerprint) - new TaskDef(td.fullyQualifiedName, newFingerprint, td.explicitlySpecified, td.selectors) - } + val cleanSocketResources = Task { + is.close() + os.close() + socket.close() + } -} + Task(talk(is, os, config)) + .doOnFinish(_ => cleanSocketResources) + .doOnCancel(cleanSocketResources) + } -/** - * Reacts on messages from the forked JVM. - * Copied straight from ForkTests in sbt/sbt. - * - * Copyright 2011 - 2017, Lightbend, Inc. - * Copyright 2008 - 2010, Mark Harrah - * Licensed under BSD-3-Clause license - */ -private final class React(is: ObjectInputStream, - os: ObjectOutputStream, - logger: Logger, - eventHandler: EventHandler) { - import ForkTags._ - @annotation.tailrec - def react(): Unit = { - is.readObject() match { - case `Done` => - os.writeObject(Done) - os.flush() - case Array(`Error`, s: String) => - logger.error(s) - react() - case Array(`Warn`, s: String) => - logger.warn(s) - react() - case Array(`Info`, s: String) => - logger.info(s) - react() - case Array(`Debug`, s: String) => - logger.debug(s) - react() - case t: Throwable => - logger.trace(t) - react() - case Array(_: String, tEvents: Array[Event]) => - tEvents.foreach(eventHandler.handle) - react() + def closeServer(t: Option[Throwable]) = Task { + t.foreach { + case NonFatal(e) => + logger.error(s"Unexpected error during remote test execution: '${e.getMessage}'.") + logger.trace(e) + case _ => + } + + server.close() + opts.ngout.println("Test server has been successfully closed.") } + + val listener = testListeningTask + .doOnCancel(closeServer(None)) + .doOnFinish(closeServer(_)) + + TestOrchestrator(Task.fromFuture(serverStarted.future), listener) } } diff --git a/frontend/src/test/scala/bloop/exec/ForkProcessSpec.scala b/frontend/src/test/scala/bloop/exec/ForkerSpec.scala similarity index 63% rename from frontend/src/test/scala/bloop/exec/ForkProcessSpec.scala rename to frontend/src/test/scala/bloop/exec/ForkerSpec.scala index a13ba75b76..3472d1d790 100644 --- a/frontend/src/test/scala/bloop/exec/ForkProcessSpec.scala +++ b/frontend/src/test/scala/bloop/exec/ForkerSpec.scala @@ -1,18 +1,22 @@ package bloop.exec +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.nio.file.{Files, Path} +import java.util.concurrent.TimeUnit import org.junit.Test import org.junit.Assert.{assertEquals, assertNotEquals} import org.junit.experimental.categories.Category - import bloop.logging.RecordingLogger import bloop.io.AbsolutePath import bloop.tasks.TestUtil import bloop.tasks.TestUtil.withTemporaryDirectory +import scala.concurrent.duration.Duration + @Category(Array(classOf[bloop.FastTests])) -class ForkProcessSpec { +class ForkerSpec { val packageName = "foo.bar" val mainClassName = "Main" @@ -31,8 +35,7 @@ class ForkProcessSpec { } val dependencies = Map.empty[String, Set[String]] - val runnableProject = Map( - TestUtil.RootProject -> Map("A.scala" -> ArtificialSources.`A.scala`)) + val runnableProject = Map(TestUtil.RootProject -> Map("A.scala" -> ArtificialSources.`A.scala`)) private def run(cwd: Path, args: Array[String])(op: (Int, List[(String, String)]) => Unit): Unit = TestUtil.checkAfterCleanCompilation(runnableProject, dependencies) { state => @@ -40,14 +43,47 @@ class ForkProcessSpec { val project = TestUtil.getProject(TestUtil.RootProject, state) val env = JavaEnv.default val classpath = project.classpath - val config = ForkProcess(env, classpath) + val config = Forker(env, classpath) val logger = new RecordingLogger - val userEnv = TestUtil.runAndTestProperties - val exitCode = config.runMain(cwdPath, s"$packageName.$mainClassName", args, logger, userEnv) + val opts = state.commonOptions.copy(env = TestUtil.runAndTestProperties) + val mainClass = s"$packageName.$mainClassName" + val wait = Duration.apply(15, TimeUnit.SECONDS) + val exitCode = + TestUtil.await(wait)(config.runMain(cwdPath, mainClass, args, logger.asVerbose, opts)) val messages = logger.getMessages op(exitCode, messages) } + @Test + def detectNewLines(): Unit = { + val nl = System.lineSeparator() + val remaining = new StringBuilder() + val msg = ByteBuffer.wrap(" ".getBytes(StandardCharsets.UTF_8)) + val lines = Forker.linesFrom(msg, remaining) + assert(lines.length == 0) + assert(remaining.mkString == " ") + + val msg2 = ByteBuffer.wrap(s"Hello${nl}World!$nl".getBytes(StandardCharsets.UTF_8)) + val lines2 = Forker.linesFrom(msg2, remaining) + assert(lines2.length == 2) + assert(lines2(0) == " Hello") + assert(lines2(1) == "World!") + assert(remaining.mkString.isEmpty) + + val msg3 = ByteBuffer.wrap(s"${nl}${nl}asdf".getBytes(StandardCharsets.UTF_8)) + val lines3 = Forker.linesFrom(msg3, remaining) + assert(lines3.length == 2) + assert(lines3(0) == "") + assert(lines3(1) == "") + assert(remaining.mkString == "asdf") + + val msg4 = ByteBuffer.wrap(s"${nl}this is SPARTA${nl}".getBytes(StandardCharsets.UTF_8)) + val lines4 = Forker.linesFrom(msg4, remaining) + assert(lines4.length == 2) + assert(lines4(0) == "asdf") + assert(lines4(1) == "this is SPARTA") + } + @Test def canRun(): Unit = withTemporaryDirectory { tmp => run(tmp, Array("foo", "bar", "baz")) { @@ -77,7 +113,7 @@ class ForkProcessSpec { case ("error", msg) => msg.contains(nonExisting) case _ => false } - assertEquals(ForkProcess.EXIT_ERROR, exitCode.toLong) + assertEquals(Forker.EXIT_ERROR, exitCode.toLong) assert(messages.exists(expected), s"Couldn't find expected error messages in $messages") } } diff --git a/frontend/src/test/scala/bloop/tasks/RunTasksSpec.scala b/frontend/src/test/scala/bloop/tasks/RunTasksSpec.scala index c2870704b7..da8f17e783 100644 --- a/frontend/src/test/scala/bloop/tasks/RunTasksSpec.scala +++ b/frontend/src/test/scala/bloop/tasks/RunTasksSpec.scala @@ -1,20 +1,21 @@ package bloop.tasks +import java.io.ByteArrayInputStream +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit + +import bloop.ScalaInstance import org.junit.Test import org.junit.Assert.assertEquals import org.junit.experimental.categories.Category - import bloop.cli.Commands -import bloop.engine.{Interpreter, Run, State} +import bloop.engine.{Run, State} import bloop.engine.tasks.Tasks import bloop.exec.JavaEnv -import bloop.logging.{ProcessLogger, RecordingLogger} -import bloop.tasks.TestUtil.{ - checkAfterCleanCompilation, - getProject, - loadTestProject, - runAndCheck -} +import bloop.logging.RecordingLogger +import bloop.tasks.TestUtil.{checkAfterCleanCompilation, getProject, loadTestProject, runAndCheck} + +import scala.concurrent.duration.Duration @Category(Array(classOf[bloop.FastTests])) class RunTasksSpec { @@ -149,4 +150,39 @@ class RunTasksSpec { } } + @Test + def canRunApplicationThatRequiresInput = { + object Sources { + val `A.scala` = + """object Foo { + | def main(args: Array[String]): Unit = { + | println("Hello, World! I'm waiting for input") + | println(new java.util.Scanner(System.in).nextLine()) + | println("I'm done!") + | } + |} + """.stripMargin + } + + val logger = new RecordingLogger + val structure = Map("A" -> Map("A.scala" -> Sources.`A.scala`)) + val scalaInstance: ScalaInstance = CompilationHelpers.scalaInstance + val javaEnv: JavaEnv = JavaEnv.default + TestUtil.withState(structure, Map.empty, scalaInstance = scalaInstance, javaEnv = javaEnv) { + (state0: State) => + // It has to contain a new line for the process to finish! ;) + val ourInputStream = new ByteArrayInputStream("Hello!\n".getBytes(StandardCharsets.UTF_8)) + val hijackedCommonOptions = state0.commonOptions.copy(in = ourInputStream) + val state = state0.copy(logger = logger).copy(commonOptions = hijackedCommonOptions) + val projects = state.build.projects + val projectA = getProject("A", state) + val action = Run(Commands.Run("A")) + val duration = Duration.apply(15, TimeUnit.SECONDS) + def msgs = logger.getMessages + val compiledState = + try TestUtil.blockingExecute(action, state, duration) + catch { case t: Throwable => println(msgs.mkString("\n")); throw t} + assert(compiledState.status.isOk) + } + } } diff --git a/frontend/src/test/scala/bloop/tasks/TestTaskTest.scala b/frontend/src/test/scala/bloop/tasks/TestTaskTest.scala index de283bdbb6..7094bb63b4 100644 --- a/frontend/src/test/scala/bloop/tasks/TestTaskTest.scala +++ b/frontend/src/test/scala/bloop/tasks/TestTaskTest.scala @@ -1,5 +1,6 @@ package bloop.tasks +import java.util.concurrent.TimeUnit import java.util.{Arrays, Collection} import org.junit.Assert.{assertEquals, assertTrue} @@ -9,8 +10,9 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters import bloop.Project +import bloop.cli.CommonOptions import bloop.engine.{ExecutionContext, State} -import bloop.exec.{ForkProcess, JavaEnv} +import bloop.exec.{Forker, JavaEnv} import bloop.io.AbsolutePath import bloop.reporter.ReporterConfig import sbt.testing.Framework @@ -25,15 +27,6 @@ object TestTaskTest { // Test that frameworks are class-loaded, detected and that test classes exist and can be run. val frameworkNames = Array("ScalaTest", "ScalaCheck", "Specs2", "UTest", "JUnit") - @Parameters - def data(): Collection[Array[String]] = { - Arrays.asList(frameworkNames.map(Array.apply(_)): _*) - } -} - -@Category(Array(classOf[bloop.SlowTests])) -@RunWith(classOf[Parameterized]) -class TestTaskTest(framework: String) { private val TestProjectName = "with-tests" private val (testState: State, testProject: Project, testAnalysis: CompileAnalysis) = { import bloop.util.JavaCompat.EnrichOptional @@ -47,10 +40,26 @@ class TestTaskTest(framework: String) { (state, project, analysis) } + @Parameters + def data(): Collection[Array[Object]] = { + Arrays.asList( + frameworkNames.map(n => Array[AnyRef](n, testState, testProject, testAnalysis)): _*) + } +} + +@Category(Array(classOf[bloop.SlowTests])) +@RunWith(classOf[Parameterized]) +class TestTaskTest( + framework: String, + testState: State, + testProject: Project, + testAnalysis: CompileAnalysis +) { + @Test def testSuffixCanBeOmitted = { - val expectedName = TestProjectName + "-test" - val withoutSuffix = Tasks.pickTestProject(TestProjectName, testState) + val expectedName = testProject.name + val withoutSuffix = Tasks.pickTestProject(expectedName.stripSuffix("-test"), testState) val withSuffix = Tasks.pickTestProject(expectedName, testState) assertTrue("Couldn't find the project without suffix", withoutSuffix.isDefined) assertEquals(expectedName, withoutSuffix.get.name) @@ -58,13 +67,13 @@ class TestTaskTest(framework: String) { assertEquals(expectedName, withSuffix.get.name) } - private val processRunnerConfig: ForkProcess = { + private val processRunnerConfig: Forker = { val javaEnv = JavaEnv.default val classpath = testProject.classpath - ForkProcess(javaEnv, classpath) + Forker(javaEnv, classpath) } - private def testLoader(fork: ForkProcess): ClassLoader = { + private def testLoader(fork: Forker): ClassLoader = { fork.toExecutionClassLoader(Some(TestInternals.filteredLoader)) } @@ -88,8 +97,11 @@ class TestTaskTest(framework: String) { Seq(framework -> filteredDefs) }.toMap val discoveredTests = DiscoveredTests(classLoader, tests) - val env = TestUtil.runAndTestProperties - TestInternals.executeTasks(cwd, config, discoveredTests, Nil, Tasks.handler, logger, env) + val opts = CommonOptions.default.copy(env = TestUtil.runAndTestProperties) + val exitCode = TestUtil.await(Duration.apply(15, TimeUnit.SECONDS)) { + TestInternals.execute(cwd, config, discoveredTests, Nil, Tasks.handler, logger, opts) + } + assert(exitCode == 0) } } } diff --git a/frontend/src/test/scala/bloop/tasks/TestUtil.scala b/frontend/src/test/scala/bloop/tasks/TestUtil.scala index 34ca2e853e..8888c0f045 100644 --- a/frontend/src/test/scala/bloop/tasks/TestUtil.scala +++ b/frontend/src/test/scala/bloop/tasks/TestUtil.scala @@ -61,12 +61,23 @@ object TestUtil { } } - def blockingExecute(a: Action, state: State): State = { + def await[T](duration: Duration)(t: Task[T]): T = { + val handle = t + .executeWithOptions(_.enableAutoCancelableRunLoops) + .runAsync(ExecutionContext.scheduler) + try Await.result(handle, duration) + catch { + case NonFatal(t) => handle.cancel(); throw t + case i: InterruptedException => handle.cancel(); throw i + } + } + + def blockingExecute(a: Action, state: State, duration: Duration = Duration.Inf): State = { val handle = Interpreter .execute(a, Task.now(state)) .executeWithOptions(_.enableAutoCancelableRunLoops) .runAsync(ExecutionContext.scheduler) - try Await.result(handle, Duration.Inf) + try Await.result(handle, duration) catch { case NonFatal(t) => handle.cancel(); throw t case i: InterruptedException => handle.cancel(); state diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6fcaa6f409..9cba82de44 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -21,6 +21,7 @@ object Dependencies { val monixVersion = "2.3.3" val metaconfigVersion = "0.6.0" val circeVersion = "0.9.3" + val nuprocessVersion = "1.1.3" import sbt.librarymanagement.syntax.stringToOrganization val zinc = "ch.epfl.scala" %% "zinc" % zincVersion @@ -57,4 +58,5 @@ object Dependencies { val metaconfigDocs = "com.geirsson" %% "metaconfig-docs" % metaconfigVersion val circeCore = "io.circe" %% "circe-core" % circeVersion val circeGeneric = "io.circe" %% "circe-generic" % circeVersion + val nuprocess = "com.zaxxer" % "nuprocess" % nuprocessVersion }