Skip to content

Commit

Permalink
Add Loadbalance trait (#199)
Browse files Browse the repository at this point in the history
* refact(ld): add LoadBalancer trait, impl random ld

* refact(loadbalance): update ld api design

* style: cargo fmt

* example(greet): update client

* chore: add tag field

* style: cargo fmt

* refact(dubbo): use target_family

* style: cargo check
  • Loading branch information
yang20150702 committed Jul 2, 2024
1 parent a68707a commit 0e2050a
Show file tree
Hide file tree
Showing 11 changed files with 218 additions and 134 deletions.
1 change: 1 addition & 0 deletions application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dubbo:
provider:
services:
GreeterProvider:
tag: red
version: 1.0.0
group: test
protocol: triple
Expand Down
3 changes: 2 additions & 1 deletion dubbo/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ impl RootConfig {
.group("test".to_string())
.version("1.0.0".to_string())
.interface("helloworld.Greeter".to_string())
.protocol("triple".to_string()),
.protocol("triple".to_string())
.tag("read".to_string()),
);
self.protocols.insert(
"triple".to_string(),
Expand Down
4 changes: 4 additions & 0 deletions dubbo/src/config/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub struct ServiceConfig {
pub group: String,
pub protocol: String,
pub interface: String,
pub tag: String,
}

impl ServiceConfig {
Expand All @@ -41,4 +42,7 @@ impl ServiceConfig {
pub fn protocol(self, protocol: String) -> Self {
Self { protocol, ..self }
}
pub fn tag(self, tag: String) -> Self {
Self { tag, ..self }
}
}
44 changes: 25 additions & 19 deletions dubbo/src/extension/invoker_extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,32 @@ pub trait Invoker {
async fn url(&self) -> Result<Url, StdError>;
}

pub enum CallType {
Unary,
ClientStream,
ServerStream,
BiStream,
}
// pub enum CallType {
// Unary,
// ClientStream,
// ServerStream,
// BiStream,
// }

pub struct GrpcInvocation {
service_name: String,
method_name: String,
arguments: Vec<Argument>,
attachments: HashMap<String, String>,
call_type: CallType,
// service_name: String,
// method_name: String,
// arguments: Vec<Argument>,
// attachments: HashMap<String, String>,
// call_type: CallType,
}

pub struct Argument {
name: String,
value: Box<dyn Stream<Item = Box<dyn Serializable + Send + 'static>> + Send + 'static>,
}
// pub struct Argument {
// name: String,
// value: Box<dyn Stream<Item = Box<dyn Serializable + Send + 'static>> + Send + 'static>,
// }

#[allow(dead_code)]
pub trait Serializable {
fn serialize(&self, serialization_type: String) -> Result<Bytes, StdError>;
}

#[allow(dead_code)]
pub trait Deserializable {
fn deserialize(&self, bytes: Bytes, deserialization_type: String) -> Result<Self, StdError>
where
Expand Down Expand Up @@ -138,8 +140,8 @@ pub mod proxy {
}
}

impl From<Box<dyn Invoker + Send + 'static>> for InvokerProxy {
fn from(invoker: Box<dyn Invoker + Send + 'static>) -> Self {
impl From<Box<dyn Invoker + Send + Sync + 'static>> for InvokerProxy {
fn from(invoker: Box<dyn Invoker + Send + Sync + 'static>) -> Self {
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
tokio::spawn(async move {
while let Some(opt) = rx.recv().await {
Expand Down Expand Up @@ -220,7 +222,11 @@ impl InvokerExtensionLoader {
type InvokerExtensionConstructor = fn(
Url,
) -> Pin<
Box<dyn Future<Output = Result<Box<dyn Invoker + Send + 'static>, StdError>> + Send + 'static>,
Box<
dyn Future<Output = Result<Box<dyn Invoker + Send + Sync + 'static>, StdError>>
+ Send
+ 'static,
>,
>;
pub(crate) struct InvokerExtensionFactory {
constructor: InvokerExtensionConstructor,
Expand Down Expand Up @@ -275,7 +281,7 @@ where
impl<T> ExtensionMetaInfo for InvokerExtension<T>
where
T: Invoker + Send + 'static,
T: Extension<Target = Box<dyn Invoker + Send + 'static>>,
T: Extension<Target = Box<dyn Invoker + Send + Sync + 'static>>,
{
fn name() -> String {
T::name()
Expand Down
104 changes: 90 additions & 14 deletions dubbo/src/loadbalancer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::StdError;

pub mod random;

use futures_core::future::BoxFuture;
use std::error::Error;
use tokio::time::Duration;
use tower::{discover::ServiceList, ServiceExt};
use tower_service::Service;
use tracing::debug;

use crate::{
codegen::RpcInvocation,
invocation::Metadata,
invoker::{clone_body::CloneBody, clone_invoker::CloneInvoker},
loadbalancer::random::RandomLoadBalancer,
param::Param,
protocol::triple::triple_invoker::TripleInvoker,
svc::NewService,
StdError,
};

use crate::protocol::triple::triple_invoker::TripleInvoker;

pub struct NewLoadBalancer<N> {
inner: N,
}

#[derive(Clone)]
pub struct LoadBalancer<S> {
pub struct LoadBalancerSvc<S> {
inner: S, // Routes service
}

Expand All @@ -53,17 +60,17 @@ where
// NewRoutes
N: NewService<T>,
{
type Service = LoadBalancer<N::Service>;
type Service = LoadBalancerSvc<N::Service>;

fn new_service(&self, target: T) -> Self::Service {
// Routes service
let svc = self.inner.new_service(target);

LoadBalancer { inner: svc }
LoadBalancerSvc { inner: svc }
}
}

impl<N> Service<http::Request<CloneBody>> for LoadBalancer<N>
impl<N> Service<http::Request<CloneBody>> for LoadBalancerSvc<N>
where
// Routes service
N: Service<(), Response = Vec<CloneInvoker<TripleInvoker>>> + Clone,
Expand Down Expand Up @@ -94,18 +101,87 @@ where
Ok(routes) => routes,
};

let service_list: Vec<_> = routes
.into_iter()
.map(|invoker| tower::load::Constant::new(invoker, 1))
.collect();
// let service_list: Vec<_> = routes
// .into_iter()
// // .map(|invoker| tower::load::Constant::new(invoker, 1))
// .collect();

let service_list = ServiceList::new(service_list);
// let rdm = RandomLoadBalancer::default();
let metadata = Metadata::from_headers(req.headers().clone());
// let invks = rdm.select_invokers(service_list, metadata);
// invks.oneshot(req).await
// let service_list = ServiceList::new(service_list);

let p2c = tower::balance::p2c::Balance::new(service_list);
// let p2c = tower::balance::p2c::Balance::new(service_list);
// let p: Box<dyn LoadBalancer<Invoker = BoxService<http::Request<CloneBody>, http::Response<UnsyncBoxBody<bytes::Bytes, status::Status>>, Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>>> + std::marker::Send + std::marker::Sync> = get_loadbalancer("p2c").into();
let p = get_loadbalancer("p2c");
// let ivk = p.select_invokers(invokers, metadata);
let ivk = p.select_invokers(routes, metadata);

p2c.oneshot(req).await
ivk.oneshot(req).await
};

Box::pin(fut)
}
}

type DubboBoxService = tower::util::BoxService<
http::Request<CloneBody>,
http::Response<crate::BoxBody>,
Box<dyn Error + Send + Sync>,
>;

pub trait LoadBalancer {
type Invoker;

fn select_invokers(
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
metadata: Metadata,
) -> Self::Invoker;
}

fn get_loadbalancer(
loadbalancer: &str,
) -> Box<dyn LoadBalancer<Invoker = DubboBoxService> + Send + Sync + 'static> {
match loadbalancer {
"random" => {
println!("random!");
Box::new(RandomLoadBalancer::default())
}
"p2c" => Box::new(P2cBalancer::default()),
_ => Box::new(P2cBalancer::default()),
}
}
const DEFAULT_RTT: Duration = Duration::from_millis(30);
#[derive(Debug, Default)]
pub struct P2cBalancer {}

impl LoadBalancer for P2cBalancer {
type Invoker = DubboBoxService;

fn select_invokers(
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
_metadata: Metadata,
) -> Self::Invoker {
debug!("p2c load balancer");
let service_list: Vec<_> = invokers
.into_iter()
.map(|invoker| tower::load::Constant::new(invoker, 1))
.collect();

let decay = Duration::from_secs(10);
let service_list = ServiceList::new(service_list);
let s = tower::load::PeakEwmaDiscover::new(
service_list,
DEFAULT_RTT,
decay,
tower::load::CompleteOnResponse::default(),
);

let p = tower::balance::p2c::Balance::new(s);
let svc = DubboBoxService::new(p);
svc
}
}
42 changes: 42 additions & 0 deletions dubbo/src/loadbalancer/random.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

use rand::prelude::SliceRandom;
use tracing::debug;

use super::{DubboBoxService, LoadBalancer};
use crate::{
invocation::Metadata, loadbalancer::CloneInvoker,
protocol::triple::triple_invoker::TripleInvoker,
};

#[derive(Clone, Default)]
pub struct RandomLoadBalancer {}

impl LoadBalancer for RandomLoadBalancer {
type Invoker = DubboBoxService;

fn select_invokers(
&self,
invokers: Vec<CloneInvoker<TripleInvoker>>,
metadata: Metadata,
) -> Self::Invoker {
debug!("random loadbalance {:?}", metadata);
let ivk = invokers.choose(&mut rand::thread_rng()).unwrap().clone();
DubboBoxService::new(ivk)
}
}
4 changes: 2 additions & 2 deletions dubbo/src/triple/transport/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

pub mod http_connector;
pub mod https_connector;
#[cfg(any(target_os = "macos", target_os = "unix"))]
#[cfg(any(target_os = "macos", target_family = "unix"))]
pub mod unix_connector;

use hyper::Uri;
Expand Down Expand Up @@ -84,7 +84,7 @@ pub fn get_connector(connector: &str) -> BoxCloneService<Uri, BoxIO, crate::Erro
let c = https_connector::HttpsConnector::new();
BoxCloneService::new(Connector::new(c))
}
#[cfg(any(target_os = "macos", target_os = "unix"))]
#[cfg(any(target_os = "macos", target_family = "unix"))]
"unix" => {
let c = unix_connector::UnixConnector::new();
BoxCloneService::new(Connector::new(c))
Expand Down
4 changes: 2 additions & 2 deletions dubbo/src/triple/transport/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/

pub mod tcp_listener;
#[cfg(any(target_os = "macos", target_os = "unix"))]
#[cfg(any(target_os = "macos", target_family = "unix"))]
pub mod unix_listener;

use std::net::SocketAddr;
Expand Down Expand Up @@ -65,7 +65,7 @@ impl<T: Listener> Listener for WrappedListener<T> {
pub async fn get_listener(name: String, addr: SocketAddr) -> Result<BoxListener, crate::Error> {
match name.as_str() {
"tcp" => Ok(TcpListener::bind(addr).await?.boxed()),
#[cfg(any(target_os = "macos", target_os = "unix"))]
#[cfg(any(target_os = "macos", target_family = "unix"))]
"unix" => Ok(unix_listener::UnixListener::bind(addr).await?.boxed()),
_ => {
warn!("no support listener: {:?}", name);
Expand Down
Loading

0 comments on commit 0e2050a

Please sign in to comment.