feat(daemon): clean (#5152)
Co-authored-by: Alexander Lyon <arlyon@me.com>
tknickman and arlyon committed May 31, 2023
1 parent 94fb7b7 commit 83517cd
Showing 9 changed files with 91 additions and 11 deletions.
7 changes: 6 additions & 1 deletion cli/internal/daemon/connector/connector.go
@@ -74,10 +74,15 @@ type ConnectionError struct {
 }
 
 func (ce *ConnectionError) Error() string {
-	return fmt.Sprintf(`connection to turbo daemon process failed. Please ensure the following:
+	return fmt.Sprintf(`connection to turbo daemon process failed.
+To quickly resolve the issue, try running:
+ - $ turbo daemon clean
+To debug further - please ensure the following:
  - the process identified by the pid in the file at %v is not running, and remove %v
  - check the logs at %v
  - the unix domain socket at %v has been removed
 You can also run without the daemon process by passing --no-daemon`, ce.PidPath, ce.PidPath, ce.LogPath, ce.SockPath)
 }

4 changes: 2 additions & 2 deletions crates/turborepo-globwatch/src/lib.rs
@@ -103,13 +103,13 @@ impl GlobWatcher {
         {
             warn!("failed to watch flush dir: {}", e);
             if setup_broadcaster.send(Some(false)).is_err() {
-                trace!("setup channel shut down before setup completed");
+                trace!("failed to notify failed flush watch");
             }
             Err(e)
         } else {
             trace!("watching flush dir: {:?}", path);
             if setup_broadcaster.send(Some(true)).is_err() {
-                trace!("setup channel shut down before setup completed");
+                trace!("failed to notify successful flush watch");
             }
             Ok(())
         }

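The `send(...).is_err()` pattern above only fails when every receiver is gone. A minimal sketch of that behavior, assuming `setup_broadcaster` is a tokio `watch` sender (the actual channel type is not shown in this hunk):

```rust
// Sketch only, not turborepo code: a watch sender's send() errs
// exclusively when all receivers have been dropped, so the failure
// is worth no more than a trace-level log.
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (setup_tx, setup_rx) = watch::channel(None::<bool>);
    drop(setup_rx); // simulate the listener going away
    if setup_tx.send(Some(true)).is_err() {
        println!("failed to notify successful flush watch");
    }
}
```
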
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/cli.rs
@@ -178,6 +178,9 @@ pub enum DaemonCommand {
     },
     /// Stops the turbo daemon
     Stop,
+    /// Stops the turbo daemon if it is already running, and removes any stale
+    /// daemon state
+    Clean,
 }
 
 #[derive(Copy, Clone, Debug, PartialEq, Serialize, ValueEnum)]

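The doc comment on the new variant doubles as its `--help` text. A minimal sketch of how such a subcommand parses, assuming clap v4's derive API (the binary and dispatch here are illustrative, not the actual turborepo wiring):

```rust
// Sketch only: a `clean` subcommand with clap v4 derive.
use clap::{Parser, Subcommand};

#[derive(Parser)]
struct Cli {
    #[command(subcommand)]
    command: DaemonCommand,
}

#[derive(Subcommand)]
enum DaemonCommand {
    /// Stops the turbo daemon
    Stop,
    /// Stops the turbo daemon if it is already running, and removes any stale
    /// daemon state
    Clean,
}

fn main() {
    // `mybin clean` parses to DaemonCommand::Clean; the doc comment on the
    // variant is what `mybin --help` prints for it.
    match Cli::parse().command {
        DaemonCommand::Stop => println!("stopping"),
        DaemonCommand::Clean => println!("cleaning"),
    }
}
```
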
60 changes: 56 additions & 4 deletions crates/turborepo-lib/src/commands/daemon.rs
@@ -18,28 +18,33 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
         DaemonCommand::Status { .. } => (false, false),
         DaemonCommand::Restart | DaemonCommand::Stop => (false, true),
         DaemonCommand::Start => (true, true),
+        DaemonCommand::Clean => (false, true),
     };
 
+    let pid_file = base.daemon_file_root().join_component("turbod.pid");
+    let sock_file = base.daemon_file_root().join_component("turbod.sock");
+
     let connector = DaemonConnector {
         can_start_server,
         can_kill_server,
-        pid_file: base.daemon_file_root().join_component("turbod.pid"),
-        sock_file: base.daemon_file_root().join_component("turbod.sock"),
+        pid_file: pid_file.clone(),
+        sock_file: sock_file.clone(),
     };
 
-    let mut client = connector.connect().await?;
-
     match command {
         DaemonCommand::Restart => {
+            let client = connector.connect().await?;
             client.restart().await?;
         }
         // connector.connect will have already started the daemon if needed,
         // so this is a no-op
         DaemonCommand::Start => {}
         DaemonCommand::Stop => {
+            let client = connector.connect().await?;
            client.stop().await?;
         }
         DaemonCommand::Status { json } => {
+            let mut client = connector.connect().await?;
             let status = client.status().await?;
             let log_file = log_filename(&status.log_file)?;
             let status = DaemonStatus {
@@ -60,6 +65,53 @@ pub async fn daemon_client(command: &DaemonCommand, base: &CommandBase) -> Resul
             println!("Daemon socket file: {}", status.sock_file.to_string_lossy());
         }
     }
+        DaemonCommand::Clean => {
+            // try to connect and shut down the daemon
+            let client = connector.connect().await;
+            match client {
+                Ok(client) => match client.stop().await {
+                    Ok(_) => {
+                        tracing::trace!("successfully stopped the daemon");
+                    }
+                    Err(e) => {
+                        tracing::trace!("unable to stop the daemon: {:?}", e);
+                    }
+                },
+                Err(e) => {
+                    tracing::trace!("unable to connect to the daemon: {:?}", e);
+                }
+            }
+
+            // remove pid and sock files
+            let mut success = true;
+            trace!("cleaning up daemon files");
+            // if the pid_file and sock_file still exist, remove them:
+            if pid_file.exists() {
+                let result = std::fs::remove_file(pid_file.clone());
+                // report the failure but keep going so the socket file is still tried
+                if let Err(e) = result {
+                    println!("Failed to remove pid file: {}", e);
+                    println!("Please remove manually: {}", pid_file);
+                    success = false;
+                }
+            }
+            if sock_file.exists() {
+                let result = std::fs::remove_file(sock_file.clone());
+                // report the failure but keep going
+                if let Err(e) = result {
+                    println!("Failed to remove socket file: {}", e);
+                    println!("Please remove manually: {}", sock_file);
+                    success = false;
+                }
+            }
+
+            if success {
+                println!("Done");
+            } else {
+                // surface a single error if any removal failed
+                return Err(DaemonError::CleanFailed);
+            }
+        }
     };
 
     Ok(())

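One design note on the new arm: the `exists()` check followed by `remove_file` races with anything else deleting the file between the two calls. A hedged alternative sketch (not part of this commit) that treats an already-missing file as success:

```rust
// Sketch, not part of the commit: race-free stale-file removal,
// mapping the NotFound error to Ok instead of pre-checking exists().
use std::{io, path::Path};

fn remove_stale_file(path: &Path) -> io::Result<()> {
    match std::fs::remove_file(path) {
        // The file was already gone: nothing left to clean.
        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        other => other,
    }
}

fn main() -> io::Result<()> {
    remove_stale_file(Path::new("/tmp/turbod.pid.example"))
}
```
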
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/daemon/client.rs
@@ -157,6 +157,9 @@ pub enum DaemonError {
 
     #[error("unable to construct log file name: {0}")]
     InvalidLogFile(#[from] time::Error),
+
+    #[error("unable to complete daemon clean")]
+    CleanFailed,
 }
 
 impl From<Status> for DaemonError {

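The new variant relies on `thiserror` deriving `Display` from the `#[error(...)]` attribute. A minimal standalone sketch of that mechanism:

```rust
// Sketch: thiserror turns the #[error(...)] attribute into a Display impl.
use thiserror::Error;

#[derive(Debug, Error)]
enum DaemonError {
    #[error("unable to complete daemon clean")]
    CleanFailed,
}

fn main() {
    // Prints: unable to complete daemon clean
    println!("{}", DaemonError::CleanFailed);
}
```
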
9 changes: 7 additions & 2 deletions crates/turborepo-lib/src/daemon/connector.rs
@@ -65,6 +65,7 @@ pub struct DaemonConnector {
 
 impl DaemonConnector {
     const CONNECT_RETRY_MAX: usize = 3;
+    const CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
     const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);
     const SOCKET_TIMEOUT: Duration = Duration::from_secs(1);
     const SOCKET_ERROR_WAIT: Duration = Duration::from_millis(50);
@@ -196,15 +197,16 @@ impl DaemonConnector {
         // note, this endpoint is just a dummy. the actual path is passed in
         Endpoint::try_from("http://[::]:50051")
             .expect("this is a valid uri")
-            .timeout(Duration::from_secs(1))
+            .timeout(Self::CONNECT_TIMEOUT)
             .connect_with_connector(tower::service_fn(make_service))
             .await
             .map(TurbodClient::new)
             .map_err(DaemonConnectorError::Socket)
     }
 
-    /// Kills a currently active server but shutting it down and waiting for it
+    /// Kills a currently active server by shutting it down and waiting for it
     /// to exit.
+    #[tracing::instrument(skip(self, client))]
     async fn kill_live_server(
         &self,
         client: DaemonClient<()>,
@@ -226,6 +228,7 @@
     }
 
     /// Kills a server that is not responding.
+    #[tracing::instrument(skip(self))]
     async fn kill_dead_server(&self, pid: sysinfo::Pid) -> Result<(), DaemonConnectorError> {
         let lock = self.pid_lock();
 
@@ -254,6 +257,7 @@
         }
     }
 
+    #[tracing::instrument(skip(self))]
     async fn wait_for_socket(&self) -> Result<(), DaemonConnectorError> {
         timeout(
             Self::SOCKET_TIMEOUT,
@@ -292,6 +296,7 @@ pub enum FileWaitError {
 ///
 /// It does this by watching the parent directory of the path, and waiting for
 /// events on that path.
+#[tracing::instrument(skip(path))]
 async fn wait_for_file(
     path: &turbopath::AbsoluteSystemPathBuf,
     action: WaitAction,

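For readers unfamiliar with the added attribute: `#[tracing::instrument]` wraps each call in a span named after the function, recording arguments as fields, and `skip(...)` omits arguments that are noisy or lack a `Debug` impl. A small sketch with illustrative names:

```rust
// Sketch: what #[tracing::instrument] does, using the tracing crate.
use tracing::instrument;

// Each call enters a span "lookup" carrying an `id` field; `payload`
// is skipped, so it is never recorded in the span.
#[instrument(skip(payload))]
fn lookup(id: u64, payload: Vec<u8>) {
    tracing::trace!(bytes = payload.len(), "handling request");
}

fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    lookup(42, vec![0u8; 16]);
}
```
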
8 changes: 8 additions & 0 deletions crates/turborepo-lib/src/daemon/endpoint.rs
@@ -45,6 +45,7 @@ pub async fn listen_socket(
 
     trace!("acquiring pidlock");
     // this will fail if the pid is already owned
+    // todo: make sure we fall back and handle this
     lock.acquire()?;
     std::fs::remove_file(&sock_path).ok();
 
@@ -204,6 +205,9 @@ mod test {
         let result = listen_socket(pid_path, running).await;
 
         // Note: PidLock doesn't implement Debug, so we can't unwrap_err()
+
+        // todo: update this test to gracefully connect if the lock file exists but has
+        // no process
         if let Err(err) = result {
             assert_matches!(err, SocketOpenError::LockError(PidlockError::LockExists(_)));
         } else {
@@ -231,6 +235,10 @@
         let result = listen_socket(pid_path, running).await;
 
         // Note: PidLock doesn't implement Debug, so we can't unwrap_err()
+
+        // todo: update this test. we should delete the socket file first, remove the
+        // pid file, and start a new daemon. the old one should just time
+        // out, and this should not error.
         if let Err(err) = result {
             assert_matches!(err, SocketOpenError::LockError(PidlockError::LockExists(_)));
         } else {

4 changes: 3 additions & 1 deletion crates/turborepo-lib/src/daemon/server.rs
@@ -32,7 +32,7 @@ use tokio::{
 };
 use tonic::transport::{NamedService, Server};
 use tower::ServiceBuilder;
-use tracing::error;
+use tracing::{error, trace};
 use turbopath::AbsoluteSystemPathBuf;
 
 use super::{
@@ -177,6 +177,8 @@ impl<T: Watcher + Send + 'static> DaemonServer<T> {
             Err(e) => return CloseReason::SocketOpenError(e),
         };
 
+        trace!("acquired connection stream for socket");
+
         let service = ServiceBuilder::new()
             .layer(BumpTimeoutLayer::new(self.timeout.clone()))
             .service(crate::daemon::proto::turbod_server::TurbodServer::new(self));

4 changes: 3 additions & 1 deletion crates/turborepo-lib/src/tracing.rs
@@ -121,7 +121,9 @@ impl TurboSubscriber {
         let (file_writer, guard) = tracing_appender::non_blocking(appender);
         trace!("created non-blocking file writer");
 
-        let layer = tracing_subscriber::fmt::layer().with_writer(file_writer);
+        let layer = tracing_subscriber::fmt::layer()
+            .with_writer(file_writer)
+            .with_ansi(false);
 
         self.update.reload(Some(layer))?;
         self.guard.lock().expect("not poisoned").replace(guard);

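The `.with_ansi(false)` addition keeps ANSI color escape codes out of the daemon log file, which would otherwise render as literal `\x1b[...]` sequences in an editor. A self-contained sketch of the same configuration, assuming the `tracing`, `tracing-subscriber`, and `tracing-appender` crates (the paths are placeholders):

```rust
// Sketch: file logging without ANSI escapes.
fn main() {
    let appender = tracing_appender::rolling::never("/tmp", "example.log");
    let (writer, _guard) = tracing_appender::non_blocking(appender);
    tracing_subscriber::fmt()
        .with_writer(writer)
        .with_ansi(false) // plain text only; no color codes in the file
        .init();
    tracing::info!("written to the file without escape sequences");
}
```
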
