Skip to content

Commit

Permalink
added comments and more of kube integration
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjiang2378 committed Apr 25, 2024
1 parent 484e154 commit 683efad
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 4 deletions.
3 changes: 3 additions & 0 deletions hydro_deploy/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ futures = "0.3.26"
futures-core = "0.3.26"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.5.1" }
indicatif = "0.17.6"
kube = { version = "0.90.0", features = ["derive", "runtime", "ws"] }
k8s-openapi = { version = "0.21.1", features = ["latest"] }
nanoid = "0.4.0"
nix = "0.26.2"
once_cell = "1.17"
Expand All @@ -32,3 +34,4 @@ shell-escape = "0.1.5"
tempfile = "3.3.0"
tokio = { version = "1.16", features = [ "full" ] }
tokio-util = { version = "0.7.7", features=[ "compat" ] }
tar = "0.4.40"
7 changes: 6 additions & 1 deletion hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::RwLock;
use super::gcp::GCPNetwork;
use super::{
progress, CustomService, GCPComputeEngineHost, Host, LocalhostHost, ResourcePool,
ResourceResult, Service,
ResourceResult, Service, PodHost
};
use crate::ServiceBuilder;

Expand All @@ -31,6 +31,11 @@ impl Deployment {
self.add_host(LocalhostHost::new)
}

#[allow(non_snake_case)]
pub fn PodHost(&mut self) -> Arc<RwLock<PodHost>> {
self.add_host(PodHost::new)
}

#[allow(non_snake_case)]
pub fn GCPComputeEngineHost(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion hydro_deploy/core/src/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn build_crate(
tokio::task::spawn_blocking(move || {
let mut command = Command::new("cargo");
command.args([
"build".to_string(),
"zigbuild".to_string(),
"--profile".to_string(),
profile.unwrap_or("release".to_string()),
]);
Expand Down
3 changes: 3 additions & 0 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ impl Service for HydroflowCrateService {
// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.cli_stdout().await;

ProgressTracker::println(format!("Service ready: {formatted_bind_config}\n").as_str());

binary
.write()
.await
Expand All @@ -308,6 +310,7 @@ impl Service for HydroflowCrateService {
*self.server_defns.try_write().unwrap() =
serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap();
} else {
ProgressTracker::println(format!("Did not find ready. Instead found: {:?}", ready_line).as_str());
bail!("expected ready");
}

Expand Down
3 changes: 3 additions & 0 deletions hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub mod progress;
pub mod localhost;
pub use localhost::LocalhostHost;

pub mod kubernetes;
pub use kubernetes::PodHost;

pub mod ssh;

pub mod gcp;
Expand Down
24 changes: 22 additions & 2 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,22 @@ use async_ssh2_lite::ssh2::ErrorCode;
use async_ssh2_lite::{AsyncChannel, AsyncSession, Error, SessionConfiguration};
use async_trait::async_trait;
use futures::io::BufReader;
use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt};
use futures::{AsyncBufReadExt, AsyncWriteExt, StreamExt, TryStreamExt, join};
use hydroflow_cli_integration::ServerBindConfig;
use nanoid::nanoid;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::RwLock;

// use k8s_openapi::api::core::v1::Pod;
// use kube::{
// api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, WatchEvent, WatchParams},
// Client,
// };

// use tokio::io::AsyncWriteExt;

// use kube::core::subresource::AttachParams;

use super::progress::ProgressTracker;
use super::util::async_retry;
use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy};
Expand Down Expand Up @@ -281,6 +291,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
let user = self.ssh_user();
let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}"));

// gets the ssh session for launching the binary
let channel = ProgressTracker::leaf(
format!("launching binary /home/{user}/hydro-{unique_name}"),
async {
Expand Down Expand Up @@ -311,19 +322,28 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
)
.await?;

// stdin_sender is used by other functions to send data queued up for stdin_receiver
// stdin_sender = top of the "funnel", stdin_receiver = bottom (concurrency supported due to internal synchronization)
let (stdin_sender, mut stdin_receiver) = async_channel::unbounded::<String>();
let mut stdin = channel.stream(0); // stream 0 is stdout/stdin, we use it for stdin

// stream 0 is stdout/stdin, we use it for stdin. Note that we can't just return stdin directly because it doesn't support synchronization
let mut stdin = channel.stream(0);
tokio::spawn(async move {
// Note that this while loop only wakes up when we get a result due to await
while let Some(line) = stdin_receiver.next().await {
if stdin.write_all(line.as_bytes()).await.is_err() {
break;
}

// flush the entire buffer
stdin.flush().await.unwrap();
}
});

let id_clone = id.clone();

// Pull away the first stdout stream into a different "prioritized" channel,
// and send everything else to stdout
let (stdout_cli_receivers, stdout_receivers) =
prioritized_broadcast(BufReader::new(channel.stream(0)).lines(), move |s| {
println!("[{id_clone}] {s}")
Expand Down
2 changes: 2 additions & 0 deletions hydro_deploy/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type PriorityBroadcacst = (
Arc<RwLock<Vec<Sender<String>>>>,
);

// Divides up a single stream into two channels (one prioritized and the other not)
// The first data packet that comes in will get sent to the first channel, and all others will get sent to the second channel
pub fn prioritized_broadcast<T: Stream<Item = io::Result<String>> + Send + Unpin + 'static>(
mut lines: T,
default: impl Fn(String) + Send + 'static,
Expand Down

0 comments on commit 683efad

Please sign in to comment.