Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

Commit

Permalink
chore: Upgrade Axum and friends to 0.7+.
Browse files Browse the repository at this point in the history
  • Loading branch information
jsantell committed Jan 17, 2024
1 parent f8dd752 commit 9d9b800
Show file tree
Hide file tree
Showing 19 changed files with 274 additions and 157 deletions.
235 changes: 170 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ resolver = "2"
anyhow = { version = "1" }
async-recursion = { version = "1" }
async-stream = { version = "0.3" }
axum = { version = "^0.6.18" }
axum-tracing-opentelemetry = { version = "0.15.0" }
axum = { version = "^0.7.3" }
axum-tracing-opentelemetry = { version = "0.16.0" }
axum-extra = { version = "^0.9.1" }
base64 = { version = "^0.21" }
byteorder = { version = "~1.4" } # keep in sync with pinned libipld-* crates
bytes = { version = "^1" }
Expand All @@ -33,7 +34,6 @@ futures = { version = "0.3" }
futures-util = { version = "0.3" }
gloo-net = { version = "0.4" }
gloo-timers = { version = "0.3", features = ["futures"] }
headers = { version = "=0.3.8" } # Match version used by `axum`.
ignore = { version = "0.4.20" }
instant = { version = "0.1", features = ["wasm-bindgen"] }
iroh-car = { version = "^0.3.0" }
Expand Down Expand Up @@ -61,7 +61,7 @@ tokio = { version = "^1" }
tokio-stream = { version = "~0.1" }
tokio-util = { version = "0.7" }
tower = { version = "^0.4.13" }
tower-http = { version = "^0.4.3" }
tower-http = { version = "^0.5" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "~0.3.18", features = ["env-filter", "tracing-log", "json"] }
ucan = { version = "0.4.0" }
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ url = { workspace = true, features = ["serde"] }
async-trait = "~0.1"
async-recursion = { workspace = true }
async-stream = { workspace = true }

# NOTE: async-once-cell 0.4.0 shipped unstable feature usage
async-once-cell = "~0.4"
anyhow = { workspace = true }
http = { version = "^1" } # Keep in sync with `axum`
axum-extra = { workspace = true, features = ["typed-header"] }
bytes = { workspace = true }
instant = { workspace = true }
iroh-car = { workspace = true }
Expand All @@ -56,7 +57,6 @@ reqwest = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tokio-util = { workspace = true, features = ["io"] }
headers = { workspace = true }
noosphere-common = { version = "0.1.2", path = "../noosphere-common" }
noosphere-storage = { version = "0.10.0", path = "../noosphere-storage" }
noosphere-collections = { version = "0.7.0", path = "../noosphere-collections" }
Expand Down
31 changes: 26 additions & 5 deletions rust/noosphere-core/src/api/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use crate::{
api::{route::RouteUrl, v0alpha1, v0alpha2},
api::{route::RouteUrl, v0alpha1, v0alpha2, StatusCode},
error::NoosphereError,
stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream},
};
Expand All @@ -18,7 +18,7 @@ use crate::{
data::{Link, MemoIpld},
};
use noosphere_storage::{block_deserialize, block_serialize, BlockStore};
use reqwest::{header::HeaderMap, StatusCode};
use reqwest::header::HeaderMap;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use ucan::{
Expand Down Expand Up @@ -93,7 +93,7 @@ where
client.get(url).send().await?
};

match did_response.status() {
match translate_status_code(did_response.status())? {
StatusCode::OK => (),
_ => return Err(anyhow!("Unable to look up gateway identity")),
};
Expand Down Expand Up @@ -475,8 +475,10 @@ where
v0alpha2::PushError::BrokenUpstream
})?;

trace!("Checking response...");
if response.status() == StatusCode::CONFLICT {
let status = translate_status_code(response.status())?;
trace!("Checking response ({})...", status);

if status == StatusCode::CONFLICT {
return Err(v0alpha2::PushError::Conflict);
}

Expand Down Expand Up @@ -529,3 +531,22 @@ where
Ok(push_response)
}
}

/// Both `reqwest` and `axum` re-export `StatusCode` from the `http` crate.
///
/// We're stuck on reqwest@0.11.20 [1] that uses an older version
/// of `http::StatusCode`, whereas axum >= 0.7 uses the 1.0 release
/// of several HTTP libraries (`http`, `http-body`, `hyper`) [2], which
/// we'd like to use as our canonical representation.
///
/// This utility converts between the old `reqwest::StatusCode` to the
/// >=1.0 implementation. Notably, we do not pull in all of `axum`
/// into the `noosphere-core` crate, only the common underlying
/// crate `http@1.0.0` (or greater).
///
/// [1] https://github.com/subconsciousnetwork/noosphere/issues/686
/// [2] https://github.com/tokio-rs/axum/blob/5b6204168a676497d2f4188af603546d9ebfe20a/axum/CHANGELOG.md#070-27-november-2023
fn translate_status_code(reqwest_code: reqwest::StatusCode) -> Result<StatusCode> {
let code: u16 = reqwest_code.into();
Ok(code.try_into()?)
}
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/api/headers/ucan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{authority::SUPPORTED_KEYS, data::Jwt};
use anyhow::anyhow;
use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
use cid::Cid;
use headers::{self, Header, HeaderName, HeaderValue};
use once_cell::sync::Lazy;
use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore};

Expand Down
4 changes: 4 additions & 0 deletions rust/noosphere-core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ pub mod v0alpha2;

pub use client::*;
pub use data::*;

// Re-export `http::StatusCode` here as our preferred `StatusCode` instance,
// disambiguating from other crate's implementations.
pub(crate) use http::StatusCode;
6 changes: 4 additions & 2 deletions rust/noosphere-core/src/api/v0alpha1/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Display;

use crate::api::data::{empty_string_as_none, AsQuery};
use crate::api::{
data::{empty_string_as_none, AsQuery},
StatusCode,
};
use crate::{
authority::{generate_capability, SphereAbility, SPHERE_SEMANTICS},
data::{Bundle, Did, Jwt, Link, MemoIpld},
Expand All @@ -9,7 +12,6 @@ use crate::{
use anyhow::{anyhow, Result};
use cid::Cid;
use noosphere_storage::{base64_decode, base64_encode};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use ucan::{
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/api/v0alpha2/data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
api::StatusCode,
data::{Did, Jwt, Link, MemoIpld},
error::NoosphereError,
};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ noosphere-core = { version = "0.18.0", path = "../noosphere-core", features = ["

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["headers", "macros"] }
axum = { workspace = true, features = ["macros"] }
axum-extra = { workspace = true, features = ["typed-header"] }
axum-tracing-opentelemetry = { workspace = true, optional = true }
iroh-car = { workspace = true }
thiserror = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
bytes = { workspace = true }

tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
Expand Down
4 changes: 1 addition & 3 deletions rust/noosphere-gateway/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Json;
use axum::{http::StatusCode, response::IntoResponse, Json};
use noosphere_core::api::v0alpha2::PushError;
use serde::{Deserialize, Serialize};

Expand Down
4 changes: 3 additions & 1 deletion rust/noosphere-gateway/src/extractors/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use anyhow::Result;
use async_trait::async_trait;
use axum::{
extract::FromRequestParts,
headers::{authorization::Bearer, Authorization},
http::{request::Parts, StatusCode},
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
};
use noosphere_core::{
Expand Down
16 changes: 6 additions & 10 deletions rust/noosphere-gateway/src/extractors/cbor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use async_trait::async_trait;
use axum::{
body::{Bytes, HttpBody},
extract::FromRequest,
http::{header, Request, StatusCode},
body::Bytes,
extract::{FromRequest, Request},
http::{header, StatusCode},
response::IntoResponse,
BoxError,
};
use libipld_cbor::DagCborCodec;
use mime_guess::mime;
Expand All @@ -30,17 +29,14 @@ where
}

#[async_trait]
impl<S, T, B> FromRequest<S, B> for Cbor<T>
impl<S, T> FromRequest<S> for Cbor<T>
where
T: Serialize + DeserializeOwned,
S: Send + Sync,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = StatusCode;

async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
if !is_octet_stream_content_type(&req) {
return Err(StatusCode::BAD_REQUEST);
}
Expand All @@ -59,7 +55,7 @@ where
}
}

fn is_octet_stream_content_type<B>(req: &Request<B>) -> bool {
fn is_octet_stream_content_type(req: &Request) -> bool {
let content_type = if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) {
content_type
} else {
Expand Down
19 changes: 9 additions & 10 deletions rust/noosphere-gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::Result;
use axum::extract::DefaultBodyLimit;
use axum::http::{HeaderValue, Method};
use axum::routing::{get, put};
use axum::{Extension, Router, Server};
use axum::{serve, Extension, Router};
use noosphere_core::api::{v0alpha1, v0alpha2};
use noosphere_core::context::HasMutableSphereContext;
use noosphere_ipfs::KuboClient;
Expand All @@ -31,7 +31,7 @@ type WorkerHandles = Vec<JoinHandle<Result<()>>>;

/// Represents a Noosphere gateway server.
pub struct Gateway {
app: Router,
router: Router,
worker_handles: WorkerHandles,
}

Expand Down Expand Up @@ -83,7 +83,7 @@ impl Gateway {
);
let (cleanup_tx, cleanup_task) = start_cleanup::<M, C, S>(manager.clone());

let app = Router::new()
let router = Router::new()
.route(
&v0alpha1::Route::Did.to_string(),
get(handlers::v0alpha1::did_route),
Expand Down Expand Up @@ -117,27 +117,26 @@ impl Gateway {
.layer(cors);

#[cfg(feature = "observability")]
let app = {
app.layer(OtelInResponseLayer) // include trace context in response
let router = {
router
.layer(OtelInResponseLayer) // include trace context in response
.layer(OtelAxumLayer::default()) // initialize otel trace on incoming request
};

let app = app
let router = router
.layer(TraceLayer::new_for_http())
.with_state(Arc::new(manager));

Ok(Self {
app,
router,
worker_handles: vec![syndication_task, name_system_task, cleanup_task],
})
}

/// Start the gateway server with `listener`, consuming the [Gateway]
/// object until the process terminates or has an unrecoverable error.
pub async fn start(self, listener: TcpListener) -> Result<()> {
Server::from_tcp(listener)?
.serve(self.app.into_make_service())
.await?;
serve(tokio::net::TcpListener::from_std(listener)?, self.router).await?;
for handle in self.worker_handles {
handle.abort();
}
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::pin::Pin;

use anyhow::Result;

use axum::{body::StreamBody, extract::Query, http::StatusCode, Extension};
use axum::{body::Body, extract::Query, http::StatusCode, Extension};
use bytes::Bytes;
use noosphere_core::{
api::v0alpha1::FetchParameters,
Expand All @@ -28,7 +28,7 @@ pub async fn fetch_route<C, S>(
gateway_scope: GatewayScope<C, S>,
Query(FetchParameters { since }): Query<FetchParameters>,
Extension(ipfs_client): Extension<KuboClient>,
) -> Result<StreamBody<impl Stream<Item = Result<Bytes, std::io::Error>>>, StatusCode>
) -> Result<Body, StatusCode>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
Expand Down Expand Up @@ -56,7 +56,7 @@ where
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(StreamBody::new(stream))
Ok(Body::from_stream(stream))
}

/// Generates a CAR stream that can be used as a the streaming body of a
Expand Down
18 changes: 5 additions & 13 deletions rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::pin::Pin;

use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor};
use anyhow::Result;
use axum::{
body::StreamBody,
body::Body,
extract::{Path, Query},
http::StatusCode,
Extension,
};
use bytes::Bytes;
use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_core::api::v0alpha1::ReplicateParameters;
Expand All @@ -19,12 +17,6 @@ use noosphere_core::{
};
use noosphere_ipfs::{IpfsStore, KuboClient};
use noosphere_storage::{BlockStore, BlockStoreRetry, Storage};
use tokio_stream::Stream;

use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor};

pub type ReplicationCarStreamBody =
StreamBody<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>>;

/// Invoke to get a streamed CARv1 response that represents all the blocks
/// needed to manifest the content associated with the given [Cid] path
Expand All @@ -50,7 +42,7 @@ pub async fn replicate_route<C, S>(
include_content,
}): Query<ReplicateParameters>,
Extension(ipfs_client): Extension<KuboClient>,
) -> Result<ReplicationCarStreamBody, StatusCode>
) -> Result<Body, StatusCode>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
Expand Down Expand Up @@ -116,7 +108,7 @@ where
// of usage. Maybe somewhere in the ballpark of 1~10k revisions. It
// should be a large-but-finite number.
debug!("Streaming revisions from {} to {}", since, memo_version);
return Ok(StreamBody::new(Box::pin(to_car_stream(
return Ok(Body::from_stream(Box::pin(to_car_stream(
vec![memo_version],
memo_history_stream(store, &memo_version.into(), Some(&since), false),
))));
Expand All @@ -136,7 +128,7 @@ where
debug!("Streaming entire version for {}", memo_version);

// Always fall back to a full replication
Ok(StreamBody::new(Box::pin(to_car_stream(
Ok(Body::from_stream(Box::pin(to_car_stream(
vec![memo_version],
memo_body_stream(store, &memo_version.into(), include_content),
))))
Expand Down
Loading

0 comments on commit 9d9b800

Please sign in to comment.