implement scaling of limits

This commit is contained in:
Marten Seemann 2022-06-10 16:01:03 +02:00
parent 7fbc5d29a3
commit 33bc8a8560
9 changed files with 672 additions and 911 deletions

5
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/libp2p/go-libp2p-core v0.19.0
github.com/multiformats/go-multiaddr v0.6.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/stretchr/testify v1.8.0
go.opencensus.io v0.23.0
)
@ -14,6 +14,7 @@ require (
github.com/btcsuite/btcd v0.22.1 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/ipfs/go-cid v0.2.0 // indirect
@ -30,10 +31,12 @@ require (
github.com/multiformats/go-multicodec v0.4.1 // indirect
github.com/multiformats/go-multihash v0.0.15 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

13
go.sum
View File

@ -76,8 +76,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4=
github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw=
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
@ -120,8 +122,6 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0=
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@ -131,10 +131,13 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
@ -228,6 +231,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
@ -235,7 +239,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

120
limit.go
View File

@ -20,19 +20,6 @@ type Limit interface {
GetConnTotalLimit() int
// GetFDLimit returns the file descriptor limit.
GetFDLimit() int
// WithMemoryLimit creates a copy of this limit object, with memory limit adjusted to
// the specified memFraction of its current value, bounded by minMemory and maxMemory.
WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit
// WithStreamLimit creates a copy of this limit object, with stream limits adjusted
// as specified.
WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit
// WithConnLimit creates a copy of this limit object, with connetion limits adjusted
// as specified.
WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit
// WithFDLimit creates a copy of this limit object, with file descriptor limits adjusted
// as specified
WithFDLimit(numFD int) Limit
}
// Limiter is the interface for providing limits to the resource manager.
@ -48,25 +35,16 @@ type Limiter interface {
GetConnLimits() Limit
}
// BasicLimiter is a limiter with fixed limits.
type BasicLimiter struct {
SystemLimits Limit
TransientLimits Limit
DefaultServiceLimits Limit
DefaultServicePeerLimits Limit
ServiceLimits map[string]Limit
ServicePeerLimits map[string]Limit
DefaultProtocolLimits Limit
DefaultProtocolPeerLimits Limit
ProtocolLimits map[protocol.ID]Limit
ProtocolPeerLimits map[protocol.ID]Limit
DefaultPeerLimits Limit
PeerLimits map[peer.ID]Limit
ConnLimits Limit
StreamLimits Limit
// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
LimitConfig
}
var _ Limiter = (*BasicLimiter)(nil)
var _ Limiter = (*fixedLimiter)(nil)
func NewFixedLimiter(conf LimitConfig) Limiter {
return &fixedLimiter{LimitConfig: conf}
}
// BaseLimit is a mixin type for basic resource limits.
type BaseLimit struct {
@ -77,13 +55,19 @@ type BaseLimit struct {
ConnsInbound int
ConnsOutbound int
FD int
Memory int64
}
// MemoryLimit is a mixin type for memory limits
type MemoryLimit struct {
MemoryFraction float64
MinMemory int64
MaxMemory int64
// BaseLimitIncrease is the increase per GB of system memory.
type BaseLimitIncrease struct {
Streams int
StreamsInbound int
StreamsOutbound int
Conns int
ConnsInbound int
ConnsOutbound int
Memory int64
FDFraction float64
}
func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
@ -114,74 +98,62 @@ func (l *BaseLimit) GetFDLimit() int {
return l.FD
}
func (l *BasicLimiter) GetSystemLimits() Limit {
return l.SystemLimits
func (l *BaseLimit) GetMemoryLimit() int64 {
return l.Memory
}
func (l *BasicLimiter) GetTransientLimits() Limit {
return l.TransientLimits
func (l *fixedLimiter) GetSystemLimits() Limit {
return &l.SystemLimit
}
func (l *BasicLimiter) GetServiceLimits(svc string) Limit {
func (l *fixedLimiter) GetTransientLimits() Limit {
return &l.TransientLimit
}
func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.ServiceLimits[svc]
if !ok {
return l.DefaultServiceLimits
return &l.DefaultServiceLimit
}
return sl
return &sl
}
func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit {
func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeerLimits[svc]
if !ok {
return l.DefaultServicePeerLimits
return &l.DefaultServicePeerLimit
}
return pl
return &pl
}
func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit {
func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolLimits[proto]
if !ok {
return l.DefaultProtocolLimits
return &l.DefaultProtocolLimit
}
return pl
return &pl
}
func (l *BasicLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeerLimits[proto]
if !ok {
return l.DefaultProtocolPeerLimits
return &l.DefaultProtocolPeerLimit
}
return pl
return &pl
}
func (l *BasicLimiter) GetPeerLimits(p peer.ID) Limit {
func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.PeerLimits[p]
if !ok {
return l.DefaultPeerLimits
return &l.DefaultPeerLimit
}
return pl
return &pl
}
func (l *BasicLimiter) GetStreamLimits(p peer.ID) Limit {
return l.StreamLimits
func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
return &l.StreamLimit
}
func (l *BasicLimiter) GetConnLimits() Limit {
return l.ConnLimits
}
func (l *MemoryLimit) GetMemory(memoryCap int64) int64 {
return memoryLimit(memoryCap, l.MemoryFraction, l.MinMemory, l.MaxMemory)
}
func memoryLimit(memoryCap int64, memFraction float64, minMemory, maxMemory int64) int64 {
memoryCap = int64(float64(memoryCap) * memFraction)
switch {
case memoryCap < minMemory:
return minMemory
case memoryCap > maxMemory:
return maxMemory
default:
return memoryCap
}
func (l *fixedLimiter) GetConnLimits() Limit {
return &l.ConnLimit
}

View File

@ -1,262 +0,0 @@
package rcmgr
import (
"fmt"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pbnjay/memory"
)
type BasicLimitConfig struct {
// either Memory is set for fixed memory limit
Memory int64 `json:",omitempty"`
// or the following 3 fields for computed memory limits
MinMemory int64 `json:",omitempty"`
MaxMemory int64 `json:",omitempty"`
MemoryFraction float64 `json:",omitempty"`
StreamsInbound int
StreamsOutbound int
Streams int
ConnsInbound int
ConnsOutbound int
Conns int
FD int
}
func (cfg *BasicLimitConfig) toLimit(base BaseLimit, mem MemoryLimit) (Limit, error) {
if cfg == nil {
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
default:
if cfg.MemoryFraction < 0 {
return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction)
}
if cfg.MemoryFraction > 0 {
mem.MemoryFraction = cfg.MemoryFraction
}
if cfg.MinMemory > 0 {
mem.MinMemory = cfg.MinMemory
}
if cfg.MaxMemory > 0 {
mem.MaxMemory = cfg.MaxMemory
}
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
}
func (cfg *BasicLimitConfig) toLimitFixed(base BaseLimit, mem int64) (Limit, error) {
if cfg == nil {
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
default:
if cfg.MemoryFraction > 0 || cfg.MinMemory > 0 || cfg.MaxMemory > 0 {
return nil, fmt.Errorf("cannot specify dynamic range for fixed memory limit")
}
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
}
type BasicLimiterConfig struct {
System *BasicLimitConfig `json:",omitempty"`
Transient *BasicLimitConfig `json:",omitempty"`
ServiceDefault *BasicLimitConfig `json:",omitempty"`
ServicePeerDefault *BasicLimitConfig `json:",omitempty"`
Service map[string]BasicLimitConfig `json:",omitempty"`
ServicePeer map[string]BasicLimitConfig `json:",omitempty"`
ProtocolDefault *BasicLimitConfig `json:",omitempty"`
ProtocolPeerDefault *BasicLimitConfig `json:",omitempty"`
Protocol map[string]BasicLimitConfig `json:",omitempty"`
ProtocolPeer map[string]BasicLimitConfig `json:",omitempty"`
PeerDefault *BasicLimitConfig `json:",omitempty"`
Peer map[string]BasicLimitConfig `json:",omitempty"`
Conn *BasicLimitConfig `json:",omitempty"`
Stream *BasicLimitConfig `json:",omitempty"`
}
func NewLimiter(cfg BasicLimiterConfig, defaults DefaultLimitConfig) (*BasicLimiter, error) {
limiter := new(BasicLimiter)
var err error
limiter.SystemLimits, err = cfg.System.toLimit(defaults.SystemBaseLimit, defaults.SystemMemory)
if err != nil {
return nil, fmt.Errorf("invalid system limit: %w", err)
}
limiter.TransientLimits, err = cfg.Transient.toLimit(defaults.TransientBaseLimit, defaults.TransientMemory)
if err != nil {
return nil, fmt.Errorf("invalid transient limit: %w", err)
}
limiter.DefaultServiceLimits, err = cfg.ServiceDefault.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invalid default service limit: %w", err)
}
limiter.DefaultServicePeerLimits, err = cfg.ServicePeerDefault.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid default service peer limit: %w", err)
}
if len(cfg.Service) > 0 {
limiter.ServiceLimits = make(map[string]Limit, len(cfg.Service))
for svc, cfgLimit := range cfg.Service {
limiter.ServiceLimits[svc], err = cfgLimit.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", svc, err)
}
}
}
if len(cfg.ServicePeer) > 0 {
limiter.ServicePeerLimits = make(map[string]Limit, len(cfg.ServicePeer))
for svc, cfgLimit := range cfg.ServicePeer {
limiter.ServicePeerLimits[svc], err = cfgLimit.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", svc, err)
}
}
}
limiter.DefaultProtocolLimits, err = cfg.ProtocolDefault.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invalid default protocol limit: %w", err)
}
limiter.DefaultProtocolPeerLimits, err = cfg.ProtocolPeerDefault.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid default protocol peer limit: %w", err)
}
if len(cfg.Protocol) > 0 {
limiter.ProtocolLimits = make(map[protocol.ID]Limit, len(cfg.Protocol))
for p, cfgLimit := range cfg.Protocol {
limiter.ProtocolLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", p, err)
}
}
}
if len(cfg.ProtocolPeer) > 0 {
limiter.ProtocolPeerLimits = make(map[protocol.ID]Limit, len(cfg.ProtocolPeer))
for p, cfgLimit := range cfg.ProtocolPeer {
limiter.ProtocolPeerLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", p, err)
}
}
}
limiter.DefaultPeerLimits, err = cfg.PeerDefault.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit: %w", err)
}
if len(cfg.Peer) > 0 {
limiter.PeerLimits = make(map[peer.ID]Limit, len(cfg.Peer))
for p, cfgLimit := range cfg.Peer {
pid, err := peer.Decode(p)
if err != nil {
return nil, fmt.Errorf("invalid peer ID %s: %w", p, err)
}
limiter.PeerLimits[pid], err = cfgLimit.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit for %s: %w", p, err)
}
}
}
limiter.ConnLimits, err = cfg.Conn.toLimitFixed(defaults.ConnBaseLimit, defaults.ConnMemory)
if err != nil {
return nil, fmt.Errorf("invalid conn limit: %w", err)
}
limiter.StreamLimits, err = cfg.Stream.toLimitFixed(defaults.StreamBaseLimit, defaults.StreamMemory)
if err != nil {
return nil, fmt.Errorf("invalid stream limit: %w", err)
}
return limiter, nil
}

View File

@ -1,148 +1,318 @@
package rcmgr
import "math"
import (
"math"
// DefaultLimitConfig is a struct for configuring default limits.
type DefaultLimitConfig struct {
SystemBaseLimit BaseLimit
SystemMemory MemoryLimit
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
TransientBaseLimit BaseLimit
TransientMemory MemoryLimit
ServiceBaseLimit BaseLimit
ServiceMemory MemoryLimit
ServicePeerBaseLimit BaseLimit
ServicePeerMemory MemoryLimit
ProtocolBaseLimit BaseLimit
ProtocolMemory MemoryLimit
ProtocolPeerBaseLimit BaseLimit
ProtocolPeerMemory MemoryLimit
PeerBaseLimit BaseLimit
PeerMemory MemoryLimit
ConnBaseLimit BaseLimit
ConnMemory int64
StreamBaseLimit BaseLimit
StreamMemory int64
type baseLimitConfig struct {
BaseLimit BaseLimit
BaseLimitIncrease BaseLimitIncrease
}
func (cfg *DefaultLimitConfig) WithSystemMemory(memFraction float64, minMemory, maxMemory int64) DefaultLimitConfig {
refactor := memFraction / cfg.SystemMemory.MemoryFraction
r := *cfg
r.SystemMemory.MemoryFraction = memFraction
r.SystemMemory.MinMemory = minMemory
r.SystemMemory.MaxMemory = maxMemory
r.TransientMemory.MemoryFraction *= refactor
r.ServiceMemory.MemoryFraction *= refactor
r.ServicePeerMemory.MemoryFraction *= refactor
r.ProtocolMemory.MemoryFraction *= refactor
r.ProtocolPeerMemory.MemoryFraction *= refactor
r.PeerMemory.MemoryFraction *= refactor
return r
// ScalingLimitConfig is a struct for configuring default limits.
// {}BaseLimit is the limits that apply for a minimal node (128 MB of memory for libp2p) and 256 file descriptors.
// {}LimitIncrease is the additional limit granted for every additional 1 GB of RAM.
type ScalingLimitConfig struct {
SystemBaseLimit BaseLimit
SystemLimitIncrease BaseLimitIncrease
TransientBaseLimit BaseLimit
TransientLimitIncrease BaseLimitIncrease
ServiceBaseLimit BaseLimit
ServiceLimitIncrease BaseLimitIncrease
ServiceLimits map[string]baseLimitConfig // use AddServiceLimit to modify
ServicePeerBaseLimit BaseLimit
ServicePeerLimitIncrease BaseLimitIncrease
ServicePeerLimits map[string]baseLimitConfig // use AddServicePeerLimit to modify
ProtocolBaseLimit BaseLimit
ProtocolLimitIncrease BaseLimitIncrease
ProtocolLimits map[protocol.ID]baseLimitConfig // use AddProtocolLimit to modify
ProtocolPeerBaseLimit BaseLimit
ProtocolPeerLimitIncrease BaseLimitIncrease
ProtocolPeerLimits map[protocol.ID]baseLimitConfig // use AddProtocolPeerLimit to modify
PeerBaseLimit BaseLimit
PeerLimitIncrease BaseLimitIncrease
PeerLimits map[peer.ID]baseLimitConfig // use AddPeerLimit to modify
ConnBaseLimit BaseLimit
ConnLimitIncrease BaseLimitIncrease
StreamBaseLimit BaseLimit
StreamLimitIncrease BaseLimitIncrease
}
func (cfg *ScalingLimitConfig) AddServiceLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ServiceLimits == nil {
cfg.ServiceLimits = make(map[string]baseLimitConfig)
}
cfg.ServiceLimits[svc] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddProtocolLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ProtocolLimits == nil {
cfg.ProtocolLimits = make(map[protocol.ID]baseLimitConfig)
}
cfg.ProtocolLimits[proto] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddPeerLimit(p peer.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.PeerLimits == nil {
cfg.PeerLimits = make(map[peer.ID]baseLimitConfig)
}
cfg.PeerLimits[p] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddServicePeerLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ServicePeerLimits == nil {
cfg.ServicePeerLimits = make(map[string]baseLimitConfig)
}
cfg.ServicePeerLimits[svc] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ProtocolPeerLimits == nil {
cfg.ProtocolPeerLimits = make(map[protocol.ID]baseLimitConfig)
}
cfg.ProtocolPeerLimits[proto] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
type LimitConfig struct {
SystemLimit BaseLimit
TransientLimit BaseLimit
DefaultServiceLimit BaseLimit
ServiceLimits map[string]BaseLimit
DefaultServicePeerLimit BaseLimit
ServicePeerLimits map[string]BaseLimit
DefaultProtocolLimit BaseLimit
ProtocolLimits map[protocol.ID]BaseLimit
DefaultProtocolPeerLimit BaseLimit
ProtocolPeerLimits map[protocol.ID]BaseLimit
DefaultPeerLimit BaseLimit
PeerLimits map[peer.ID]BaseLimit
ConnLimit BaseLimit
StreamLimit BaseLimit
}
// Scale scales up a limit configuration.
// memory is the amount of memory that the stack is allowed to consume,
// for a full it's recommended to use 1/8 of the installed system memory.
// If memory is smaller than 128 MB, the base configuration will be used.
//
func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig {
var scaleFactor int
if memory > 128<<20 {
scaleFactor = int((memory - 128<<20) >> 20)
}
lc := LimitConfig{
SystemLimit: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, scaleFactor, numFD),
TransientLimit: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, scaleFactor, numFD),
DefaultServiceLimit: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, scaleFactor, numFD),
DefaultServicePeerLimit: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, scaleFactor, numFD),
DefaultProtocolLimit: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, scaleFactor, numFD),
DefaultProtocolPeerLimit: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, scaleFactor, numFD),
DefaultPeerLimit: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, scaleFactor, numFD),
ConnLimit: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD),
StreamLimit: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD),
}
if cfg.ServiceLimits != nil {
lc.ServiceLimits = make(map[string]BaseLimit)
for svc, l := range cfg.ServiceLimits {
lc.ServiceLimits[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolLimits != nil {
lc.ProtocolLimits = make(map[protocol.ID]BaseLimit)
for proto, l := range cfg.ProtocolLimits {
lc.ProtocolLimits[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.PeerLimits != nil {
lc.PeerLimits = make(map[peer.ID]BaseLimit)
for p, l := range cfg.PeerLimits {
lc.PeerLimits[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ServicePeerLimits != nil {
lc.ServicePeerLimits = make(map[string]BaseLimit)
for svc, l := range cfg.ServicePeerLimits {
lc.ServicePeerLimits[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolPeerLimits != nil {
lc.ProtocolPeerLimits = make(map[protocol.ID]BaseLimit)
for p, l := range cfg.ProtocolPeerLimits {
lc.ProtocolPeerLimits[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
return lc
}
// factor is the number of MBs above the minimum (128 MB)
func scale(base BaseLimit, inc BaseLimitIncrease, factor int, numFD int) BaseLimit {
l := BaseLimit{
StreamsInbound: base.StreamsInbound + (inc.StreamsInbound*factor)>>10,
StreamsOutbound: base.StreamsOutbound + (inc.StreamsOutbound*factor)>>10,
Streams: base.Streams + (inc.Streams*factor)>>10,
ConnsInbound: base.ConnsInbound + (inc.ConnsInbound*factor)>>10,
ConnsOutbound: base.ConnsOutbound + (inc.ConnsOutbound*factor)>>10,
Conns: base.Conns + (inc.Conns*factor)>>10,
Memory: base.Memory + (inc.Memory*int64(factor))>>10,
FD: base.FD,
}
if inc.FDFraction > 0 {
l.FD = int(inc.FDFraction * float64(numFD))
}
return l
}
// DefaultLimits are the limits used by the default limiter constructors.
var DefaultLimits = DefaultLimitConfig{
var DefaultLimits = ScalingLimitConfig{
SystemBaseLimit: BaseLimit{
StreamsInbound: 4096,
StreamsOutbound: 16384,
Streams: 16384,
ConnsInbound: 256,
ConnsOutbound: 1024,
Conns: 1024,
FD: 512,
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 128 << 20,
FD: 256,
},
SystemMemory: MemoryLimit{
MemoryFraction: 0.125,
MinMemory: 128 << 20,
MaxMemory: 1 << 30,
SystemLimitIncrease: BaseLimitIncrease{
ConnsInbound: 64,
ConnsOutbound: 128,
Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 1 << 30,
FDFraction: 1,
},
TransientBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 512,
Streams: 512,
ConnsInbound: 32,
ConnsOutbound: 128,
Conns: 128,
FD: 128,
ConnsOutbound: 64,
Conns: 64,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 32 << 20,
FD: 64,
},
TransientMemory: MemoryLimit{
MemoryFraction: 1,
MinMemory: 64 << 20,
MaxMemory: 64 << 20,
TransientLimitIncrease: BaseLimitIncrease{
ConnsInbound: 16,
ConnsOutbound: 32,
Conns: 32,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 0.25,
},
ServiceBaseLimit: BaseLimit{
StreamsInbound: 2048,
StreamsOutbound: 8192,
Streams: 8192,
},
ServiceMemory: MemoryLimit{
MemoryFraction: 0.125 / 4,
MinMemory: 64 << 20,
MaxMemory: 256 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
},
ServicePeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 16 << 20,
MaxMemory: 64 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 1024,
StreamsOutbound: 4096,
Streams: 4096,
Memory: 64 << 20,
},
ProtocolMemory: MemoryLimit{
MemoryFraction: 0.125 / 8,
MinMemory: 64 << 20,
MaxMemory: 128 << 20,
ServiceLimitIncrease: BaseLimitIncrease{
StreamsInbound: 512,
StreamsOutbound: 2048,
Streams: 2048,
Memory: 128 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 16 << 20,
},
ServicePeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 4,
StreamsOutbound: 8,
Streams: 8,
Memory: 4 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 512,
StreamsOutbound: 2048,
Streams: 2048,
Memory: 64 << 20,
},
ProtocolLimitIncrease: BaseLimitIncrease{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
Memory: 164 << 20,
},
ProtocolPeerBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 512,
StreamsInbound: 64,
StreamsOutbound: 128,
Streams: 256,
Memory: 16 << 20,
},
ProtocolPeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 16 << 20,
MaxMemory: 64 << 20,
ProtocolPeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 4,
StreamsOutbound: 8,
Streams: 16,
Memory: 4,
},
PeerBaseLimit: BaseLimit{
StreamsInbound: 512,
StreamsOutbound: 1024,
Streams: 1024,
ConnsInbound: 8,
ConnsOutbound: 16,
Conns: 16,
FD: 8,
ConnsInbound: 4,
ConnsOutbound: 8,
Conns: 8,
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
Memory: 64 << 20,
FD: 4,
},
PeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 64 << 20,
MaxMemory: 128 << 20,
PeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 1.0 / 64,
},
ConnBaseLimit: BaseLimit{
@ -150,17 +320,15 @@ var DefaultLimits = DefaultLimitConfig{
ConnsOutbound: 1,
Conns: 1,
FD: 1,
Memory: 1 << 20,
},
ConnMemory: 1 << 20,
StreamBaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
Memory: 16 << 20,
},
StreamMemory: 16 << 20,
}
var infiniteBaseLimit = BaseLimit{
@ -171,33 +339,19 @@ var infiniteBaseLimit = BaseLimit{
ConnsInbound: math.MaxInt,
ConnsOutbound: math.MaxInt,
FD: math.MaxInt,
}
var infiniteMemoryLimit = MemoryLimit{
MemoryFraction: 1,
MinMemory: math.MaxInt64,
MaxMemory: math.MaxInt64,
Memory: math.MaxInt64,
}
// InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything.
// Keep in mind that the operating system limits the number of file descriptors that an application can use.
var InfiniteLimits = DefaultLimitConfig{
SystemBaseLimit: infiniteBaseLimit,
SystemMemory: infiniteMemoryLimit,
TransientBaseLimit: infiniteBaseLimit,
TransientMemory: infiniteMemoryLimit,
ServiceBaseLimit: infiniteBaseLimit,
ServiceMemory: infiniteMemoryLimit,
ServicePeerBaseLimit: infiniteBaseLimit,
ServicePeerMemory: infiniteMemoryLimit,
ProtocolBaseLimit: infiniteBaseLimit,
ProtocolMemory: infiniteMemoryLimit,
ProtocolPeerBaseLimit: infiniteBaseLimit,
ProtocolPeerMemory: infiniteMemoryLimit,
PeerBaseLimit: infiniteBaseLimit,
PeerMemory: infiniteMemoryLimit,
ConnBaseLimit: infiniteBaseLimit,
ConnMemory: math.MaxInt64,
StreamBaseLimit: infiniteBaseLimit,
StreamMemory: math.MaxInt64,
var InfiniteLimits = LimitConfig{
SystemLimit: infiniteBaseLimit,
TransientLimit: infiniteBaseLimit,
DefaultServiceLimit: infiniteBaseLimit,
DefaultServicePeerLimit: infiniteBaseLimit,
DefaultProtocolLimit: infiniteBaseLimit,
DefaultProtocolPeerLimit: infiniteBaseLimit,
DefaultPeerLimit: infiniteBaseLimit,
ConnLimit: infiniteBaseLimit,
StreamLimit: infiniteBaseLimit,
}

View File

@ -1,138 +0,0 @@
package rcmgr
import (
"github.com/pbnjay/memory"
)
// StaticLimit is a limit with static values.
type StaticLimit struct {
BaseLimit
Memory int64
}
var _ Limit = (*StaticLimit)(nil)
func (l *StaticLimit) GetMemoryLimit() int64 {
return l.Memory
}
func (l *StaticLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit {
r := new(StaticLimit)
*r = *l
r.Memory = int64(memFraction * float64(r.Memory))
if r.Memory < minMemory {
r.Memory = minMemory
} else if r.Memory > maxMemory {
r.Memory = maxMemory
}
return r
}
func (l *StaticLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.StreamsInbound = numStreamsIn
r.BaseLimit.StreamsOutbound = numStreamsOut
r.BaseLimit.Streams = numStreams
return r
}
func (l *StaticLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.ConnsInbound = numConnsIn
r.BaseLimit.ConnsOutbound = numConnsOut
r.BaseLimit.Conns = numConns
return r
}
func (l *StaticLimit) WithFDLimit(numFD int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.FD = numFD
return r
}
// NewDefaultStaticLimiter creates a static limiter with default base limits and a system memory cap
// specified as a fraction of total system memory. The assigned memory will not be less than
// minMemory or more than maxMemory.
func NewDefaultStaticLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory))
}
// NewDefaultFixedLimiter creates a static limiter with default base limits and a specified system
// memory cap.
func NewDefaultFixedLimiter(memoryCap int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(1, memoryCap, memoryCap))
}
// NewDefaultLimiter creates a static limiter with the default limits
func NewDefaultLimiter() *BasicLimiter {
return NewStaticLimiter(DefaultLimits)
}
// NewStaticLimiter creates a static limiter using the specified system memory cap and default
// limit config.
func NewStaticLimiter(cfg DefaultLimitConfig) *BasicLimiter {
memoryCap := memoryLimit(
int64(memory.TotalMemory()),
cfg.SystemMemory.MemoryFraction,
cfg.SystemMemory.MinMemory,
cfg.SystemMemory.MaxMemory)
system := &StaticLimit{
Memory: memoryCap,
BaseLimit: cfg.SystemBaseLimit,
}
transient := &StaticLimit{
Memory: cfg.TransientMemory.GetMemory(memoryCap),
BaseLimit: cfg.TransientBaseLimit,
}
svc := &StaticLimit{
Memory: cfg.ServiceMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServiceBaseLimit,
}
svcPeer := &StaticLimit{
Memory: cfg.ServicePeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServicePeerBaseLimit,
}
proto := &StaticLimit{
Memory: cfg.ProtocolMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolBaseLimit,
}
protoPeer := &StaticLimit{
Memory: cfg.ProtocolPeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolPeerBaseLimit,
}
peer := &StaticLimit{
Memory: cfg.PeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.PeerBaseLimit,
}
conn := &StaticLimit{
Memory: cfg.ConnMemory,
BaseLimit: cfg.ConnBaseLimit,
}
stream := &StaticLimit{
Memory: cfg.StreamMemory,
BaseLimit: cfg.StreamBaseLimit,
}
return &BasicLimiter{
SystemLimits: system,
TransientLimits: transient,
DefaultServiceLimits: svc,
DefaultServicePeerLimits: svcPeer,
DefaultProtocolLimits: proto,
DefaultProtocolPeerLimits: protoPeer,
DefaultPeerLimits: peer,
ConnLimits: conn,
StreamLimits: stream,
}
}

77
limit_test.go Normal file
View File

@ -0,0 +1,77 @@
package rcmgr
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestScaling(t *testing.T) {
base := BaseLimit{
Streams: 100,
StreamsInbound: 200,
StreamsOutbound: 400,
Conns: 10,
ConnsInbound: 20,
ConnsOutbound: 40,
FD: 1,
Memory: 1 << 20,
}
t.Run("no scaling if no increase is defined", func(t *testing.T) {
cfg := ScalingLimitConfig{ServiceBaseLimit: base}
scaled := cfg.Scale(8<<30, 100)
require.Equal(t, base, scaled.DefaultServiceLimit)
})
t.Run("scaling", func(t *testing.T) {
cfg := ScalingLimitConfig{
TransientBaseLimit: base,
TransientLimitIncrease: BaseLimitIncrease{
Streams: 1,
StreamsInbound: 2,
StreamsOutbound: 3,
Conns: 4,
ConnsInbound: 5,
ConnsOutbound: 6,
Memory: 7,
FDFraction: 0.5,
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Equal(t, 500, scaled.TransientLimit.FD)
require.Equal(t, base.Streams+4, scaled.TransientLimit.Streams)
require.Equal(t, base.StreamsInbound+4*2, scaled.TransientLimit.StreamsInbound)
require.Equal(t, base.StreamsOutbound+4*3, scaled.TransientLimit.StreamsOutbound)
require.Equal(t, base.Conns+4*4, scaled.TransientLimit.Conns)
require.Equal(t, base.ConnsInbound+4*5, scaled.TransientLimit.ConnsInbound)
require.Equal(t, base.ConnsOutbound+4*6, scaled.TransientLimit.ConnsOutbound)
require.Equal(t, base.Memory+4*7, scaled.TransientLimit.Memory)
})
t.Run("scaling limits in maps", func(t *testing.T) {
cfg := ScalingLimitConfig{
ServiceLimits: map[string]baseLimitConfig{
"A": {
BaseLimit: BaseLimit{Streams: 10, Memory: 100, FD: 9},
},
"B": {
BaseLimit: BaseLimit{Streams: 20, Memory: 200, FD: 10},
BaseLimitIncrease: BaseLimitIncrease{Streams: 2, Memory: 3, FDFraction: 0.4},
},
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Len(t, scaled.ServiceLimits, 2)
require.Contains(t, scaled.ServiceLimits, "A")
require.Equal(t, 10, scaled.ServiceLimits["A"].Streams)
require.Equal(t, int64(100), scaled.ServiceLimits["A"].Memory)
require.Equal(t, 9, scaled.ServiceLimits["A"].FD)
require.Contains(t, scaled.ServiceLimits, "B")
require.Equal(t, 20+4*2, scaled.ServiceLimits["B"].Streams)
require.Equal(t, int64(200+4*3), scaled.ServiceLimits["B"].Memory)
require.Equal(t, 400, scaled.ServiceLimits["B"].FD)
})
}

View File

@ -20,167 +20,138 @@ func TestResourceManager(t *testing.T) {
svcA := "A.svc"
svcB := "B.svc"
nmgr, err := NewResourceManager(
&BasicLimiter{
SystemLimits: &StaticLimit{
Memory: 16384,
BaseLimit: BaseLimit{
StreamsInbound: 3,
StreamsOutbound: 3,
Streams: 6,
ConnsInbound: 3,
ConnsOutbound: 3,
Conns: 6,
FD: 2,
},
NewFixedLimiter(LimitConfig{
SystemLimit: BaseLimit{
Memory: 16384,
StreamsInbound: 3,
StreamsOutbound: 3,
Streams: 6,
ConnsInbound: 3,
ConnsOutbound: 3,
Conns: 6,
FD: 2,
},
TransientLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
TransientLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
DefaultServiceLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
DefaultServicePeerLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
ServiceLimits: map[string]BaseLimit{
svcA: {
Memory: 8192,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
svcB: {
Memory: 8192,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
DefaultServiceLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
ServicePeerLimits: map[string]BaseLimit{
svcB: {
Memory: 8192,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
},
},
DefaultProtocolLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
ProtocolLimits: map[protocol.ID]BaseLimit{
protoA: {
Memory: 8192,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
},
},
ProtocolPeerLimits: map[protocol.ID]BaseLimit{
protoB: {
Memory: 8192,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
DefaultPeerLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
DefaultProtocolPeerLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
PeerLimits: map[peer.ID]BaseLimit{
peerA: {
Memory: 8192,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
DefaultServicePeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
ConnLimit: BaseLimit{
Memory: 4096,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
ServiceLimits: map[string]Limit{
svcA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
StreamLimit: BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
},
ServicePeerLimits: map[string]Limit{
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
},
DefaultProtocolLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
ProtocolLimits: map[protocol.ID]Limit{
protoA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
},
},
},
ProtocolPeerLimits: map[protocol.ID]Limit{
protoB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
},
},
},
DefaultPeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 2,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 2,
FD: 1,
},
},
DefaultProtocolPeerLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5,
StreamsOutbound: 5,
Streams: 10,
},
},
PeerLimits: map[peer.ID]Limit{
peerA: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
},
ConnLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
},
StreamLimits: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
},
},
})
}),
)
if err != nil {
t.Fatal(err)
@ -1006,10 +977,14 @@ func TestResourceManager(t *testing.T) {
func TestResourceManagerWithAllowlist(t *testing.T) {
peerA := test.RandPeerIDFatal(t)
limits := NewDefaultLimiter()
limits.SystemLimits = limits.SystemLimits.WithConnLimit(0, 0, 0)
limits.TransientLimits = limits.SystemLimits.WithConnLimit(0, 0, 0)
rcmgr, err := NewResourceManager(limits, WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{
limits := DefaultLimits.Scale(1<<30, 100)
limits.SystemLimit.Conns = 0
limits.SystemLimit.ConnsInbound = 0
limits.SystemLimit.ConnsOutbound = 0
limits.TransientLimit.Conns = 0
limits.TransientLimit.ConnsInbound = 0
limits.TransientLimit.ConnsOutbound = 0
rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/1.2.3.4"),
multiaddr.StringCast("/ip4/4.3.2.1/p2p/" + peerA.String()),
}))
@ -1021,9 +996,18 @@ func TestResourceManagerWithAllowlist(t *testing.T) {
// Setup allowlist. TODO, replace this with a config once config changes are in
r := rcmgr.(*resourceManager)
r.allowlistedSystem = newSystemScope(limits.GetSystemLimits().WithConnLimit(2, 1, 2), r, "allowlistedSystem")
sysLimit := limits.SystemLimit
sysLimit.Conns = 2
sysLimit.ConnsInbound = 2
sysLimit.ConnsOutbound = 1
r.allowlistedSystem = newSystemScope(&sysLimit, r, "allowlistedSystem")
r.allowlistedSystem.IncRef()
r.allowlistedTransient = newTransientScope(limits.GetTransientLimits().WithConnLimit(1, 1, 1), r, "allowlistedTransient", r.allowlistedSystem.resourceScope)
transLimit := limits.TransientLimit
transLimit.Conns = 1
transLimit.ConnsInbound = 1
transLimit.ConnsOutbound = 1
r.allowlistedTransient = newTransientScope(&transLimit, r, "allowlistedTransient", r.allowlistedSystem.resourceScope)
r.allowlistedTransient.IncRef()
}

View File

@ -30,17 +30,15 @@ func checkResources(t *testing.T, rc *resources, st network.ScopeStat) {
}
func TestResources(t *testing.T) {
rc := resources{limit: &StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
rc := resources{limit: &BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
}}
checkResources(t, &rc, network.ScopeStat{})
@ -245,17 +243,15 @@ func TestResources(t *testing.T) {
func TestResourceScopeSimple(t *testing.T) {
s := newResourceScope(
&StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
&BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
nil, "test", nil, nil,
)
@ -379,17 +375,15 @@ func testResourceScopeBasic(t *testing.T, s *resourceScope) {
func TestResourceScopeTxnBasic(t *testing.T) {
s := newResourceScope(
&StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
&BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
nil, "test", nil, nil,
)
@ -416,17 +410,15 @@ func TestResourceScopeTxnBasic(t *testing.T) {
func TestResourceScopeTxnZombie(t *testing.T) {
s := newResourceScope(
&StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
&BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
nil, "test", nil, nil,
)
@ -460,17 +452,15 @@ func TestResourceScopeTxnZombie(t *testing.T) {
func TestResourceScopeTxnTree(t *testing.T) {
s := newResourceScope(
&StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
&BaseLimit{
Memory: 4096,
StreamsInbound: 1,
StreamsOutbound: 1,
Streams: 1,
ConnsInbound: 1,
ConnsOutbound: 1,
Conns: 1,
FD: 1,
},
nil, "test", nil, nil,
)
@ -571,92 +561,80 @@ func TestResourceScopeDAG(t *testing.T) {
// \
// ------> s6
s1 := newResourceScope(
&StaticLimit{
Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 4,
StreamsOutbound: 4,
Streams: 4,
ConnsInbound: 4,
ConnsOutbound: 4,
Conns: 4,
FD: 4,
},
&BaseLimit{
Memory: 4096,
StreamsInbound: 4,
StreamsOutbound: 4,
Streams: 4,
ConnsInbound: 4,
ConnsOutbound: 4,
Conns: 4,
FD: 4,
},
nil, "test", nil, nil,
)
s2 := newResourceScope(
&StaticLimit{
Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
&BaseLimit{
Memory: 2048,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
[]*resourceScope{s1}, "test", nil, nil,
)
s3 := newResourceScope(
&StaticLimit{
Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
&BaseLimit{
Memory: 2048,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
[]*resourceScope{s1}, "test", nil, nil,
)
s4 := newResourceScope(
&StaticLimit{
Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
&BaseLimit{
Memory: 2048,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
[]*resourceScope{s2, s3, s1}, "test", nil, nil,
)
s5 := newResourceScope(
&StaticLimit{
Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
&BaseLimit{
Memory: 2048,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
[]*resourceScope{s2, s1}, "test", nil, nil,
)
s6 := newResourceScope(
&StaticLimit{
Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
&BaseLimit{
Memory: 2048,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 2,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 2,
FD: 2,
},
[]*resourceScope{s3, s1}, "test", nil, nil,
)
@ -1095,39 +1073,27 @@ func TestResourceScopeDAGTxn(t *testing.T) {
// \
// ------> s6
s1 := newResourceScope(
&StaticLimit{
Memory: 8192,
},
&BaseLimit{Memory: 8192},
nil, "test", nil, nil,
)
s2 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
&BaseLimit{Memory: 4096 + 2048},
[]*resourceScope{s1}, "test", nil, nil,
)
s3 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
&BaseLimit{Memory: 4096 + 2048},
[]*resourceScope{s1}, "test", nil, nil,
)
s4 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
&BaseLimit{Memory: 4096 + 1024},
[]*resourceScope{s2, s3, s1}, "test", nil, nil,
)
s5 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
&BaseLimit{Memory: 4096 + 1024},
[]*resourceScope{s2, s1}, "test", nil, nil,
)
s6 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
&BaseLimit{Memory: 4096 + 1024},
[]*resourceScope{s3, s1}, "test", nil, nil,
)