Skip to content

Commit

Permalink
Merge pull request #345 from scalacenter/topic/cancel-gracefully
Browse files Browse the repository at this point in the history
Redesign file watching and add client listeners
  • Loading branch information
Duhemm authored Mar 14, 2018
2 parents babe062 + 4921792 commit 072b2fe
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 76 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ bin/.scalafmt*
.zinc/
.nailgun/

# Directory in which to install locally bloop binaries to test them
.devbloop/

# zinc uses this local cache for publishing stuff
.ivy2/

Expand Down
1 change: 1 addition & 0 deletions backend/src/main/scala/bloop/Compiler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import bloop.logging.Logger
import sbt.internal.inc.{FreshCompilerCache, Locate, LoggedReporter, ZincUtil}

case class CompileInputs(

scalaInstance: ScalaInstance,
compilerCache: CompilerCache,
sourceDirectories: Array[AbsolutePath],
Expand Down
5 changes: 2 additions & 3 deletions backend/src/main/scala/bloop/logging/BloopLogger.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package bloop.logging

import java.io.{FileOutputStream, OutputStream, PrintStream}
import java.io.PrintStream
import java.util.concurrent.atomic.AtomicInteger

import scala.Console.{CYAN, GREEN, RED, RESET, YELLOW}

import bloop.io.AbsolutePath
import com.martiansoftware.nailgun.NGCommunicator

/**
* Creates a logger that writes to the given streams.
Expand Down
7 changes: 3 additions & 4 deletions frontend/src/main/scala/bloop/Bloop.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package bloop

import bloop.cli.{CliOptions, Commands, ExitStatus}
import bloop.cli.CliParsers.{inputStreamRead, printStreamRead, OptionsParser, pathParser}
import bloop.engine.{Build, Exit, Interpreter, Run, State}
import bloop.cli.CliParsers.{OptionsParser, inputStreamRead, pathParser, printStreamRead}
import bloop.engine.{Build, Exit, Interpreter, NoPool, Run, State}
import bloop.engine.tasks.Tasks
import bloop.io.AbsolutePath
import bloop.io.Timer.timed
import bloop.logging.BloopLogger
import jline.console.ConsoleReader
import caseapp.{CaseApp, RemainingArgs}

import jline.console.ConsoleReader

import scala.annotation.tailrec
Expand All @@ -27,7 +26,7 @@ object Bloop extends CaseApp[CliOptions] {
logger.verboseIf(options.verbose) {
val projects = Project.fromDir(configDirectory, logger)
val build = Build(configDirectory, projects)
val state = State(build, options.common, logger)
val state = State(build, NoPool, options.common, logger)
run(state)
}
}
Expand Down
21 changes: 12 additions & 9 deletions frontend/src/main/scala/bloop/Cli.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,28 @@ package bloop
import bloop.bsp.BspServer
import bloop.cli.validation.Validate
import bloop.cli.{CliOptions, CliParsers, Commands, CommonOptions, ExitStatus}
import bloop.engine.{Action, Exit, Interpreter, Print, Run, State}
import bloop.io.Paths
import bloop.logging.{BloopLogger, Logger}
import bloop.engine.{Action, ClientPool, Exit, Interpreter, NailgunPool, NoPool, Print, Run, State}
import bloop.logging.BloopLogger
import caseapp.core.{DefaultBaseCommand, Messages}
import com.martiansoftware.nailgun
import com.martiansoftware.nailgun.NGContext

class Cli
object Cli {
def main(args: Array[String]): Unit = {
State.setUpShutdownHoook()
val action = parse(args, CommonOptions.default)
val exitStatus = run(action)
val exitStatus = run(action, NoPool)
sys.exit(exitStatus.code)
}

def nailMain(ngContext: nailgun.NGContext): Unit = {
def nailMain(ngContext: NGContext): Unit = {
val server = ngContext.getNGServer
val nailgunOptions = CommonOptions(
in = ngContext.in,
out = ngContext.out,
err = ngContext.err,
ngout = server.out,
ngerr = server.err,
workingDirectory = ngContext.getWorkingDirectory,
)
val command = ngContext.getCommand
Expand All @@ -36,7 +38,8 @@ object Cli {
printErrorAndExit(helpAsked, nailgunOptions)
else parse(args, nailgunOptions)
}
val exitStatus = run(cmd)

val exitStatus = run(cmd, NailgunPool(ngContext))
ngContext.exit(exitStatus.code)
}

Expand Down Expand Up @@ -188,7 +191,7 @@ object Cli {
}
}

def run(action: Action): ExitStatus = {
def run(action: Action, pool: ClientPool): ExitStatus = {
import bloop.io.AbsolutePath
def getConfigDir(cliOptions: CliOptions): AbsolutePath = {
cliOptions.configDir
Expand All @@ -205,7 +208,7 @@ object Cli {
val commonOpts = cliOptions.common
val configDirectory = getConfigDir(cliOptions)
val logger = BloopLogger.at(configDirectory.syntax, commonOpts.out, commonOpts.err)
val state = State.loadActiveStateFor(configDirectory, cliOptions.common, logger)
val state = State.loadActiveStateFor(configDirectory, pool, cliOptions.common, logger)
val newState = Interpreter.execute(action, state)
State.stateCache.updateBuild(newState.copy(status = ExitStatus.Ok))
newState.status
Expand Down
6 changes: 5 additions & 1 deletion frontend/src/main/scala/bloop/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ class Server
object Server {
private val defaultPort: Int = 8212 // 8100 + 'p'
def main(args: Array[String]): Unit = {
run(instantiateServer(args))
}

private[bloop] def instantiateServer(args: Array[String]): NGServer = {
val port = Try(args(0).toInt).getOrElse(Server.defaultPort)
val addr = InetAddress.getLoopbackAddress
val server = new NGServer(addr, port)
registerAliases(server)
run(server)
server
}

def nailMain(ngContext: NGContext): Unit = {
Expand Down
3 changes: 2 additions & 1 deletion frontend/src/main/scala/bloop/bsp/BloopBspServices.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class BloopBspServices(callSiteState: State, client: JsonRpcClient, bspLogger: L

private final val defaultOpts = callSiteState.commonOptions
def reloadState(uri: String): State = {
val state0 = State.loadActiveStateFor(AbsolutePath(uri), defaultOpts, bspForwarderLogger)
val pool = callSiteState.pool
val state0 = State.loadActiveStateFor(AbsolutePath(uri), pool, defaultOpts, bspForwarderLogger)
state0.copy(logger = bspForwarderLogger, commonOptions = latestState.commonOptions)
}

Expand Down
2 changes: 2 additions & 0 deletions frontend/src/main/scala/bloop/cli/CommonOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ case class CommonOptions(
@Hidden out: PrintStream = System.out,
@Hidden in: InputStream = System.in,
@Hidden err: PrintStream = System.err,
@Hidden ngout: PrintStream = System.out,
@Hidden ngerr: PrintStream = System.err,
threads: Int = ExecutionContext.nCPUs
) {
def workingPath: AbsolutePath = AbsolutePath(workingDirectory)
Expand Down
38 changes: 38 additions & 0 deletions frontend/src/main/scala/bloop/engine/ClientPool.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package bloop.engine

import com.martiansoftware.nailgun.{NGClientDisconnectReason, NGClientListener, NGContext}

sealed trait CloseEvent
case object Heartbeat extends CloseEvent
case object SocketError extends CloseEvent
case object SocketTimeout extends CloseEvent
case object SessionShutdown extends CloseEvent
case object InternalError extends CloseEvent

sealed trait ClientPool {
def addListener(f: CloseEvent => Unit): Unit
def removeAllListeners(): Unit
}

case object NoPool extends ClientPool {
override def addListener(f: CloseEvent => Unit): Unit = ()
override def removeAllListeners(): Unit = ()
}

case class NailgunPool(context: NGContext) extends ClientPool {
override def addListener(f: CloseEvent => Unit): Unit = {
context.addClientListener(new NGClientListener {
override def clientDisconnected(reason: NGClientDisconnectReason): Unit = {
f(reason match {
case NGClientDisconnectReason.HEARTBEAT => Heartbeat
case NGClientDisconnectReason.SOCKET_ERROR => SocketError
case NGClientDisconnectReason.SOCKET_TIMEOUT => SocketTimeout
case NGClientDisconnectReason.SESSION_SHUTDOWN => SessionShutdown
case NGClientDisconnectReason.INTERNAL_ERROR => InternalError
})
}
})
}

override def removeAllListeners(): Unit = context.removeAllClientListeners()
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ object ExecutionContext {
}

implicit val scheduler: Scheduler = Scheduler(executor)
implicit val ioScheduler: Scheduler = Scheduler.io("bloop-io")
}
45 changes: 33 additions & 12 deletions frontend/src/main/scala/bloop/engine/Interpreter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import scala.concurrent.duration.Duration
object Interpreter {
@tailrec
def execute(action: Action, state: State): State = {
def logAndTime(cliOptions: CliOptions, action: Task[State]): State = {
state.logger.verboseIf(cliOptions.verbose)(timed(state.logger)(waitAndLog(state, action)))
def logAndTime(cliOptions: CliOptions, task: Task[State]): State = {
state.logger.verboseIf(cliOptions.verbose)(
timed(state.logger)(waitAndLog(action, cliOptions, state, task))
)
}

action match {
Expand Down Expand Up @@ -86,15 +88,12 @@ object Interpreter {
BspServer.run(cmd, state, scheduler)
}

private def watch(project: Project, state: State, f: State => Task[State]): Task[State] = Task {
// TODO(jvican): The implementation here could be improved, do so.
private def watch(project: Project, state: State, f: State => Task[State]): Task[State] = {
val reachable = Dag.dfs(state.build.getDagFor(project))
val allSourceDirs = reachable.iterator.flatMap(_.sourceDirectories.toList).map(_.underlying)
val watcher = new SourceWatcher(allSourceDirs.toList, state.logger)
// Make file watching cancel tasks if lots of different changes happen in less than 100ms
val watchFn: State => State = { state => waitAndLog(state, f(state)) }
val firstOp = watchFn(state)
watcher.watch(firstOp, watchFn)
val watcher = new SourceWatcher(project, allSourceDirs.toList, state.logger)
// Force the first execution before relying on the file watching task
f(state).flatMap(newState => watcher.watch(newState, f))
}

private def compile(cmd: Commands.Compile, state: State): Task[State] = {
Expand Down Expand Up @@ -252,10 +251,32 @@ object Interpreter {
state.mergeStatus(ExitStatus.InvalidCommandLineOption)
}

private def waitAndLog(previousState: State, newState: Task[State]): State = {
private def waitAndLog(
action: Action,
cliOptions: CliOptions,
previousState: State,
newState: Task[State]
): State = {
val pool = previousState.pool
val ngout = cliOptions.common.ngout
try {
// Duration has to be infinity, we cannot predict how much time compilation takes
Await.result(newState.runAsync(previousState.scheduler), Duration.Inf)
val handle = newState
.executeWithOptions(_.enableAutoCancelableRunLoops)
.runAsync(previousState.scheduler)

// Let's cancel tasks (if supported by the underlying implementation) when clients disconnect
pool.addListener {
case e: CloseEvent =>
if (!handle.isCompleted) {
ngout.println(
s"Client in ${previousState.build.origin.syntax} has disconnected with a '$e' event. Cancelling tasks...")
handle.cancel()
}
}

val result = Await.result(handle, Duration.Inf)
ngout.println(s"The task for $action finished.")
result
} catch {
case NonFatal(t) =>
previousState.logger.error(t.getMessage)
Expand Down
41 changes: 28 additions & 13 deletions frontend/src/main/scala/bloop/engine/State.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ final case class State private (
build: Build,
results: ResultsCache,
compilerCache: CompilerCache,
pool: ClientPool,
commonOptions: CommonOptions,
status: ExitStatus,
logger: Logger
Expand All @@ -51,20 +52,19 @@ object State {
}

private[bloop] def forTests(build: Build, compilerCache: CompilerCache, logger: Logger): State = {
val initializedResults = build.projects.foldLeft(ResultsCache.getEmpty(logger)) {
val results0 = build.projects.foldLeft(ResultsCache.getEmpty(logger)) {
case (results, project) => results.initializeResult(project)
}
State(build, initializedResults, compilerCache, CommonOptions.default, ExitStatus.Ok, logger)
State(build, results0, compilerCache, NoPool, CommonOptions.default, ExitStatus.Ok, logger)
}

// Improve the caching by using file metadata
def apply(build: Build, options: CommonOptions, logger: Logger): State = {
def apply(build: Build, pool: ClientPool, opts: CommonOptions, logger: Logger): State = {
val results = build.projects.foldLeft(ResultsCache.getEmpty(logger)) {
case (results, project) => results.initializeResult(project)
}

val compilerCache = getCompilerCache(logger)
State(build, results, compilerCache, options, ExitStatus.Ok, logger)
State(build, results, compilerCache, pool, opts, ExitStatus.Ok, logger)
}

def setUpShutdownHoook(): Unit = {
Expand Down Expand Up @@ -100,13 +100,28 @@ object State {

import bloop.Project
import bloop.io.AbsolutePath
def loadActiveStateFor(configDir: AbsolutePath, opts: CommonOptions, logger: Logger): State = {
State.stateCache
.addIfMissing(configDir, path => {
val projects = Project.fromDir(configDir, logger)
val build: Build = Build(configDir, projects)
State(build, opts, logger)
})
.copy(logger = logger)

/**
* Loads an state active for the given configuration directory.
*
* @param configDir The configuration directory to load a state for.
* @param pool The pool of listeners that are connected to this client.
* @param opts The common options associated with the state.
* @param logger The logger to be used to instantiate the state.
* @return An state (cached or not) associated with the configuration directory.
*/
def loadActiveStateFor(
configDir: AbsolutePath,
pool: ClientPool,
opts: CommonOptions,
logger: Logger
): State = {
val cached = State.stateCache.addIfMissing(configDir, path => {
val projects = Project.fromDir(configDir, logger)
val build: Build = Build(configDir, projects)
State(build, pool, opts, logger)
})

cached.copy(pool = pool, commonOptions = opts, logger = logger)
}
}
Loading

0 comments on commit 072b2fe

Please sign in to comment.