Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balancer: start populating weight by edsbalancer for weighted_round_robin #2945

Merged
merged 9 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package base

import (
"context"
"google.golang.org/grpc/balancer/wrr"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
Expand All @@ -38,7 +39,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc,
pickerBuilder: bb.pickerBuilder,

subConns: make(map[resolver.Address]balancer.SubConn),
subConns: make(map[resolver.Address]AddrInfo),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
// Initialize picker to a picker that always return
Expand All @@ -60,7 +61,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State

subConns map[resolver.Address]balancer.SubConn
subConns map[resolver.Address]AddrInfo
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With changes in this file, we will still unnecessarily reconnect when the weight for an address changes.
To avoid that, we should clear metadata field when addresses are received.
But this changes the behavior.

Base is to be used as an example, and in the case only picking algorithm is different.
I think we should leave base package as is, and re-implement this for wrr.

wrr will have some special behaviors, some I can think of now:

  • Duplicates in addresses list (same address string, but multiple entries in addrs list, with potential different metadata)
    • Should we add up the weights?
  • Since metadata is not hashed, the only useful thing is the address string itself. Maybe we should use the string as the key, instead of the address struct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this care diff becomes really small, just weightedroundrobin package declaration with metadata population in edsbalancer. Updated pull request.

scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
config Config
Expand All @@ -85,15 +86,19 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
addrInfo := AddrInfo{SubConn: sc}
if wrrInfo, ok := a.Metadata.(*wrr.Info); ok {
addrInfo.Weight = wrrInfo.Weight
}
b.subConns[a] = addrInfo
b.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc)
b.cc.RemoveSubConn(sc.SubConn)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
Expand All @@ -110,15 +115,28 @@ func (b *baseBalancer) regeneratePicker() {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
return
}
readySCs := make(map[resolver.Address]balancer.SubConn)

// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
pickerBuilderV2, ok := b.pickerBuilder.(PickerBuilderV2)
if ok {
readySCs := make(map[resolver.Address]AddrInfo)

// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc.SubConn]; ok && st == connectivity.Ready {
readySCs[addr] = sc
}
}
b.picker = pickerBuilderV2.BuildV2(readySCs)
} else {
readySCs := make(map[resolver.Address]balancer.SubConn)

// Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc.SubConn]; ok && st == connectivity.Ready {
readySCs[addr] = sc.SubConn
}
}
b.picker = b.pickerBuilder.Build(readySCs)
}
b.picker = b.pickerBuilder.Build(readySCs)
}

func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
Expand Down
10 changes: 10 additions & 0 deletions balancer/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,16 @@ type PickerBuilder interface {
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}

type AddrInfo struct {
SubConn balancer.SubConn
Weight uint32
}

type PickerBuilderV2 interface {
PickerBuilder
BuildV2(readySCs map[resolver.Address]AddrInfo) balancer.Picker
}

// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
Expand Down
9 changes: 9 additions & 0 deletions balancer/wrr/wrr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package wrr
alazarev marked this conversation as resolved.
Show resolved Hide resolved

// Name is the name of weighted_round_robin balancer.
const Name = "weighted_round_robin"

// Information that should be stored inside Address metadata in order to use wrr.
type Info struct{
alazarev marked this conversation as resolved.
Show resolved Hide resolved

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe let's make it an interface, e.g.

type Weighted interface {
  Weight() uint32
}

?

This would allow us to provide some object with xds-specific knowledge (e.g. is it a canary instance?) in the future, which would also implement the Weighted interface among others.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it may make sense to move this interface to the balancer package itself, as it's possible it would be used by other weight-aware load balancers, e.g. maglev.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with interface is, it's hard to add methods. But for structs, we can add fields.

I also think it's better to not keep it in balancer package, because it's more like a feature of a specific balancer, not all balancers.

Weight uint32
}
9 changes: 7 additions & 2 deletions balancer/xds/edsbalancer/edsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package edsbalancer
import (
"context"
"encoding/json"
"google.golang.org/grpc/balancer/wrr"
"net"
"reflect"
"strconv"
Expand Down Expand Up @@ -227,9 +228,13 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *edspb.ClusterLoadAssignment)
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
newAddrs = append(newAddrs, resolver.Address{
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
})
}
if lbEndpoint.GetLoadBalancingWeight().GetValue() != 0 {
alazarev marked this conversation as resolved.
Show resolved Hide resolved
address.Metadata = &wrr.Info{Weight:lbEndpoint.GetLoadBalancingWeight().GetValue()}
}
newAddrs = append(newAddrs, address)
}
var weightChanged, addrsChanged bool
config, ok := xdsB.lidToConfig[lid]
Expand Down