Skip to content

Commit

Permalink
fix: auto_create_table without ceresmeta
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed May 22, 2023
1 parent 7419d0c commit 61cd280
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 43 deletions.
9 changes: 8 additions & 1 deletion proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use std::{

use ::http::StatusCode;
use catalog::schema::{
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, SchemaRef,
CreateOptions, CreateTableRequest, DropOptions, DropTableRequest, NameRef, SchemaRef,
};
use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest,
Expand Down Expand Up @@ -70,6 +70,7 @@ pub struct Proxy<Q> {
schema_config_provider: SchemaConfigProviderRef,
hotspot_recorder: Arc<HotspotRecorder>,
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
}

impl<Q: QueryExecutor + 'static> Proxy<Q> {
Expand All @@ -84,6 +85,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
schema_config_provider: SchemaConfigProviderRef,
hotspot_config: hotspot::Config,
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand All @@ -104,13 +106,18 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
schema_config_provider,
hotspot_recorder,
engine_runtimes,
cluster_with_meta,
}
}

pub fn instance(&self) -> InstanceRef<Q> {
self.instance.clone()
}

fn default_catalog_name(&self) -> NameRef {
self.instance.catalog_manager.default_catalog_name()
}

async fn maybe_forward_prom_remote_query(
&self,
metric: String,
Expand Down
204 changes: 166 additions & 38 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use common_types::{
time::Timestamp,
};
use common_util::error::BoxError;
use futures::{future::try_join_all, FutureExt};
use futures::{
future::{try_join_all, BoxFuture},
FutureExt,
};
use http::StatusCode;
use interpreters::interpreter::Output;
use log::{debug, error, info};
Expand Down Expand Up @@ -65,6 +68,29 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
&self,
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let write_context = req.context.clone();
let resp = if self.cluster_with_meta {
self.handle_with_meta_write(ctx, req).await?
} else {
self.handle_without_meta_write(ctx, req).await?
};

debug!(
"Grpc handle write finished, write_context:{:?}, resp:{:?}",
write_context, resp
);
Ok(resp)
}

// Handle write requests based on ceresmeta.
// 1. Create table via ceresmeta if it does not exist.
// 2. Split write request.
// 3. Process write.
async fn handle_with_meta_write(
&self,
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let request_id = RequestId::next_id();

Expand All @@ -73,7 +99,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
code: StatusCode::BAD_REQUEST,
})?;

self.handle_auto_create_table(request_id, &write_context.database, &req)
self.handle_auto_create_table_with_meta(request_id, &write_context.database, &req)
.await?;

let (write_request_to_local, write_requests_to_forward) =
Expand All @@ -82,47 +108,53 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);

// Write to remote.
for (endpoint, table_write_request) in write_requests_to_forward {
let forwarder = self.forwarder.clone();
let write_handle = self.engine_runtimes.io_runtime.spawn(async move {
Self::write_to_remote(forwarder, endpoint, table_write_request).await
});

futures.push(write_handle.boxed());
}
self.collect_write_to_remote_future(&mut futures, write_requests_to_forward)
.await;

// Write to local.
if !write_request_to_local.table_requests.is_empty() {
let local_handle = async move {
Ok(self
.write_to_local(ctx, request_id, write_request_to_local)
.await)
};
futures.push(local_handle.boxed());
}
self.collect_write_to_local_future(&mut futures, ctx, request_id, write_request_to_local)
.await;

let resps = try_join_all(futures)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to join task",
})?;
self.collect_write_response(futures).await
}

debug!(
"Grpc handle write finished, schema:{}, resps:{:?}",
write_context.database, resps
);
// Handle write requests without ceresmeta.
// 1. Split write request.
// 2. Create table if not exist.
// 3. Process write.
async fn handle_without_meta_write(
&self,
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let request_id = RequestId::next_id();

let mut success = 0;
for resp in resps {
success += resp?.success;
}
let write_context = req.context.clone().context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;
let (write_request_to_local, write_requests_to_forward) =
self.split_write_request(req).await?;

Ok(WriteResponse {
success,
..Default::default()
})
let mut futures = Vec::with_capacity(write_requests_to_forward.len() + 1);

// Write to remote.
self.collect_write_to_remote_future(&mut futures, write_requests_to_forward)
.await;

// Create table.
self.handle_auto_create_table_without_meta(
request_id,
&write_request_to_local,
&write_context.database,
)
.await?;

// Write to local.
self.collect_write_to_local_future(&mut futures, ctx, request_id, write_request_to_local)
.await;

self.collect_write_response(futures).await
}

async fn split_write_request(
Expand Down Expand Up @@ -192,6 +224,59 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
Ok((table_requests_to_local, table_requests_to_forward))
}

async fn collect_write_to_remote_future(
&self,
futures: &mut Vec<BoxFuture<'_, common_util::runtime::Result<Result<WriteResponse>>>>,
write_request: HashMap<Endpoint, WriteRequest>,
) {
for (endpoint, table_write_request) in write_request {
let forwarder = self.forwarder.clone();
let write_handle = self.engine_runtimes.io_runtime.spawn(async move {
Self::write_to_remote(forwarder, endpoint, table_write_request).await
});

futures.push(write_handle.boxed());
}
}

#[inline]
async fn collect_write_to_local_future<'a>(
&'a self,
futures: &mut Vec<BoxFuture<'a, common_util::runtime::Result<Result<WriteResponse>>>>,
ctx: Context,
request_id: RequestId,
write_request: WriteRequest,
) {
if !write_request.table_requests.is_empty() {
let local_handle =
async move { Ok(self.write_to_local(ctx, request_id, write_request).await) };
futures.push(local_handle.boxed());
}
}

async fn collect_write_response(
&self,
futures: Vec<BoxFuture<'_, common_util::runtime::Result<Result<WriteResponse>>>>,
) -> Result<WriteResponse> {
let resps = try_join_all(futures)
.await
.box_err()
.context(ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to join task",
})?;

let mut success = 0;
for resp in resps {
success += resp?.success;
}

Ok(WriteResponse {
success,
..Default::default()
})
}

async fn write_to_remote(
forwarder: ForwarderRef,
endpoint: Endpoint,
Expand Down Expand Up @@ -418,7 +503,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
})
}

async fn handle_auto_create_table(
async fn handle_auto_create_table_with_meta(
&self,
request_id: RequestId,
schema: &str,
Expand Down Expand Up @@ -462,6 +547,49 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
Ok(())
}

async fn handle_auto_create_table_without_meta(
&self,
request_id: RequestId,
write_request: &WriteRequest,
schema: &str,
) -> Result<()> {
let table_names = write_request
.table_requests
.iter()
.map(|v| v.table.clone())
.collect::<Vec<String>>();

let catalog = self.default_catalog_name();
let schema_config = self
.schema_config_provider
.schema_config(schema)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema:{schema}"),
})?
.cloned()
.unwrap_or_default();

if self.auto_create_table {
for (idx, table_name) in table_names.iter().enumerate() {
let table = self.try_get_table(catalog, schema, table_name)?;
if table.is_none() {
self.create_table(
request_id,
catalog,
schema,
&write_request.table_requests[idx],
&schema_config,
None,
)
.await?;
}
}
}
Ok(())
}

async fn create_table(
&self,
request_id: RequestId,
Expand Down
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
provider.clone(),
self.server_config.hotspot,
engine_runtimes.clone(),
self.cluster.is_some(),
));

let http_service = http::Builder::new(http_config)
Expand Down
8 changes: 4 additions & 4 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,28 +132,28 @@ async fn run_server_with_runtimes<T>(
.function_registry(function_registry)
.limiter(limiter);

let engine_builder = T::default();
let wal_builder = T::default();
let builder = match &config.cluster_deployment {
None => {
build_without_meta(
&config,
&StaticRouteConfig::default(),
builder,
engine_runtimes.clone(),
engine_builder,
wal_builder,
)
.await
}
Some(ClusterDeployment::NoMeta(v)) => {
build_without_meta(&config, v, builder, engine_runtimes.clone(), engine_builder).await
build_without_meta(&config, v, builder, engine_runtimes.clone(), wal_builder).await
}
Some(ClusterDeployment::WithMeta(cluster_config)) => {
build_with_meta(
&config,
cluster_config,
builder,
engine_runtimes.clone(),
engine_builder,
wal_builder,
)
.await
}
Expand Down

0 comments on commit 61cd280

Please sign in to comment.