diff --git a/hydro_deploy/core/Cargo.toml b/hydro_deploy/core/Cargo.toml index 99fc1e741d3c..4fbd0221c298 100644 --- a/hydro_deploy/core/Cargo.toml +++ b/hydro_deploy/core/Cargo.toml @@ -23,6 +23,8 @@ futures = "0.3.26" futures-core = "0.3.26" hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.5.1" } indicatif = "0.17.6" +kube = { version = "0.90.0", features = ["derive", "runtime", "ws"] } +k8s-openapi = { version = "0.21.1", features = ["latest"] } nanoid = "0.4.0" nix = "0.26.2" once_cell = "1.17" @@ -32,3 +34,4 @@ shell-escape = "0.1.5" tempfile = "3.3.0" tokio = { version = "1.16", features = [ "full" ] } tokio-util = { version = "0.7.7", features=[ "compat" ] } +tar = "0.4.40" diff --git a/hydro_deploy/core/src/deployment.rs b/hydro_deploy/core/src/deployment.rs index 4570f28d8489..96f338404580 100644 --- a/hydro_deploy/core/src/deployment.rs +++ b/hydro_deploy/core/src/deployment.rs @@ -7,7 +7,7 @@ use tokio::sync::RwLock; use super::gcp::GCPNetwork; use super::{ progress, CustomService, GCPComputeEngineHost, Host, LocalhostHost, ResourcePool, - ResourceResult, Service, + ResourceResult, Service, PodHost }; use crate::ServiceBuilder; @@ -31,6 +31,11 @@ impl Deployment { self.add_host(LocalhostHost::new) } + #[allow(non_snake_case)] + pub fn PodHost(&mut self) -> Arc> { + self.add_host(PodHost::new) + } + #[allow(non_snake_case)] pub fn GCPComputeEngineHost( &mut self, diff --git a/hydro_deploy/core/src/hydroflow_crate/build.rs b/hydro_deploy/core/src/hydroflow_crate/build.rs index a692ed07b9f8..3de17f1571d6 100644 --- a/hydro_deploy/core/src/hydroflow_crate/build.rs +++ b/hydro_deploy/core/src/hydroflow_crate/build.rs @@ -66,7 +66,7 @@ pub async fn build_crate( tokio::task::spawn_blocking(move || { let mut command = Command::new("cargo"); command.args([ - "build".to_string(), + "zigbuild".to_string(), "--profile".to_string(), profile.unwrap_or("release".to_string()), ]); diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 9e3bdfa5936b..b101d4385046 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -291,6 +291,8 @@ impl Service for HydroflowCrateService { // request stdout before sending config so we don't miss the "ready" response let stdout_receiver = binary.write().await.cli_stdout().await; + ProgressTracker::println(format!("Service ready: {formatted_bind_config}\n").as_str()); + binary .write() .await @@ -308,6 +310,7 @@ impl Service for HydroflowCrateService { *self.server_defns.try_write().unwrap() = serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap(); } else { + ProgressTracker::println(format!("Did not find ready. Instead found: {:?}", ready_line).as_str()); bail!("expected ready"); } diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index 31f8f3f00ce4..5b41b31e1e85 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -17,6 +17,9 @@ pub mod progress; pub mod localhost; pub use localhost::LocalhostHost; +pub mod kubernetes; +pub use kubernetes::PodHost; + pub mod ssh; pub mod gcp; diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index cd75e7100415..758a46119b5f 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -11,12 +11,22 @@ use async_ssh2_lite::ssh2::ErrorCode; use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration}; use async_trait::async_trait; use futures::io::BufReader; -use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt}; +use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt, TryStreamExt, join}; use hydroflow_cli_integration::ServerBindConfig; use nanoid::nanoid; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::RwLock; +// use k8s_openapi::api::core::v1::Pod; +// use kube::{ +// api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams}, +// Client, +// }; + +// use tokio::io::AsyncWriteExt; + +// use kube::core::subresource::AttachParams; + use super::progress::ProgressTracker; use super::util::async_retry; use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy}; @@ -281,6 +291,7 @@ impl LaunchedHost for T { let user = self.ssh_user(); let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}")); + // gets the ssh session for launching the binary let channel = ProgressTracker::leaf( format!("launching binary /home/{user}/hydro-{unique_name}"), async { @@ -311,19 +322,28 @@ impl LaunchedHost for T { ) .await?; + // stdin_sender is used by other functions to send data queued up for stdin_receiver + // stdin_sender = top of the "funnel", stdin_receiver = bottom (concurrency supported due to internal synchronization) let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::(); - let mut stdin = channel.stream(0); // stream 0 is stdout/stdin, we use it for stdin + + // stream 0 is stdout/stdin, we use it for stdin. Note that we can't just return stdin directly because it doesn't support synchronization + let mut stdin = channel.stream(0); tokio::spawn(async move { + // Note that this while loop only wakes up when we get a result due to await while let Some(line) = stdin_receiver.next().await { if stdin.write_all(line.as_bytes()).await.is_err() { break; } + // flush the entire buffer stdin.flush().await.unwrap(); } }); let id_clone = id.clone(); + + // Pull away the first stdout stream into a different "prioritized" channel, + // and send everything else to stdout let (stdout_cli_receivers, stdout_receivers) = prioritized_broadcast(BufReader::new(channel.stream(0)).lines(), move |s| { println!("[{id_clone}] {s}") diff --git a/hydro_deploy/core/src/util.rs b/hydro_deploy/core/src/util.rs index 3adf46dee848..5e1460867253 100644 --- a/hydro_deploy/core/src/util.rs +++ b/hydro_deploy/core/src/util.rs @@ -30,6 +30,8 @@ type PriorityBroadcacst = ( Arc>>>, ); +// Divides up a single stream into two channels (one prioritized and the other not) +// The first data packet that comes in will get sent to the first channel, and all others will get sent to the second channel pub fn prioritized_broadcast> + Send + Unpin + 'static>( mut lines: T, default: impl Fn(String) + Send + 'static,