Skip to content
This repository has been archived by the owner on Sep 21, 2024. It is now read-only.

chore: Update axum 0.7 #778

Merged
merged 1 commit into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 170 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ resolver = "2"
anyhow = { version = "1" }
async-recursion = { version = "1" }
async-stream = { version = "0.3" }
axum = { version = "^0.6.18" }
axum-tracing-opentelemetry = { version = "0.15.0" }
axum = { version = "^0.7.3" }
axum-tracing-opentelemetry = { version = "0.16.0" }
axum-extra = { version = "^0.9.1" }
base64 = { version = "^0.21" }
byteorder = { version = "~1.4" } # keep in sync with pinned libipld-* crates
bytes = { version = "^1" }
Expand All @@ -33,7 +34,6 @@ futures = { version = "0.3" }
futures-util = { version = "0.3" }
gloo-net = { version = "0.4" }
gloo-timers = { version = "0.3", features = ["futures"] }
headers = { version = "=0.3.8" } # Match version used by `axum`.
ignore = { version = "0.4.20" }
instant = { version = "0.1", features = ["wasm-bindgen"] }
iroh-car = { version = "^0.3.0" }
Expand Down Expand Up @@ -61,7 +61,7 @@ tokio = { version = "^1" }
tokio-stream = { version = "~0.1" }
tokio-util = { version = "0.7" }
tower = { version = "^0.4.13" }
tower-http = { version = "^0.4.3" }
tower-http = { version = "^0.5" }
tracing = { version = "0.1" }
tracing-subscriber = { version = "~0.3.18", features = ["env-filter", "tracing-log", "json"] }
ucan = { version = "0.4.0" }
Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ url = { workspace = true, features = ["serde"] }
async-trait = "~0.1"
async-recursion = { workspace = true }
async-stream = { workspace = true }

# NOTE: async-once-cell 0.4.0 shipped unstable feature usage
async-once-cell = "~0.4"
anyhow = { workspace = true }
http = { version = "^1" } # Keep in sync with `axum`
axum-extra = { workspace = true, features = ["typed-header"] }
bytes = { workspace = true }
instant = { workspace = true }
iroh-car = { workspace = true }
Expand All @@ -56,7 +57,6 @@ reqwest = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tokio-util = { workspace = true, features = ["io"] }
headers = { workspace = true }
noosphere-common = { version = "0.1.2", path = "../noosphere-common" }
noosphere-storage = { version = "0.10.0", path = "../noosphere-storage" }
noosphere-collections = { version = "0.7.0", path = "../noosphere-collections" }
Expand Down
31 changes: 26 additions & 5 deletions rust/noosphere-core/src/api/client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use crate::{
api::{route::RouteUrl, v0alpha1, v0alpha2},
api::{route::RouteUrl, v0alpha1, v0alpha2, StatusCode},
error::NoosphereError,
stream::{from_car_stream, memo_history_stream, put_block_stream, to_car_stream},
};
Expand All @@ -18,7 +18,7 @@ use crate::{
data::{Link, MemoIpld},
};
use noosphere_storage::{block_deserialize, block_serialize, BlockStore};
use reqwest::{header::HeaderMap, StatusCode};
use reqwest::header::HeaderMap;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::StreamReader;
use ucan::{
Expand Down Expand Up @@ -93,7 +93,7 @@ where
client.get(url).send().await?
};

match did_response.status() {
match translate_status_code(did_response.status())? {
StatusCode::OK => (),
_ => return Err(anyhow!("Unable to look up gateway identity")),
};
Expand Down Expand Up @@ -475,8 +475,10 @@ where
v0alpha2::PushError::BrokenUpstream
})?;

trace!("Checking response...");
if response.status() == StatusCode::CONFLICT {
let status = translate_status_code(response.status())?;
trace!("Checking response ({})...", status);

if status == StatusCode::CONFLICT {
return Err(v0alpha2::PushError::Conflict);
}

Expand Down Expand Up @@ -529,3 +531,22 @@ where
Ok(push_response)
}
}

/// Both `reqwest` and `axum` re-export `StatusCode` from the `http` crate.
///
/// We're stuck on reqwest@0.11.20 [1] that uses an older version
/// of `http::StatusCode`, whereas axum >= 0.7 uses the 1.0 release
/// of several HTTP libraries (`http`, `http-body`, `hyper`) [2], which
/// we'd like to use as our canonical representation.
///
/// This utility converts between the old `reqwest::StatusCode` to the
/// >=1.0 implementation. Notably, we do not pull in all of `axum`
/// into the `noosphere-core` crate, only the common underlying
/// crate `http@1.0.0` (or greater).
///
/// [1] https://github.com/subconsciousnetwork/noosphere/issues/686
/// [2] https://github.com/tokio-rs/axum/blob/5b6204168a676497d2f4188af603546d9ebfe20a/axum/CHANGELOG.md#070-27-november-2023
jsantell marked this conversation as resolved.
Show resolved Hide resolved
fn translate_status_code(reqwest_code: reqwest::StatusCode) -> Result<StatusCode> {
let code: u16 = reqwest_code.into();
Ok(code.try_into()?)
}
5 changes: 1 addition & 4 deletions rust/noosphere-core/src/api/headers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! A collection of typed [headers::Header] implementations
//! A collection of typed [axum_extra::headers::Header] implementations
//! used in gateway APIs.

#[cfg(doc)]
use headers;

mod ucan;

pub use self::ucan::*;
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/api/headers/ucan.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{authority::SUPPORTED_KEYS, data::Jwt};
use anyhow::anyhow;
use axum_extra::headers::{self, Header, HeaderName, HeaderValue};
use cid::Cid;
use headers::{self, Header, HeaderName, HeaderValue};
use once_cell::sync::Lazy;
use ucan::{chain::ProofChain, crypto::did::DidParser, store::UcanJwtStore};

Expand Down
4 changes: 4 additions & 0 deletions rust/noosphere-core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,7 @@ pub mod v0alpha2;

pub use client::*;
pub use data::*;

// Re-export `http::StatusCode` here as our preferred `StatusCode` instance,
// disambiguating from other crate's implementations.
pub(crate) use http::StatusCode;
6 changes: 4 additions & 2 deletions rust/noosphere-core/src/api/v0alpha1/data.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::fmt::Display;

use crate::api::data::{empty_string_as_none, AsQuery};
use crate::api::{
data::{empty_string_as_none, AsQuery},
StatusCode,
};
use crate::{
authority::{generate_capability, SphereAbility, SPHERE_SEMANTICS},
data::{Bundle, Did, Jwt, Link, MemoIpld},
Expand All @@ -9,7 +12,6 @@ use crate::{
use anyhow::{anyhow, Result};
use cid::Cid;
use noosphere_storage::{base64_decode, base64_encode};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use ucan::{
Expand Down
2 changes: 1 addition & 1 deletion rust/noosphere-core/src/api/v0alpha2/data.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
api::StatusCode,
data::{Did, Jwt, Link, MemoIpld},
error::NoosphereError,
};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
4 changes: 2 additions & 2 deletions rust/noosphere-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ noosphere-core = { version = "0.18.0", path = "../noosphere-core", features = ["

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
anyhow = { workspace = true }
axum = { workspace = true, features = ["headers", "macros"] }
axum = { workspace = true, features = ["macros"] }
axum-extra = { workspace = true, features = ["typed-header"] }
axum-tracing-opentelemetry = { workspace = true, optional = true }
iroh-car = { workspace = true }
thiserror = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
bytes = { workspace = true }

tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
Expand Down
4 changes: 1 addition & 3 deletions rust/noosphere-gateway/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use axum::http::StatusCode;
use axum::response::IntoResponse;
use axum::Json;
use axum::{http::StatusCode, response::IntoResponse, Json};
use noosphere_core::api::v0alpha2::PushError;
use serde::{Deserialize, Serialize};

Expand Down
4 changes: 3 additions & 1 deletion rust/noosphere-gateway/src/extractors/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ use anyhow::Result;
use async_trait::async_trait;
use axum::{
extract::FromRequestParts,
headers::{authorization::Bearer, Authorization},
http::{request::Parts, StatusCode},
};
use axum_extra::{
headers::{authorization::Bearer, Authorization},
TypedHeader,
};
use noosphere_core::{
Expand Down
16 changes: 6 additions & 10 deletions rust/noosphere-gateway/src/extractors/cbor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use async_trait::async_trait;
use axum::{
body::{Bytes, HttpBody},
extract::FromRequest,
http::{header, Request, StatusCode},
body::Bytes,
extract::{FromRequest, Request},
http::{header, StatusCode},
response::IntoResponse,
BoxError,
};
use libipld_cbor::DagCborCodec;
use mime_guess::mime;
Expand All @@ -30,17 +29,14 @@ where
}

#[async_trait]
impl<S, T, B> FromRequest<S, B> for Cbor<T>
impl<S, T> FromRequest<S> for Cbor<T>
where
T: Serialize + DeserializeOwned,
S: Send + Sync,
B: HttpBody + Send + 'static,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = StatusCode;

async fn from_request(req: Request<B>, state: &S) -> Result<Self, Self::Rejection> {
async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
if !is_octet_stream_content_type(&req) {
return Err(StatusCode::BAD_REQUEST);
}
Expand All @@ -59,7 +55,7 @@ where
}
}

fn is_octet_stream_content_type<B>(req: &Request<B>) -> bool {
fn is_octet_stream_content_type(req: &Request) -> bool {
let content_type = if let Some(content_type) = req.headers().get(header::CONTENT_TYPE) {
content_type
} else {
Expand Down
23 changes: 13 additions & 10 deletions rust/noosphere-gateway/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use anyhow::Result;
use axum::extract::DefaultBodyLimit;
use axum::http::{HeaderValue, Method};
use axum::routing::{get, put};
use axum::{Extension, Router, Server};
use axum::{serve, Extension, Router};
use noosphere_core::api::{v0alpha1, v0alpha2};
use noosphere_core::context::HasMutableSphereContext;
use noosphere_ipfs::KuboClient;
Expand All @@ -31,7 +31,7 @@ type WorkerHandles = Vec<JoinHandle<Result<()>>>;

/// Represents a Noosphere gateway server.
pub struct Gateway {
app: Router,
router: Router,
worker_handles: WorkerHandles,
}

Expand Down Expand Up @@ -83,7 +83,7 @@ impl Gateway {
);
let (cleanup_tx, cleanup_task) = start_cleanup::<M, C, S>(manager.clone());

let app = Router::new()
let router = Router::new()
.route(
&v0alpha1::Route::Did.to_string(),
get(handlers::v0alpha1::did_route),
Expand Down Expand Up @@ -117,27 +117,30 @@ impl Gateway {
.layer(cors);

#[cfg(feature = "observability")]
let app = {
app.layer(OtelInResponseLayer) // include trace context in response
let router = {
router
.layer(OtelInResponseLayer) // include trace context in response
.layer(OtelAxumLayer::default()) // initialize otel trace on incoming request
};

let app = app
let router = router
.layer(TraceLayer::new_for_http())
.with_state(Arc::new(manager));

Ok(Self {
app,
router,
worker_handles: vec![syndication_task, name_system_task, cleanup_task],
})
}

/// Start the gateway server with `listener`, consuming the [Gateway]
/// object until the process terminates or has an unrecoverable error.
pub async fn start(self, listener: TcpListener) -> Result<()> {
Server::from_tcp(listener)?
.serve(self.app.into_make_service())
.await?;
// Listener must be set to nonblocking
// https://docs.rs/tokio/latest/tokio/net/struct.TcpListener.html#method.from_std
listener.set_nonblocking(true)?;
let tokio_listener = tokio::net::TcpListener::from_std(listener)?;
serve(tokio_listener, self.router.into_make_service()).await?;
for handle in self.worker_handles {
handle.abort();
}
Expand Down
6 changes: 3 additions & 3 deletions rust/noosphere-gateway/src/handlers/v0alpha1/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::pin::Pin;

use anyhow::Result;

use axum::{body::StreamBody, extract::Query, http::StatusCode, Extension};
use axum::{body::Body, extract::Query, http::StatusCode, Extension};
use bytes::Bytes;
use noosphere_core::{
api::v0alpha1::FetchParameters,
Expand All @@ -28,7 +28,7 @@ pub async fn fetch_route<C, S>(
gateway_scope: GatewayScope<C, S>,
Query(FetchParameters { since }): Query<FetchParameters>,
Extension(ipfs_client): Extension<KuboClient>,
) -> Result<StreamBody<impl Stream<Item = Result<Bytes, std::io::Error>>>, StatusCode>
) -> Result<Body, StatusCode>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
Expand Down Expand Up @@ -56,7 +56,7 @@ where
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(StreamBody::new(stream))
Ok(Body::from_stream(stream))
}

/// Generates a CAR stream that can be used as a the streaming body of a
Expand Down
18 changes: 5 additions & 13 deletions rust/noosphere-gateway/src/handlers/v0alpha1/replicate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::pin::Pin;

use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor};
use anyhow::Result;
use axum::{
body::StreamBody,
body::Body,
extract::{Path, Query},
http::StatusCode,
Extension,
};
use bytes::Bytes;
use cid::Cid;
use libipld_cbor::DagCborCodec;
use noosphere_core::api::v0alpha1::ReplicateParameters;
Expand All @@ -19,12 +17,6 @@ use noosphere_core::{
};
use noosphere_ipfs::{IpfsStore, KuboClient};
use noosphere_storage::{BlockStore, BlockStoreRetry, Storage};
use tokio_stream::Stream;

use crate::extractors::{GatewayAuthority, GatewayScope, SphereExtractor};

pub type ReplicationCarStreamBody =
StreamBody<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>>;

/// Invoke to get a streamed CARv1 response that represents all the blocks
/// needed to manifest the content associated with the given [Cid] path
Expand All @@ -50,7 +42,7 @@ pub async fn replicate_route<C, S>(
include_content,
}): Query<ReplicateParameters>,
Extension(ipfs_client): Extension<KuboClient>,
) -> Result<ReplicationCarStreamBody, StatusCode>
) -> Result<Body, StatusCode>
where
C: HasMutableSphereContext<S>,
S: Storage + 'static,
Expand Down Expand Up @@ -116,7 +108,7 @@ where
// of usage. Maybe somewhere in the ballpark of 1~10k revisions. It
// should be a large-but-finite number.
debug!("Streaming revisions from {} to {}", since, memo_version);
return Ok(StreamBody::new(Box::pin(to_car_stream(
return Ok(Body::from_stream(Box::pin(to_car_stream(
vec![memo_version],
memo_history_stream(store, &memo_version.into(), Some(&since), false),
))));
Expand All @@ -136,7 +128,7 @@ where
debug!("Streaming entire version for {}", memo_version);

// Always fall back to a full replication
Ok(StreamBody::new(Box::pin(to_car_stream(
Ok(Body::from_stream(Box::pin(to_car_stream(
vec![memo_version],
memo_body_stream(store, &memo_version.into(), include_content),
))))
Expand Down
Loading
Loading