Skip to content

Commit

Permalink
feat!: rcmgr: Change LimitConfig to use LimitVal type (#2000)
Browse files Browse the repository at this point in the history
* rcmgr: Change LimitConfig to use LimitVal type

* Nits

* Remove DefaultReifiedLimits

* Use pointers for optional ResourceLimits

* Reify a nil pointer with defaults

* Fix maxints

* Move helper

* Return nil if everything is default

* Check for nil pointer

* Change terms

* Add IsDefault helper

* Use values instead of pointers for PartialLimitConfig

* Remove references to reify

* Unmarshal 0 limits as block all

* ResourceLimits can build on Limits interface objs

* Convert max int to unlimited

* Remove ToLimitConfigWithDefaults

* Rename to ToPartialLimitConfig
  • Loading branch information
MarcoPolo authored Feb 9, 2023
1 parent 235f25b commit f4fc85d
Show file tree
Hide file tree
Showing 8 changed files with 956 additions and 260 deletions.
68 changes: 64 additions & 4 deletions p2p/host/resource-manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,21 @@ scalingLimits := rcmgr.DefaultLimits
// Add limits around included libp2p protocols
libp2p.SetDefaultServiceLimits(&scalingLimits)

// Turn the scaling limits into a static set of limits using `.AutoScale`. This
// Turn the scaling limits into a concrete set of limits using `.AutoScale`. This
// scales the limits proportional to your system memory.
limits := scalingLimits.AutoScale()
scaledDefaultLimits := scalingLimits.AutoScale()

// Tweak certain settings
cfg := rcmgr.PartialLimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
// Everything else is default. The exact values will come from `scaledDefaultLimits` above.
}

// Create our limits by using our cfg and replacing the default values with values from `scaledDefaultLimits`
limits := cfg.Build(scaledDefaultLimits)

// The resource manager expects a limiter, se we create one from our limits.
limiter := rcmgr.NewFixedLimiter(limits)
Expand All @@ -52,6 +64,54 @@ if err != nil {
host, err := libp2p.New(libp2p.ResourceManager(rm))
```

### Saving the limits config
The easiest way to save the defined limits is to serialize the `PartialLimitConfig`
type as JSON.

```go
noisyNeighbor, _ := peer.Decode("QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf")
cfg := rcmgr.PartialLimitConfig{
System: &rcmgr.ResourceLimits{
// Allow unlimited outbound streams
StreamsOutbound: rcmgr.Unlimited,
},
Peer: map[peer.ID]rcmgr.ResourceLimits{
noisyNeighbor: {
// No inbound connections from this peer
ConnsInbound: rcmgr.BlockAllLimit,
// But let me open connections to them
Conns: rcmgr.DefaultLimit,
ConnsOutbound: rcmgr.DefaultLimit,
// No inbound streams from this peer
StreamsInbound: rcmgr.BlockAllLimit,
// And let me open unlimited (by me) outbound streams (the peer may have their own limits on me)
StreamsOutbound: rcmgr.Unlimited,
},
},
}
jsonBytes, _ := json.Marshal(&cfg)

// string(jsonBytes)
// {
// "System": {
// "StreamsOutbound": "unlimited"
// },
// "Peer": {
// "QmVvtzcZgCkMnSFf2dnrBPXrWuNFWNM9J3MpZQCvWPuVZf": {
// "StreamsInbound": "blockAll",
// "StreamsOutbound": "unlimited",
// "ConnsInbound": "blockAll"
// }
// }
// }
```

This will omit defaults from the JSON output. It will also serialize the
blockAll, and unlimited values explicitly.

The `Memory` field is serialized as a string to workaround the JSON limitation
of 32 bit integers (`Memory` is an int64).

## Basic Resources

### Memory
Expand Down Expand Up @@ -279,7 +339,7 @@ This is done using the `ScalingLimitConfig`. For every scope, this configuration
struct defines the absolutely bare minimum limits, and an (optional) increase of
these limits, which will be applied on nodes that have sufficient memory.

A `ScalingLimitConfig` can be converted into a `LimitConfig` (which can then be
A `ScalingLimitConfig` can be converted into a `ConcreteLimitConfig` (which can then be
used to initialize a fixed limiter with `NewFixedLimiter`) by calling the `Scale` method.
The `Scale` method takes two parameters: the amount of memory and the number of file
descriptors that an application is willing to dedicate to libp2p.
Expand Down Expand Up @@ -347,7 +407,7 @@ go-libp2p process. For the default definitions see [`DefaultLimits` and

If the defaults seem mostly okay, but you want to adjust one facet you can
simply copy the default struct object and update the field you want to change. You can
apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `LimitConfig` with
apply changes to a `BaseLimit`, `BaseLimitIncrease`, and `ConcreteLimitConfig` with
`.Apply`.

Example
Expand Down
93 changes: 62 additions & 31 deletions p2p/host/resource-manager/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package rcmgr
import (
"encoding/json"
"io"
"math"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -56,33 +57,32 @@ func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) {
}

// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) {
func NewLimiterFromJSON(in io.Reader, defaults ConcreteLimitConfig) (Limiter, error) {
cfg, err := readLimiterConfigFromJSON(in, defaults)
if err != nil {
return nil, err
}
return &fixedLimiter{cfg}, nil
}

func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) {
var cfg LimitConfig
func readLimiterConfigFromJSON(in io.Reader, defaults ConcreteLimitConfig) (ConcreteLimitConfig, error) {
var cfg PartialLimitConfig
if err := json.NewDecoder(in).Decode(&cfg); err != nil {
return LimitConfig{}, err
return ConcreteLimitConfig{}, err
}
cfg.Apply(defaults)
return cfg, nil
return cfg.Build(defaults), nil
}

// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
LimitConfig
ConcreteLimitConfig
}

var _ Limiter = (*fixedLimiter)(nil)

func NewFixedLimiter(conf LimitConfig) Limiter {
func NewFixedLimiter(conf ConcreteLimitConfig) Limiter {
log.Debugw("initializing new limiter with config", "limits", conf)
return &fixedLimiter{LimitConfig: conf}
return &fixedLimiter{conf}
}

// BaseLimit is a mixin type for basic resource limits.
Expand All @@ -97,6 +97,37 @@ type BaseLimit struct {
Memory int64 `json:",omitempty"`
}

func valueOrBlockAll(n int) LimitVal {
if n == 0 {
return BlockAllLimit
} else if n == math.MaxInt {
return Unlimited
}
return LimitVal(n)
}
func valueOrBlockAll64(n int64) LimitVal64 {
if n == 0 {
return BlockAllLimit64
} else if n == math.MaxInt {
return Unlimited64
}
return LimitVal64(n)
}

// ToResourceLimits converts the BaseLimit to a ResourceLimits
func (l BaseLimit) ToResourceLimits() ResourceLimits {
return ResourceLimits{
Streams: valueOrBlockAll(l.Streams),
StreamsInbound: valueOrBlockAll(l.StreamsInbound),
StreamsOutbound: valueOrBlockAll(l.StreamsOutbound),
Conns: valueOrBlockAll(l.Conns),
ConnsInbound: valueOrBlockAll(l.ConnsInbound),
ConnsOutbound: valueOrBlockAll(l.ConnsOutbound),
FD: valueOrBlockAll(l.FD),
Memory: valueOrBlockAll64(l.Memory),
}
}

// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimit) Apply(l2 BaseLimit) {
Expand Down Expand Up @@ -169,98 +200,98 @@ func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) {
}
}

func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
func (l BaseLimit) GetStreamLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.StreamsInbound
} else {
return l.StreamsOutbound
}
}

func (l *BaseLimit) GetStreamTotalLimit() int {
func (l BaseLimit) GetStreamTotalLimit() int {
return l.Streams
}

func (l *BaseLimit) GetConnLimit(dir network.Direction) int {
func (l BaseLimit) GetConnLimit(dir network.Direction) int {
if dir == network.DirInbound {
return l.ConnsInbound
} else {
return l.ConnsOutbound
}
}

func (l *BaseLimit) GetConnTotalLimit() int {
func (l BaseLimit) GetConnTotalLimit() int {
return l.Conns
}

func (l *BaseLimit) GetFDLimit() int {
func (l BaseLimit) GetFDLimit() int {
return l.FD
}

func (l *BaseLimit) GetMemoryLimit() int64 {
func (l BaseLimit) GetMemoryLimit() int64 {
return l.Memory
}

func (l *fixedLimiter) GetSystemLimits() Limit {
return &l.System
return &l.system
}

func (l *fixedLimiter) GetTransientLimits() Limit {
return &l.Transient
return &l.transient
}

func (l *fixedLimiter) GetAllowlistedSystemLimits() Limit {
return &l.AllowlistedSystem
return &l.allowlistedSystem
}

func (l *fixedLimiter) GetAllowlistedTransientLimits() Limit {
return &l.AllowlistedTransient
return &l.allowlistedTransient
}

func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.Service[svc]
sl, ok := l.service[svc]
if !ok {
return &l.ServiceDefault
return &l.serviceDefault
}
return &sl
}

func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeer[svc]
pl, ok := l.servicePeer[svc]
if !ok {
return &l.ServicePeerDefault
return &l.servicePeerDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.Protocol[proto]
pl, ok := l.protocol[proto]
if !ok {
return &l.ProtocolDefault
return &l.protocolDefault
}
return &pl
}

func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeer[proto]
pl, ok := l.protocolPeer[proto]
if !ok {
return &l.ProtocolPeerDefault
return &l.protocolPeerDefault
}
return &pl
}

func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.Peer[p]
pl, ok := l.peer[p]
if !ok {
return &l.PeerDefault
return &l.peerDefault
}
return &pl
}

func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
return &l.Stream
return &l.stream
}

func (l *fixedLimiter) GetConnLimits() Limit {
return &l.Conn
return &l.conn
}
45 changes: 45 additions & 0 deletions p2p/host/resource-manager/limit_config_test.backwards-compat.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
"System": {
"Memory": 65536,
"Conns": 16,
"ConnsInbound": 8,
"ConnsOutbound": 16,
"FD": 16
},
"ServiceDefault": {
"Memory": 8765
},
"Service": {
"A": {
"Memory": 8192
},
"B": {}
},
"ServicePeerDefault": {
"Memory": 2048
},
"ServicePeer": {
"A": {
"Memory": 4096
}
},
"ProtocolDefault": {
"Memory": 2048
},
"ProtocolPeerDefault": {
"Memory": 1024
},
"Protocol": {
"/A": {
"Memory": 8192
}
},
"PeerDefault": {
"Memory": 4096
},
"Peer": {
"12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": {
"Memory": 4097
}
}
}
Loading

0 comments on commit f4fc85d

Please sign in to comment.