Skip to content

Commit

Permalink
feat: add dynamic setting log level (apache#445)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZuLiangWang committed Dec 5, 2022
1 parent 10eff69 commit 01c6799
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 32 deletions.
72 changes: 59 additions & 13 deletions server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use log::error;
use logger::RuntimeLevel;
use profile::Profiler;
use query_engine::executor::Executor as QueryExecutor;
use serde_derive::Serialize;
Expand Down Expand Up @@ -41,8 +42,14 @@ pub enum Error {
source: Box<crate::handlers::error::Error>,
},

#[snafu(display("Missing runtimes to build service.\nBacktrace:\n{}", backtrace))]
MissingRuntimes { backtrace: Backtrace },
#[snafu(display("Failed to handle update log level, err:{}", msg))]
HandleUpdateLogLevel { msg: String },

#[snafu(display("Missing engine runtimes to build service.\nBacktrace:\n{}", backtrace))]
MissingEngineRuntimes { backtrace: Backtrace },

#[snafu(display("Missing log runtime to build service.\nBacktrace:\n{}", backtrace))]
MissingLogRuntime { backtrace: Backtrace },

#[snafu(display("Missing instance to build service.\nBacktrace:\n{}", backtrace))]
MissingInstance { backtrace: Backtrace },
Expand Down Expand Up @@ -88,7 +95,8 @@ const MAX_BODY_SIZE: u64 = 4096;
///
/// Note that the service does not owns the runtime
pub struct Service<Q> {
runtimes: Arc<EngineRuntimes>,
engine_runtimes: Arc<EngineRuntimes>,
log_runtime: Arc<RuntimeLevel>,
instance: InstanceRef<Q>,
profiler: Arc<Profiler>,
tx: Sender<()>,
Expand All @@ -109,6 +117,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.or(self.heap_profile())
.or(self.admin_block())
.or(self.flush_memtable())
.or(self.update_log_level())
}

fn home(&self) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
Expand Down Expand Up @@ -230,6 +239,25 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
)
}

fn update_log_level(
&self,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
warp::path!("log_level" / String)
.and(warp::put())
.and(self.with_log_runtime())
.and_then(
|log_level: String, log_runtime: Arc<RuntimeLevel>| async move {
let result = log_runtime
.set_level_by_str(log_level.as_str())
.map_err(|e| Error::HandleUpdateLogLevel { msg: e });
match result {
Ok(()) => Ok(reply::reply()),
Err(e) => Err(reject::custom(e)),
}
},
)
}

fn with_context(
&self,
) -> impl Filter<Extract = (RequestContext,), Error = warp::Rejection> + Clone {
Expand All @@ -244,7 +272,7 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
.default_schema_name()
.to_string();
//TODO(boyan) use read/write runtime by sql type.
let runtime = self.runtimes.bg_runtime.clone();
let runtime = self.engine_runtimes.bg_runtime.clone();

header::optional::<String>(consts::CATALOG_HEADER)
.and(header::optional::<String>(consts::TENANT_HEADER))
Expand Down Expand Up @@ -277,6 +305,13 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
warp::any().map(move || instance.clone())
}

fn with_log_runtime(
&self,
) -> impl Filter<Extract = (Arc<RuntimeLevel>,), Error = Infallible> + Clone {
let log_runtime = self.log_runtime.clone();
warp::any().map(move || log_runtime.clone())
}

fn admin_block(
&self,
) -> impl Filter<Extract = impl warp::Reply, Error = warp::Rejection> + Clone {
Expand Down Expand Up @@ -305,21 +340,28 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
/// Service builder
pub struct Builder<Q> {
endpoint: Endpoint,
runtimes: Option<Arc<EngineRuntimes>>,
engine_runtimes: Option<Arc<EngineRuntimes>>,
log_runtime: Option<Arc<RuntimeLevel>>,
instance: Option<InstanceRef<Q>>,
}

impl<Q> Builder<Q> {
pub fn new(endpoint: Endpoint) -> Self {
Self {
endpoint,
runtimes: None,
engine_runtimes: None,
log_runtime: None,
instance: None,
}
}

pub fn runtimes(mut self, runtimes: Arc<EngineRuntimes>) -> Self {
self.runtimes = Some(runtimes);
pub fn engine_runtimes(mut self, engine_runtimes: Arc<EngineRuntimes>) -> Self {
self.engine_runtimes = Some(engine_runtimes);
self
}

pub fn log_runtime(mut self, log_runtime: Arc<RuntimeLevel>) -> Self {
self.log_runtime = Some(log_runtime);
self
}

Expand All @@ -332,12 +374,14 @@ impl<Q> Builder<Q> {
impl<Q: QueryExecutor + 'static> Builder<Q> {
/// Build and start the service
pub fn build(self) -> Result<Service<Q>> {
let runtimes = self.runtimes.context(MissingRuntimes)?;
let engine_runtime = self.engine_runtimes.context(MissingEngineRuntimes)?;
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
let instance = self.instance.context(MissingInstance)?;
let (tx, rx) = oneshot::channel();

let service = Service {
runtimes: runtimes.clone(),
engine_runtimes: engine_runtime.clone(),
log_runtime,
instance,
profiler: Arc::new(Profiler::default()),
tx,
Expand All @@ -354,7 +398,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
rx.await.ok();
});
// Run the service
runtimes.bg_runtime.spawn(server);
engine_runtime.bg_runtime.spawn(server);

Ok(service)
}
Expand All @@ -371,12 +415,14 @@ fn error_to_status_code(err: &Error) -> StatusCode {
Error::CreateContext { .. } => StatusCode::BAD_REQUEST,
// TODO(yingwen): Map handle request error to more accurate status code
Error::HandleRequest { .. }
| Error::MissingRuntimes { .. }
| Error::MissingEngineRuntimes { .. }
| Error::MissingLogRuntime { .. }
| Error::MissingInstance { .. }
| Error::ParseIpAddr { .. }
| Error::ProfileHeap { .. }
| Error::Internal { .. }
| Error::JoinAsyncTask { .. } => StatusCode::INTERNAL_SERVER_ERROR,
| Error::JoinAsyncTask { .. }
| Error::HandleUpdateLogLevel { .. } => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand Down
33 changes: 23 additions & 10 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use cluster::ClusterRef;
use df_operator::registry::FunctionRegistryRef;
use interpreters::table_manipulator::TableManipulatorRef;
use log::warn;
use logger::RuntimeLevel;
use query_engine::executor::Executor as QueryExecutor;
use snafu::{Backtrace, OptionExt, ResultExt, Snafu};
use table_engine::engine::{EngineRuntimes, TableEngineRef};
Expand All @@ -27,8 +28,11 @@ use crate::{

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Missing runtimes.\nBacktrace:\n{}", backtrace))]
MissingRuntimes { backtrace: Backtrace },
#[snafu(display("Missing engine runtimes.\nBacktrace:\n{}", backtrace))]
MissingEngineRuntimes { backtrace: Backtrace },

#[snafu(display("Missing log runtime.\nBacktrace:\n{}", backtrace))]
MissingLogRuntime { backtrace: Backtrace },

#[snafu(display("Missing router.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },
Expand Down Expand Up @@ -140,7 +144,8 @@ impl<Q: QueryExecutor + 'static> Server<Q> {
#[must_use]
pub struct Builder<Q> {
config: Config,
runtimes: Option<Arc<EngineRuntimes>>,
engine_runtimes: Option<Arc<EngineRuntimes>>,
log_runtime: Option<Arc<RuntimeLevel>>,
catalog_manager: Option<ManagerRef>,
query_executor: Option<Q>,
table_engine: Option<TableEngineRef>,
Expand All @@ -156,7 +161,8 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
pub fn new(config: Config) -> Self {
Self {
config,
runtimes: None,
engine_runtimes: None,
log_runtime: None,
catalog_manager: None,
query_executor: None,
table_engine: None,
Expand All @@ -169,8 +175,13 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
}
}

pub fn runtimes(mut self, runtimes: Arc<EngineRuntimes>) -> Self {
self.runtimes = Some(runtimes);
pub fn engine_runtimes(mut self, engine_runtimes: Arc<EngineRuntimes>) -> Self {
self.engine_runtimes = Some(engine_runtimes);
self
}

pub fn log_runtime(mut self, log_runtime: Arc<RuntimeLevel>) -> Self {
self.log_runtime = Some(log_runtime);
self
}

Expand Down Expand Up @@ -250,9 +261,11 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
};

// Start http service
let runtimes = self.runtimes.context(MissingRuntimes)?;
let engine_runtimes = self.engine_runtimes.context(MissingEngineRuntimes)?;
let log_runtime = self.log_runtime.context(MissingLogRuntime)?;
let http_service = http::Builder::new(http_config)
.runtimes(runtimes.clone())
.engine_runtimes(engine_runtimes.clone())
.log_runtime(log_runtime)
.instance(instance.clone())
.build()
.context(StartHttpService)?;
Expand All @@ -263,7 +276,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
};

let mysql_service = mysql::Builder::new(mysql_config)
.runtimes(runtimes.clone())
.runtimes(engine_runtimes.clone())
.instance(instance.clone())
.build()
.context(BuildMysqlService)?;
Expand All @@ -274,7 +287,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
.context(MissingSchemaConfigProvider)?;
let rpc_services = grpc::Builder::new()
.endpoint(Endpoint::new(self.config.bind_addr, self.config.grpc_port).to_string())
.runtimes(runtimes)
.runtimes(engine_runtimes)
.instance(instance.clone())
.router(router)
.cluster(self.cluster.clone())
Expand Down
4 changes: 2 additions & 2 deletions src/bin/ceresdb-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn main() {
}

// Setup log.
let _runtime_level = setup::setup_log(&config);
let runtime_level = setup::setup_log(&config);
// Setup tracing.
let _writer_guard = setup::setup_tracing(&config);

Expand All @@ -73,5 +73,5 @@ fn main() {
// Log version.
info!("version:{}", version);

setup::run_server(config);
setup::run_server(config, runtime_level);
}
34 changes: 27 additions & 7 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,31 +74,50 @@ fn build_engine_runtimes(config: &RuntimeConfig) -> EngineRuntimes {
}

/// Run a server, returns when the server is shutdown by user
pub fn run_server(config: Config) {
pub fn run_server(config: Config, log_runtime: RuntimeLevel) {
let runtimes = Arc::new(build_engine_runtimes(&config.runtime));
let engine_runtimes = runtimes.clone();
let log_runtime = Arc::new(log_runtime);

info!("Server starts up, config:{:#?}", config);

runtimes.bg_runtime.block_on(async {
match config.analytic.wal_storage {
WalStorageConfig::RocksDB => {
run_server_with_runtimes::<RocksDBWalEngineBuilder>(config, engine_runtimes).await
run_server_with_runtimes::<RocksDBWalEngineBuilder>(
config,
engine_runtimes,
log_runtime,
)
.await
}

WalStorageConfig::Obkv(_) => {
run_server_with_runtimes::<ObkvWalEngineBuilder>(config, engine_runtimes).await;
run_server_with_runtimes::<ObkvWalEngineBuilder>(
config,
engine_runtimes,
log_runtime,
)
.await;
}

WalStorageConfig::Kafka(_) => {
run_server_with_runtimes::<KafkaWalEngineBuilder>(config, engine_runtimes).await;
run_server_with_runtimes::<KafkaWalEngineBuilder>(
config,
engine_runtimes,
log_runtime,
)
.await;
}
}
});
}

async fn run_server_with_runtimes<T>(config: Config, runtimes: Arc<EngineRuntimes>)
where
async fn run_server_with_runtimes<T>(
config: Config,
runtimes: Arc<EngineRuntimes>,
log_runtime: Arc<RuntimeLevel>,
) where
T: EngineBuilder,
{
// Build all table engine
Expand Down Expand Up @@ -132,7 +151,8 @@ where
let limiter = Limiter::new(config.limiter.clone());

let builder = Builder::new(config.clone())
.runtimes(runtimes.clone())
.engine_runtimes(runtimes.clone())
.log_runtime(log_runtime.clone())
.query_executor(query_executor)
.table_engine(engine_proxy.clone())
.function_registry(function_registry)
Expand Down

0 comments on commit 01c6799

Please sign in to comment.