Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement am proxy command #128

Merged
merged 13 commits into from
Aug 30, 2023
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ target
# This directory is created by the prometheus process
# Could be removed once we persist the data somewhere else
data

.vscode
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Update all depdencies (#124)
- Fix multiarch docker image for arm64 users (#125)
- Update markdown reference generator command to disable TOC (#127)
- Add `am proxy` command (#128)

## [0.3.0]

Expand Down
5 changes: 5 additions & 0 deletions src/bin/am/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tracing::info;

mod explore;
mod init;
mod proxy;
pub mod start;
pub mod system;
pub mod update;
Expand Down Expand Up @@ -46,6 +47,9 @@ pub enum SubCommands {
#[clap(alias = "explorer")]
Explore(explore::Arguments),

/// Use am as a proxy to another prometheus instance
Proxy(proxy::CliArguments),

/// Create a new `am.toml` file interactively with sensible defaults
Init(init::Arguments),

Expand All @@ -65,6 +69,7 @@ pub async fn handle_command(app: Application, config: AmConfig, mp: MultiProgres
SubCommands::Start(args) => start::handle_command(args, config, mp).await,
SubCommands::System(args) => system::handle_command(args, mp).await,
SubCommands::Explore(args) => explore::handle_command(args).await,
SubCommands::Proxy(args) => proxy::handle_command(args).await,
SubCommands::Init(args) => init::handle_command(args).await,
SubCommands::Discord => {
const URL: &str = "https://discord.gg/kHtwcH8As9";
Expand Down
80 changes: 80 additions & 0 deletions src/bin/am/commands/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::server::start_web_server;
use anyhow::{bail, Context, Result};
use clap::Parser;
use directories::ProjectDirs;
use std::net::SocketAddr;
use tokio::select;
use tokio::sync::watch;
use tracing::info;
use url::Url;

#[derive(Parser, Clone)]
pub struct CliArguments {
/// The listen address for the web server of am.
///
/// This includes am's HTTP API, the explorer and the proxy to the Prometheus, Gateway, etc.
#[clap(
short,
long,
env,
default_value = "127.0.0.1:6789",
alias = "explorer-address"
)]
listen_address: SocketAddr,

/// The upstream Prometheus URL
#[clap(long, env, alias = "prometheus-address")]
prometheus_url: Option<Url>,
}

#[derive(Debug, Clone)]
struct Arguments {
listen_address: SocketAddr,
prometheus_url: Option<Url>,
}

impl Arguments {
fn new(args: CliArguments) -> Self {
Arguments {
listen_address: args.listen_address,
prometheus_url: args.prometheus_url,
}
}
}

pub async fn handle_command(args: CliArguments) -> Result<()> {
let args = Arguments::new(args);

// First let's retrieve the directory for our application to store data in.
let project_dirs =
ProjectDirs::from("", "autometrics", "am").context("Unable to determine home directory")?;
let local_data = project_dirs.data_local_dir().to_owned();

// Make sure that the local data directory exists for our application.
std::fs::create_dir_all(&local_data)
.with_context(|| format!("Unable to create data directory: {:?}", local_data))?;

let (tx, _) = watch::channel(None);

// Start web server for hosting the explorer, am api and proxies to the enabled services.
let web_server_task = async move {
start_web_server(&args.listen_address, false, false, args.prometheus_url, tx).await
};

select! {
biased;

_ = tokio::signal::ctrl_c() => {
info!("SIGINT signal received, exiting...");
Ok(())
}

Err(err) = web_server_task => {
bail!("Web server exited with an error: {err:?}");
}

else => {
Ok(())
}
}
}
12 changes: 10 additions & 2 deletions src/bin/am/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,16 @@ pub async fn handle_command(args: CliArguments, config: AmConfig, mp: MultiProgr
let (tx, rx) = watch::channel(None);

// Start web server for hosting the explorer, am api and proxies to the enabled services.
let web_server_task =
async move { start_web_server(&args.listen_address, args.pushgateway_enabled, tx).await };
let web_server_task = async move {
start_web_server(
&args.listen_address,
true,
args.pushgateway_enabled,
None,
tx,
)
.await
};

// Start Prometheus server
let prometheus_args = args.clone();
Expand Down
63 changes: 59 additions & 4 deletions src/bin/am/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ use axum::response::Redirect;
use axum::routing::{any, get};
use axum::{Router, Server};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch::Sender;
use tracing::{debug, info};
use url::Url;

mod explorer;
mod prometheus;
Expand All @@ -14,9 +16,13 @@ mod util;

pub(crate) async fn start_web_server(
listen_address: &SocketAddr,
enable_prometheus: bool,
enable_pushgateway: bool,
prometheus_proxy_url: Option<Url>,
tx: Sender<Option<SocketAddr>>,
) -> Result<()> {
let is_proxying_prometheus = prometheus_proxy_url.is_some();
let should_enable_prometheus = enable_prometheus && !is_proxying_prometheus;
let mut app = Router::new()
// Any calls to the root should be redirected to the explorer which is most likely what the user wants to use.
.route("/", get(|| async { Redirect::temporary("/explorer/") }))
Expand All @@ -32,9 +38,51 @@ pub(crate) async fn start_web_server(
}),
)
.route("/explorer/", get(explorer::handler))
.route("/explorer/*path", get(explorer::handler))
.route("/prometheus/*path", any(prometheus::handler))
.route("/prometheus", any(prometheus::handler));
.route("/explorer/*path", get(explorer::handler));

// Proxy `/prometheus` to the upstream (local) prometheus instance
if should_enable_prometheus {
app = app
.route("/prometheus/*path", any(prometheus::handler))
.route("/prometheus", any(prometheus::handler));
}

// NOTE - this will override local prometheus routes if specified
if is_proxying_prometheus {
let prometheus_upstream_base = Arc::new(prometheus_proxy_url.clone().unwrap());

// Define a handler that will proxy to an external Prometheus instance
let handler = move |mut req: http::Request<Body>| {
let upstream_base = prometheus_upstream_base.clone();
// 1. Get the path and query from the request, since we need to strip out `/prometheus`
let path_and_query = req
.uri()
.path_and_query()
.map(|pq| pq.as_str())
.unwrap_or("");
if let Some(stripped_path) = path_and_query.strip_prefix("/prometheus") {
let stripped_path_str = stripped_path.to_string();
// 2. Remove the `/prometheus` prefix.
let new_path_and_query =
http::uri::PathAndQuery::from_maybe_shared(stripped_path_str)
.expect("Invalid path");

// 3. Create a new URI with the modified path.
let mut new_uri_parts = req.uri().clone().into_parts();
new_uri_parts.path_and_query = Some(new_path_and_query);

let new_uri = http::Uri::from_parts(new_uri_parts).expect("Invalid URI");

// 4. Replace the request's URI with the modified URI.
*req.uri_mut() = new_uri;
}
async move { prometheus::handler_with_url(req, &upstream_base).await }
};

app = app
.route("/prometheus/*path", any(handler.clone()))
.route("/prometheus", any(handler));
}

if enable_pushgateway {
app = app
Expand All @@ -52,7 +100,14 @@ pub(crate) async fn start_web_server(
debug!("Web server listening on {}", server.local_addr());

info!("Explorer endpoint: http://{}", server.local_addr());
info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus");

if should_enable_prometheus {
info!("Prometheus endpoint: http://127.0.0.1:9090/prometheus");
}

if is_proxying_prometheus {
info!("Proxying to prometheus: {}", prometheus_proxy_url.unwrap());
}

if enable_pushgateway {
info!("Pushgateway endpoint: http://127.0.0.1:9091/pushgateway");
Expand Down
9 changes: 8 additions & 1 deletion src/bin/am/server/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ use axum::response::IntoResponse;
use url::Url;

pub(crate) async fn handler(req: http::Request<Body>) -> impl IntoResponse {
let upstream_base = Url::parse("http://localhost:9090").unwrap();
let upstream_base = url::Url::parse("http://localhost:9090").unwrap();
proxy_handler(req, upstream_base).await
}

pub(crate) async fn handler_with_url(
req: http::Request<Body>,
upstream_base: &Url,
) -> impl IntoResponse {
proxy_handler(req, upstream_base.clone()).await
}