Skip to content

Commit

Permalink
Set SO_REUSEADDR again for wasmtime serve on Unix (#8738)
Browse files Browse the repository at this point in the history
This reverts part of #7863 which was a misunderstanding on my part
about `SO_REUSEADDR`. I think I mixed it up with `SO_REUSEPORT`. Without
`SO_REUSEADDR` it's possible to have this happen on Unix:

* Start a `wasmtime serve` session
* Connect to the session with `nc`
* Kill the server
* Restarting the server no longer works

Due to the active TCP connection at the time the server was killed the
socket/address is in the `TIME_WAIT` state which apparently prevents
rebinding the port until the OS has a chance to clean up everything.
Setting `SO_REUSEADDR` enables rebinding the address quickly.

Now in #7863 that was trying to fix #7852 which said that it was
possible to have multiple `wasmtime serve` instances binding the same
port. That can lead to confusion and is generally something we don't
want. That being said #7863 only fixed the issue for Windows but ended
up making Unix worse. This PR restores more reasonable behavior for both
Unix and Windows by conditionally setting the `SO_REUSEADDR` based on
the platform.

This PR additionally adds two new tests, both for rebinding an in-use
port quickly and additionally ensuring the same port can't be listened
to twice.
  • Loading branch information
alexcrichton authored Jun 3, 2024
1 parent 05fe628 commit bda1a64
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 16 deletions.
14 changes: 9 additions & 5 deletions src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,15 +281,19 @@ impl ServeCommand {

let instance = linker.instantiate_pre(&component)?;

// Tokio by default sets `SO_REUSEADDR` for listeners but that makes it
// a bit confusing if you run Wasmtime but forget to close a previous
// `serve` session. To avoid that we explicitly disable `SO_REUSEADDR`
// here.
let socket = match &self.addr {
SocketAddr::V4(_) => tokio::net::TcpSocket::new_v4()?,
SocketAddr::V6(_) => tokio::net::TcpSocket::new_v6()?,
};
socket.set_reuseaddr(false)?;
// Conditionally enable `SO_REUSEADDR` depending on the current
// platform. On Unix we want this to be able to rebind an address in
// the `TIME_WAIT` state which can happen then a server is killed with
// active TCP connections and then restarted. On Windows though if
// `SO_REUSEADDR` is specified then it enables multiple applications to
// bind the port at the same time which is not something we want. Hence
// this is conditionally set based on the platform (and deviates from
// Tokio's default from always-on).
socket.set_reuseaddr(!cfg!(windows))?;
socket.bind(self.addr)?;
let listener = socket.listen(100)?;

Expand Down
109 changes: 98 additions & 11 deletions tests/all/cli_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1461,11 +1461,15 @@ mod test_programs {
// Spawn `wasmtime serve` on port 0 which will randomly assign it a
// port.
let mut cmd = super::get_wasmtime_command()?;
cmd.arg("serve").arg("--addr=127.0.0.1:0").arg(wasm);
configure(&mut cmd);
Self::spawn(&mut cmd)
}

fn spawn(cmd: &mut Command) -> Result<WasmtimeServe> {
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.arg("serve").arg("--addr=127.0.0.1:0").arg(wasm);
configure(&mut cmd);
let mut child = cmd.spawn()?;

// Read the first line of stderr which will say which address it's
Expand Down Expand Up @@ -1526,15 +1530,7 @@ mod test_programs {

/// Send a request to this server and wait for the response.
async fn send_request(&self, req: http::Request<String>) -> Result<http::Response<String>> {
let tcp = TcpStream::connect(&self.addr)
.await
.context("failed to connect")?;
let tcp = wasmtime_wasi_http::io::TokioIo::new(tcp);
let (mut send, conn) = hyper::client::conn::http1::handshake(tcp)
.await
.context("failed http handshake")?;

let conn_task = tokio::task::spawn(conn);
let (mut send, conn_task) = self.start_requests().await?;

let response = send
.send_request(req)
Expand All @@ -1551,6 +1547,22 @@ mod test_programs {

Ok(http::Response::from_parts(parts, body))
}

async fn start_requests(
&self,
) -> Result<(
hyper::client::conn::http1::SendRequest<String>,
tokio::task::JoinHandle<hyper::Result<()>>,
)> {
let tcp = TcpStream::connect(&self.addr)
.await
.context("failed to connect")?;
let tcp = wasmtime_wasi_http::io::TokioIo::new(tcp);
let (send, conn) = hyper::client::conn::http1::handshake(tcp)
.await
.context("failed http handshake")?;
Ok((send, tokio::task::spawn(conn)))
}
}

// Don't leave child processes running by accident so kill the child process
Expand Down Expand Up @@ -1679,6 +1691,81 @@ mod test_programs {
}
Ok(())
}

#[tokio::test]
async fn cli_serve_only_one_process_allowed() -> Result<()> {
let wasm = CLI_SERVE_ECHO_ENV_COMPONENT;
let server = WasmtimeServe::new(wasm, |cmd| {
cmd.arg("-Scli");
})?;

let err = WasmtimeServe::spawn(
super::get_wasmtime_command()?
.arg("serve")
.arg("-Scli")
.arg(format!("--addr={}", server.addr))
.arg(wasm),
)
.err()
.expect("server spawn should have failed but it succeeded");
drop(server);

let err = format!("{err:?}");
println!("{err}");
assert!(err.contains("os error"));
Ok(())
}

// Technically this test is a little racy. This binds port 0 to acquire a
// random port, issues a single request to this port, but then kills this
// server while the request is still processing. The port is then rebound
// in the next process while it technically could be stolen by another
// process.
#[tokio::test]
async fn cli_serve_quick_rebind_allowed() -> Result<()> {
let wasm = CLI_SERVE_ECHO_ENV_COMPONENT;
let server = WasmtimeServe::new(wasm, |cmd| {
cmd.arg("-Scli");
})?;
let addr = server.addr;

// Start up a `send` and `conn_task` which represents a connection to
// this server.
let (mut send, conn_task) = server.start_requests().await?;
let _ = send
.send_request(
hyper::Request::builder()
.uri("http://localhost/")
.header("env", "FOO")
.body(String::new())
.context("failed to make request")?,
)
.await;

// ... once a response has been received (or at least the status
// code/headers) then kill the server. THis is done while `conn_task`
// and `send` are still alive so we're guaranteed that the other side
// got a request (we got a response) and our connection is still open.
//
// This forces the address/port into the `TIME_WAIT` state. The rebind
// below in the next process will fail if `SO_REUSEADDR` isn't set.
drop(server);
drop(send);
let _ = conn_task.await;

// If this is successfully bound then we'll create `WasmtimeServe`
// which reads off the first line of output to know which address was
// bound.
let _server2 = WasmtimeServe::spawn(
super::get_wasmtime_command()?
.arg("serve")
.arg("-Scli")
.arg(format!("--addr={addr}"))
.arg(wasm),
)?;

Ok(())
}
}

#[test]
Expand Down

0 comments on commit bda1a64

Please sign in to comment.