From 885f480ce9fed490559f9ebc227b85b6759d423b Mon Sep 17 00:00:00 2001 From: Jordan Santell Date: Mon, 8 Jan 2024 16:28:10 -0800 Subject: [PATCH] Upgrade Axum and friends to 0.7+. --- Cargo.lock | 235 +++++++++++++----- Cargo.toml | 8 +- rust/noosphere-core/Cargo.toml | 4 +- rust/noosphere-core/src/api/client.rs | 31 ++- rust/noosphere-core/src/api/headers/ucan.rs | 2 +- rust/noosphere-core/src/api/mod.rs | 4 + rust/noosphere-core/src/api/v0alpha1/data.rs | 6 +- rust/noosphere-core/src/api/v0alpha2/data.rs | 2 +- rust/noosphere-gateway/Cargo.toml | 4 +- rust/noosphere-gateway/src/error.rs | 4 +- .../src/extractors/authority.rs | 4 +- rust/noosphere-gateway/src/extractors/cbor.rs | 16 +- rust/noosphere-gateway/src/gateway.rs | 19 +- .../src/handlers/v0alpha1/fetch.rs | 6 +- .../src/handlers/v0alpha1/replicate.rs | 18 +- .../src/handlers/v0alpha2/push.rs | 44 ++-- .../examples/notes-to-html/implementation.rs | 8 +- rust/noosphere-ns/Cargo.toml | 2 +- .../noosphere-ns/src/server/implementation.rs | 19 +- 19 files changed, 277 insertions(+), 159 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 10b8dbc4d..57629cced 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -429,20 +429,20 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "axum" -version = "0.6.20" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +checksum = "d09dbe0e490df5da9d69b36dca48a76635288a82f92eca90024883a56202026d" dependencies = [ "async-trait", "axum-core", "axum-macros", - "bitflags 1.3.2", "bytes", "futures-util", - "headers", - "http", - "http-body", - "hyper", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "itoa", "matchit", "memchr", @@ -459,30 +459,57 @@ dependencies = [ "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] name = "axum-core" -version = "0.3.4" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +checksum = "e87c8503f93e6d144ee5690907ba22db7ba79ab001a932ab99034f0fe836b3df" dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", "mime", + "pin-project-lite", "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-extra" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "881348a37b079994894b6e5e46edcc4b8a60e1c0333669a65b810abeed780598" +dependencies = [ + "axum", + "axum-core", + "bytes", + "futures-util", + "headers", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "serde", + "tower", "tower-layer", "tower-service", ] [[package]] name = "axum-macros" -version = "0.3.8" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +checksum = "5a2edad600410b905404c594e2523549f1bcd4bded1e252c8f74524ccce0b867" dependencies = [ "heck", "proc-macro2", @@ -492,14 +519,14 @@ dependencies = [ [[package]] name = "axum-tracing-opentelemetry" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96874d8776ed9834dbb02c234a51f4f439529cd833bc3ccc4bfbbe6d1821d0d2" +checksum = "bdad298231394729042d1f155b93f9fdf0b5ee1aea0b62404c4d7341f7d8fe08" dependencies = [ "axum", "futures-core", "futures-util", - "http", + "http 1.0.0", "opentelemetry", "pin-project-lite", "tower", @@ -920,7 +947,7 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", + "http 0.2.9", "mime", "mime_guess", "rand", @@ -1979,7 +2006,7 @@ dependencies = [ "futures-core", "futures-sink", "gloo-utils", - "http", + "http 0.2.9", "js-sys", "pin-project", "serde", @@ -2048,7 +2075,7 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", "indexmap 1.9.3", "slab", "tokio", @@ -2056,6 +2083,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d308f63daf4181410c242d34c11f928dcb3aa105852019e043c9d1f4e4368a" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 1.0.0", + "indexmap 2.0.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2082,15 +2128,14 @@ checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" [[package]] name = "headers" -version = "0.3.8" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", + "base64 0.21.4", "bytes", "headers-core", - "http", + "http 1.0.0", "httpdate", "mime", "sha1", @@ -2098,11 +2143,11 @@ dependencies = [ [[package]] name = "headers-core" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.0.0", ] [[package]] @@ -2194,6 +2239,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.5" @@ -2201,15 +2257,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.0.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cb79eb393015dadd30fc252023adb0b2400a0caee0fa2a077e6e21a551e840" +dependencies = [ + "bytes", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", "pin-project-lite", ] [[package]] name = "http-range-header" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" +checksum = "3ce4ef31cda248bbdb6e6820603b82dfcd9e833db65a43e997a0ccec777d11fe" [[package]] name = "httparse" @@ -2233,9 +2312,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.21", + "http 0.2.9", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2247,6 +2326,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.0", + "http 1.0.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", +] + [[package]] name = "hyper-multipart-rfc7578" version = "0.8.0" @@ -2256,8 +2354,8 @@ dependencies = [ "bytes", "common-multipart-rfc7578", "futures-core", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", ] [[package]] @@ -2267,13 +2365,31 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.27", "rustls 0.21.7", "tokio", "tokio-rustls", ] +[[package]] +name = "hyper-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "hyper 1.1.0", + "pin-project-lite", + "socket2 0.5.4", + "tokio", + "tracing", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2447,7 +2563,7 @@ dependencies = [ "common-multipart-rfc7578", "dirs", "futures", - "http", + "http 0.2.9", "multiaddr", "multibase", "serde", @@ -3587,6 +3703,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", + "axum-extra", "base64 0.21.4", "bytes", "cid", @@ -3597,7 +3714,7 @@ dependencies = [ "futures-util", "getrandom 0.2.10", "gloo-net", - "headers", + "http 1.0.0", "instant", "iroh-car", "js-sys", @@ -3641,6 +3758,7 @@ dependencies = [ "async-stream", "async-trait", "axum", + "axum-extra", "axum-tracing-opentelemetry", "bytes", "cid", @@ -3710,7 +3828,7 @@ dependencies = [ "async-stream", "async-trait", "cid", - "hyper", + "hyper 0.14.27", "hyper-multipart-rfc7578", "ipfs-api-prelude", "iroh-car", @@ -3954,18 +4072,6 @@ dependencies = [ "urlencoding", ] -[[package]] -name = "opentelemetry-http" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e" -dependencies = [ - "async-trait", - "bytes", - "http", - "opentelemetry", -] - [[package]] name = "opentelemetry_sdk" version = "0.21.2" @@ -4610,10 +4716,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.21", + "http 0.2.9", + "http-body 0.4.5", + "hyper 0.14.27", "hyper-rustls", "ipnet", "js-sys", @@ -5775,16 +5881,16 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.4" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "09e12e6351354851911bdf8c2b8f2ab15050c567d70a8b9a37ae7b8301a4080d" dependencies = [ "bitflags 2.4.0", "bytes", - "futures-core", "futures-util", - "http", - "http-body", + "http 1.0.0", + "http-body 1.0.0", + "http-body-util", "http-range-header", "httpdate", "mime", @@ -5874,13 +5980,12 @@ dependencies = [ [[package]] name = "tracing-opentelemetry-instrumentation-sdk" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b16403065e0fb78b4708ed0e7245024a5e2eaeeed043dcadba4f32af327a397" +checksum = "9920abb6a3ee3a2af7d30c9ff02900f8481935d36723c3da95cf807468218e8c" dependencies = [ - "http", + "http 1.0.0", "opentelemetry", - "opentelemetry-http", "tracing", "tracing-opentelemetry", ] diff --git a/Cargo.toml b/Cargo.toml index 3af955692..9a4398add 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,8 +19,9 @@ resolver = "2" anyhow = { version = "1" } async-recursion = { version = "1" } async-stream = { version = "0.3" } -axum = { version = "^0.6.18" } -axum-tracing-opentelemetry = { version = "0.15.0" } +axum = { version = "^0.7.3" } +axum-tracing-opentelemetry = { version = "0.16.0" } +axum-extra = { version = "^0.9.1" } base64 = { version = "^0.21" } byteorder = { version = "~1.4" } # keep in sync with pinned libipld-* crates bytes = { version = "^1" } @@ -33,7 +34,6 @@ futures = { version = "0.3" } futures-util = { version = "0.3" } gloo-net = { version = "0.4" } gloo-timers = { version = "0.3", features = ["futures"] } -headers = { version = "=0.3.8" } # Match version used by `axum`. ignore = { version = "0.4.20" } instant = { version = "0.1", features = ["wasm-bindgen"] } iroh-car = { version = "^0.3.0" } @@ -61,7 +61,7 @@ tokio = { version = "^1" } tokio-stream = { version = "~0.1" } tokio-util = { version = "0.7" } tower = { version = "^0.4.13" } -tower-http = { version = "^0.4.3" } +tower-http = { version = "^0.5" } tracing = { version = "0.1" } tracing-subscriber = { version = "~0.3.18", features = ["env-filter", "tracing-log", "json"] } ucan = { version = "0.4.0" } diff --git a/rust/noosphere-core/Cargo.toml b/rust/noosphere-core/Cargo.toml index 4263726fb..8e811bab8 100644 --- a/rust/noosphere-core/Cargo.toml +++ b/rust/noosphere-core/Cargo.toml @@ -30,10 +30,11 @@ url = { workspace = true, features = ["serde"] } async-trait = "~0.1" async-recursion = { workspace = true } async-stream = { workspace = true } - # NOTE: async-once-cell 0.4.0 shipped unstable feature usage async-once-cell = "~0.4" anyhow = { workspace = true } +http = { version = "^1" } # Keep in sync with `axum` +axum-extra = { workspace = true, features = ["typed-header"] } bytes = { workspace = true } instant = { workspace = true } iroh-car = { workspace = true } @@ -56,7 +57,6 @@ reqwest = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } tokio-util = { workspace = true, features = ["io"] } -headers = { workspace = true } noosphere-common = { version = "0.1.2", path = "../noosphere-common" } noosphere-storage = { version = "0.10.0", path = "../noosphere-storage" } noosphere-collections = { version = "0.7.0", path = "../noosphere-collections" } diff --git a/rust/noosphere-core/src/api/client.rs b/rust/noosphere-core/src/api/client.rs index 9c618e000..c74cdaddd 100644 --- a/rust/noosphere-core/src/api/client.rs +++ b/rust/noosphere-core/src/api/client.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use crate::{ - api::{route::RouteUrl, v0alpha1, v0alpha2}, + api::{route::RouteUrl, v0alpha1, v0alpha2, StatusCode}, error::NoosphereError, stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream}, }; @@ -18,7 +18,7 @@ use crate::{ data::{Link, MemoIpld}, }; use noosphere_storage::{block_deserialize, block_serialize, BlockStore}; -use reqwest::{header::HeaderMap, StatusCode}; +use reqwest::header::HeaderMap; use tokio_stream::{Stream, StreamExt}; use tokio_util::io::StreamReader; use ucan::{ @@ -93,7 +93,7 @@ where client.get(url).send().await? }; - match did_response.status() { + match translate_status_code(did_response.status())? { StatusCode::OK => (), _ => return Err(anyhow!("Unable to look up gateway identity")), }; @@ -475,8 +475,10 @@ where v0alpha2::PushError::BrokenUpstream })?; - trace!("Checking response..."); - if response.status() == StatusCode::CONFLICT { + let status = translate_status_code(response.status())?; + trace!("Checking response ({})...", status); + + if status == StatusCode::CONFLICT { return Err(v0alpha2::PushError::Conflict); } @@ -529,3 +531,22 @@ where Ok(push_response) } } + +/// Both `reqwest` and `axum` re-export `StatusCode` from the `http` crate. +/// +/// We're stuck on reqwest@0.11.20 [1] that uses an older version +/// of `http::StatusCode`, whereas axum >= 0.7 uses the 1.0 release +/// of several HTTP libraries (`http`, `http-body`, `hyper`) [2], which +/// we'd like to use as our canonical representation. +/// +/// This utility converts between the old `reqwest::StatusCode` to the +/// >=1.0 implementation. Notably, we do not pull in all of `axum` +/// into the `noosphere-core` crate, only the common underlying +/// crate `http@1.0.0` (or greater). +/// +/// [1] https://github.com/subconsciousnetwork/noosphere/issues/686 +/// [2] https://github.com/tokio-rs/axum/blob/5b6204168a676497d2f4188af603546d9ebfe20a/axum/CHANGELOG.md#070-27-november-2023 +fn translate_status_code(reqwest_code: reqwest::StatusCode) -> Result { + let code: u16 = reqwest_code.into(); + Ok(code.try_into()?) +} diff --git a/rust/noosphere-core/src/api/headers/ucan.rs b/rust/noosphere-core/src/api/headers/ucan.rs index 730e62b2e..57d6224bc 100644 --- a/rust/noosphere-core/src/api/headers/ucan.rs +++ b/rust/noosphere-core/src/api/headers/ucan.rs @@ -1,7 +1,7 @@ use crate::{authority::SUPPORTED_KEYS, data::Jwt}; use anyhow::anyhow; +use axum_extra::headers::{self, Header, HeaderName, HeaderValue}; use cid::Cid; -use headers::{self, Header, HeaderName, HeaderValue}; use once_cell::sync::Lazy; use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore}; diff --git a/rust/noosphere-core/src/api/mod.rs b/rust/noosphere-core/src/api/mod.rs index baeab3252..d04865d4a 100644 --- a/rust/noosphere-core/src/api/mod.rs +++ b/rust/noosphere-core/src/api/mod.rs @@ -11,3 +11,7 @@ pub mod v0alpha2; pub use client::*; pub use data::*; + +// Re-export `http::StatusCode` here as our preferred `StatusCode` instance, +// disambiguating from other crate's implementations. +pub use http::StatusCode; diff --git a/rust/noosphere-core/src/api/v0alpha1/data.rs b/rust/noosphere-core/src/api/v0alpha1/data.rs index e2b7b1cfd..c170750a0 100644 --- a/rust/noosphere-core/src/api/v0alpha1/data.rs +++ b/rust/noosphere-core/src/api/v0alpha1/data.rs @@ -1,6 +1,9 @@ use std::fmt::Display; -use crate::api::data::{empty_string_as_none, AsQuery}; +use crate::api::{ + data::{empty_string_as_none, AsQuery}, + StatusCode, +}; use crate::{ authority::{generate_capability, SphereAbility, SPHERE_SEMANTICS}, data::{Bundle, Did, Jwt, Link, MemoIpld}, @@ -9,7 +12,6 @@ use crate::{ use anyhow::{anyhow, Result}; use cid::Cid; use noosphere_storage::{base64_decode, base64_encode}; -use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use thiserror::Error; use ucan::{ diff --git a/rust/noosphere-core/src/api/v0alpha2/data.rs b/rust/noosphere-core/src/api/v0alpha2/data.rs index b6a08b887..ba955fa4d 100644 --- a/rust/noosphere-core/src/api/v0alpha2/data.rs +++ b/rust/noosphere-core/src/api/v0alpha2/data.rs @@ -1,8 +1,8 @@ use crate::{ + api::StatusCode, data::{Did, Jwt, Link, MemoIpld}, error::NoosphereError, }; -use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use thiserror::Error; diff --git a/rust/noosphere-gateway/Cargo.toml b/rust/noosphere-gateway/Cargo.toml index a9539c58a..fae380df7 100644 --- a/rust/noosphere-gateway/Cargo.toml +++ b/rust/noosphere-gateway/Cargo.toml @@ -30,14 +30,14 @@ noosphere-core = { version = "0.18.0", path = "../noosphere-core", features = [" [target.'cfg(not(target_arch = "wasm32"))'.dependencies] anyhow = { workspace = true } -axum = { workspace = true, features = ["headers", "macros"] } +axum = { workspace = true, features = ["macros"] } +axum-extra = { workspace = true, features = ["typed-header"] } axum-tracing-opentelemetry = { workspace = true, optional = true } iroh-car = { workspace = true } thiserror = { workspace = true } strum = { workspace = true } strum_macros = { workspace = true } bytes = { workspace = true } - tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tokio-util = { workspace = true } diff --git a/rust/noosphere-gateway/src/error.rs b/rust/noosphere-gateway/src/error.rs index bd529cc1e..7be283712 100644 --- a/rust/noosphere-gateway/src/error.rs +++ b/rust/noosphere-gateway/src/error.rs @@ -1,6 +1,4 @@ -use axum::http::StatusCode; -use axum::response::IntoResponse; -use axum::Json; +use axum::{http::StatusCode, response::IntoResponse, Json}; use noosphere_core::api::v0alpha2::PushError; use serde::{Deserialize, Serialize}; diff --git a/rust/noosphere-gateway/src/extractors/authority.rs b/rust/noosphere-gateway/src/extractors/authority.rs index abb19fa94..de76fceff 100644 --- a/rust/noosphere-gateway/src/extractors/authority.rs +++ b/rust/noosphere-gateway/src/extractors/authority.rs @@ -2,8 +2,10 @@ use anyhow::Result; use async_trait::async_trait; use axum::{ extract::FromRequestParts, - headers::{authorization::Bearer, Authorization}, http::{request::Parts, StatusCode}, +}; +use axum_extra::{ + headers::{authorization::Bearer, Authorization}, TypedHeader, }; use noosphere_core::{ diff --git a/rust/noosphere-gateway/src/extractors/cbor.rs b/rust/noosphere-gateway/src/extractors/cbor.rs index edd3bef5d..0a2dd93b8 100644 --- a/rust/noosphere-gateway/src/extractors/cbor.rs +++ b/rust/noosphere-gateway/src/extractors/cbor.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; use axum::{ - body::{Bytes, HttpBody}, - extract::FromRequest, - http::{header, Request, StatusCode}, + body::Bytes, + extract::{FromRequest, Request}, + http::{header, StatusCode}, response::IntoResponse, - BoxError, }; use libipld_cbor::DagCborCodec; use mime_guess::mime; @@ -30,17 +29,14 @@ where } #[async_trait] -impl FromRequest for Cbor +impl FromRequest for Cbor where T: Serialize + DeserializeOwned, S: Send + Sync, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into, { type Rejection = StatusCode; - async fn from_request(req: Request, state: &S) -> Result { + async fn from_request(req: Request, state: &S) -> Result { if !is_octet_stream_content_type(&req) { return Err(StatusCode::BAD_REQUEST); } @@ -59,7 +55,7 @@ where } } -fn is_octet_stream_content_type(req: &Request) -> bool { +fn is_octet_stream_content_type(req: &Request) -> bool { let content_type = if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) { content_type } else { diff --git a/rust/noosphere-gateway/src/gateway.rs b/rust/noosphere-gateway/src/gateway.rs index 7d82d444d..f29852967 100644 --- a/rust/noosphere-gateway/src/gateway.rs +++ b/rust/noosphere-gateway/src/gateway.rs @@ -10,7 +10,7 @@ use anyhow::Result; use axum::extract::DefaultBodyLimit; use axum::http::{HeaderValue, Method}; use axum::routing::{get, put}; -use axum::{Extension, Router, Server}; +use axum::{serve, Extension, Router}; use noosphere_core::api::{v0alpha1, v0alpha2}; use noosphere_core::context::HasMutableSphereContext; use noosphere_ipfs::KuboClient; @@ -31,7 +31,7 @@ type WorkerHandles = Vec>>; /// Represents a Noosphere gateway server. pub struct Gateway { - app: Router, + router: Router, worker_handles: WorkerHandles, } @@ -83,7 +83,7 @@ impl Gateway { ); let (cleanup_tx, cleanup_task) = start_cleanup::(manager.clone()); - let app = Router::new() + let router = Router::new() .route( &v0alpha1::Route::Did.to_string(), get(handlers::v0alpha1::did_route), @@ -117,17 +117,18 @@ impl Gateway { .layer(cors); #[cfg(feature = "observability")] - let app = { - app.layer(OtelInResponseLayer) // include trace context in response + let router = { + router + .layer(OtelInResponseLayer) // include trace context in response .layer(OtelAxumLayer::default()) // initialize otel trace on incoming request }; - let app = app + let router = router .layer(TraceLayer::new_for_http()) .with_state(Arc::new(manager)); Ok(Self { - app, + router, worker_handles: vec![syndication_task, name_system_task, cleanup_task], }) } @@ -135,9 +136,7 @@ impl Gateway { /// Start the gateway server with `listener`, consuming the [Gateway] /// object until the process terminates or has an unrecoverable error. pub async fn start(self, listener: TcpListener) -> Result<()> { - Server::from_tcp(listener)? - .serve(self.app.into_make_service()) - .await?; + serve(tokio::net::TcpListener::from_std(listener)?, self.router).await?; for handle in self.worker_handles { handle.abort(); } diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs index 5352e321a..fedc7f21a 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use anyhow::Result; -use axum::{body::StreamBody, extract::Query, http::StatusCode, Extension}; +use axum::{body::Body, extract::Query, http::StatusCode, Extension}; use bytes::Bytes; use noosphere_core::{ api::v0alpha1::FetchParameters, @@ -28,7 +28,7 @@ pub async fn fetch_route( gateway_scope: GatewayScope, Query(FetchParameters { since }): Query, Extension(ipfs_client): Extension, -) -> Result>>, StatusCode> +) -> Result where C: HasMutableSphereContext, S: Storage + 'static, @@ -56,7 +56,7 @@ where StatusCode::INTERNAL_SERVER_ERROR })?; - Ok(StreamBody::new(stream)) + Ok(Body::from_stream(stream)) } /// Generates a CAR stream that can be used as a the streaming body of a diff --git a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs index 1b3ba55e7..ccc86e383 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs @@ -1,13 +1,11 @@ -use std::pin::Pin; - +use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor}; use anyhow::Result; use axum::{ - body::StreamBody, + body::Body, extract::{Path, Query}, http::StatusCode, Extension, }; -use bytes::Bytes; use cid::Cid; use libipld_cbor::DagCborCodec; use noosphere_core::api::v0alpha1::ReplicateParameters; @@ -19,12 +17,6 @@ use noosphere_core::{ }; use noosphere_ipfs::{IpfsStore, KuboClient}; use noosphere_storage::{BlockStore, BlockStoreRetry, Storage}; -use tokio_stream::Stream; - -use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor}; - -pub type ReplicationCarStreamBody = - StreamBody> + Send>>>; /// Invoke to get a streamed CARv1 response that represents all the blocks /// needed to manifest the content associated with the given [Cid] path @@ -50,7 +42,7 @@ pub async fn replicate_route( include_content, }): Query, Extension(ipfs_client): Extension, -) -> Result +) -> Result where C: HasMutableSphereContext, S: Storage + 'static, @@ -116,7 +108,7 @@ where // of usage. Maybe somewhere in the ballpark of 1~10k revisions. It // should be a large-but-finite number. debug!("Streaming revisions from {} to {}", since, memo_version); - return Ok(StreamBody::new(Box::pin(to_car_stream( + return Ok(Body::from_stream(Box::pin(to_car_stream( vec![memo_version], memo_history_stream(store, &memo_version.into(), Some(&since), false), )))); @@ -136,7 +128,7 @@ where debug!("Streaming entire version for {}", memo_version); // Always fall back to a full replication - Ok(StreamBody::new(Box::pin(to_car_stream( + Ok(Body::from_stream(Box::pin(to_car_stream( vec![memo_version], memo_body_stream(store, &memo_version.into(), include_content), )))) diff --git a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs index 0cd80a12a..cebede121 100644 --- a/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs +++ b/rust/noosphere-gateway/src/handlers/v0alpha2/push.rs @@ -1,13 +1,16 @@ -use std::collections::BTreeSet; - +use crate::extractors::{GatewayScope, SphereExtractor}; +use crate::{ + error::GatewayErrorResponse, + extractors::GatewayAuthority, + worker::{NameSystemJob, SyndicationJob}, +}; use anyhow::Result; - use async_stream::try_stream; -use axum::{body::StreamBody, extract::BodyStream, Extension}; - +use axum::{body::Body, Extension}; use bytes::Bytes; use cid::Cid; use libipld_cbor::DagCborCodec; +use noosphere_common::UnsharedStream; use noosphere_core::api::v0alpha2::{PushBody, PushError, PushResponse}; use noosphere_core::context::{HasMutableSphereContext, SphereContentWrite, SphereCursor}; use noosphere_core::stream::{ @@ -19,17 +22,11 @@ use noosphere_core::{ view::Sphere, }; use noosphere_storage::{block_deserialize, block_serialize, Storage}; +use std::collections::BTreeSet; +use std::pin::Pin; use tokio::sync::mpsc::UnboundedSender; use tokio_stream::{Stream, StreamExt}; -use crate::extractors::{GatewayScope, SphereExtractor}; -use crate::{ - error::GatewayErrorResponse, - extractors::GatewayAuthority, - worker::{NameSystemJob, SyndicationJob}, -}; - -// #[debug_handler] #[instrument( level = "debug", skip( @@ -38,7 +35,7 @@ use crate::{ gateway_scope, syndication_tx, name_system_tx, - stream + body ) )] pub async fn push_route( @@ -47,8 +44,8 @@ pub async fn push_route( gateway_scope: GatewayScope, Extension(syndication_tx): Extension>>, Extension(name_system_tx): Extension>>, - stream: BodyStream, -) -> Result>>, GatewayErrorResponse> + body: Body, +) -> Result where for<'a> C: HasMutableSphereContext + 'a, for<'a> S: Storage + 'a, @@ -64,16 +61,19 @@ where &generate_capability(counterpart.as_str(), SphereAbility::Push), ) .await?; - let gateway_push_routine = GatewayPushRoutine { gateway_sphere, gateway_scope, syndication_tx, name_system_tx, - block_stream: Box::pin(from_car_stream(stream)), + // In Axum 0.7+, there are `Sync` bounds required. The incoming stream + // is `!Sync`, but as the only consumers of the stream, + // consider it `Sync` via `UnsharedStream`. + block_stream: Box::pin(from_car_stream(UnsharedStream::new( + body.into_data_stream(), + ))), }; - - Ok(StreamBody::new(gateway_push_routine.invoke().await?)) + Ok(Body::from_stream(gateway_push_routine.invoke().await?)) } pub struct GatewayPushRoutine @@ -98,7 +98,7 @@ where #[instrument(level = "debug", skip(self))] pub async fn invoke( mut self, - ) -> Result> + Send + 'static, PushError> { + ) -> Result> + Send>>, PushError> { debug!("Invoking gateway push..."); let push_body = self.verify_history().await?; @@ -136,7 +136,7 @@ where info!("Finished gateway push routine!"); }; - Ok(to_car_stream(roots, block_stream)) + Ok(Box::pin(to_car_stream(roots, block_stream))) } /// Ensure that the pushed history is not in direct conflict with our diff --git a/rust/noosphere-into/examples/notes-to-html/implementation.rs b/rust/noosphere-into/examples/notes-to-html/implementation.rs index 0f323d073..8c26f4409 100644 --- a/rust/noosphere-into/examples/notes-to-html/implementation.rs +++ b/rust/noosphere-into/examples/notes-to-html/implementation.rs @@ -91,18 +91,16 @@ pub async fn main() -> Result<()> { sphere_into_html(cursor, &native_fs).await?; - let app = + let router = get_service(ServeDir::new(html_root.path())).layer(HandleErrorLayer::new(|_| async { (StatusCode::INTERNAL_SERVER_ERROR, "Something went wrong...") })); - let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 3000))).await?; println!("Serving generated HTML at http://127.0.0.1:3000/"); - axum::Server::bind(&addr) - .serve(app.into_make_service()) - .await?; + axum::serve(listener, router).await?; Ok(()) } diff --git a/rust/noosphere-ns/Cargo.toml b/rust/noosphere-ns/Cargo.toml index 3d91cd415..cbb9b85d2 100644 --- a/rust/noosphere-ns/Cargo.toml +++ b/rust/noosphere-ns/Cargo.toml @@ -50,7 +50,7 @@ home = { version = "~0.5", optional = true } toml = { version = "~0.8", optional = true } # noosphere_ns::server -axum = { workspace = true, features = ["json", "headers", "macros"], optional = true } +axum = { workspace = true, features = ["json", "macros"], optional = true } axum-tracing-opentelemetry = { workspace = true, optional = true } reqwest = { version = "~0.11", default-features = false, features = ["json", "rustls-tls"], optional = true } tower-http = { workspace = true, features = ["trace"], optional = true } diff --git a/rust/noosphere-ns/src/server/implementation.rs b/rust/noosphere-ns/src/server/implementation.rs index f58f2ce35..6096b99e4 100644 --- a/rust/noosphere-ns/src/server/implementation.rs +++ b/rust/noosphere-ns/src/server/implementation.rs @@ -1,8 +1,10 @@ use crate::server::{handlers, routes::Route}; use crate::{DhtClient, NameSystem}; use anyhow::Result; -use axum::routing::{delete, get, post}; -use axum::{Router, Server}; +use axum::{ + routing::{delete, get, post}, + serve, Router, +}; use std::net::TcpListener; use std::sync::Arc; use tower_http::trace::TraceLayer; @@ -16,7 +18,7 @@ pub async fn start_name_system_api_server( ) -> Result<()> { let peer_id = ns.peer_id().to_owned(); - let app = Router::new() + let router = Router::new() .route( &Route::NetworkInfo.to_string(), get(handlers::get_network_info), @@ -35,18 +37,17 @@ pub async fn start_name_system_api_server( .route(&Route::Bootstrap.to_string(), post(handlers::bootstrap)); #[cfg(feature = "observability")] - let app = { - app.layer(OtelInResponseLayer) // include trace context in response + let router = { + router + .layer(OtelInResponseLayer) // include trace context in response .layer(OtelAxumLayer::default()) // initialize otel trace on incoming request }; - let app = app + let router = router .layer(TraceLayer::new_for_http()) .with_state(handlers::RouterState { ns, peer_id }); - Server::from_tcp(listener)? - .serve(app.into_make_service()) - .await?; + serve(tokio::net::TcpListener::from_std(listener)?, router).await?; Ok(()) }