diff --git a/go.mod b/go.mod index 7336ba7..d6b6921 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 github.com/libp2p/go-libp2p-core v0.19.0 github.com/multiformats/go-multiaddr v0.6.0 - github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 + github.com/stretchr/testify v1.8.0 go.opencensus.io v0.23.0 ) @@ -14,6 +14,7 @@ require ( github.com/btcsuite/btcd v0.22.1 // indirect github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect github.com/gogo/protobuf v1.3.1 // indirect github.com/ipfs/go-cid v0.2.0 // indirect @@ -30,10 +31,12 @@ require ( github.com/multiformats/go-multicodec v0.4.1 // indirect github.com/multiformats/go-multihash v0.0.15 // indirect github.com/multiformats/go-varint v0.0.6 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect go.uber.org/zap v1.19.1 // indirect golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 50976be..5984cab 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/cpuid/v2 v2.0.4 h1:g0I61F2K2DjRHz1cnxlkNSBIaePVoJIjjnHui8QHbiw= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= @@ -120,8 +122,6 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 h1:onHthvaw9LFnH4t2DcNVpwGmV9E1BkGknEliJkfwQj0= -github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58/go.mod h1:DXv8WO4yhMYhSNPKjeNKa5WY9YCIEBRbNzFFPJbWO6Y= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -131,10 +131,13 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= @@ -228,6 +231,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -235,7 +239,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/limit.go b/limit.go index 17d7e6c..812309b 100644 --- a/limit.go +++ b/limit.go @@ -20,19 +20,6 @@ type Limit interface { GetConnTotalLimit() int // GetFDLimit returns the file descriptor limit. GetFDLimit() int - - // WithMemoryLimit creates a copy of this limit object, with memory limit adjusted to - // the specified memFraction of its current value, bounded by minMemory and maxMemory. - WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit - // WithStreamLimit creates a copy of this limit object, with stream limits adjusted - // as specified. - WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit - // WithConnLimit creates a copy of this limit object, with connetion limits adjusted - // as specified. - WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit - // WithFDLimit creates a copy of this limit object, with file descriptor limits adjusted - // as specified - WithFDLimit(numFD int) Limit } // Limiter is the interface for providing limits to the resource manager. @@ -48,25 +35,16 @@ type Limiter interface { GetConnLimits() Limit } -// BasicLimiter is a limiter with fixed limits. -type BasicLimiter struct { - SystemLimits Limit - TransientLimits Limit - DefaultServiceLimits Limit - DefaultServicePeerLimits Limit - ServiceLimits map[string]Limit - ServicePeerLimits map[string]Limit - DefaultProtocolLimits Limit - DefaultProtocolPeerLimits Limit - ProtocolLimits map[protocol.ID]Limit - ProtocolPeerLimits map[protocol.ID]Limit - DefaultPeerLimits Limit - PeerLimits map[peer.ID]Limit - ConnLimits Limit - StreamLimits Limit +// fixedLimiter is a limiter with fixed limits. +type fixedLimiter struct { + LimitConfig } -var _ Limiter = (*BasicLimiter)(nil) +var _ Limiter = (*fixedLimiter)(nil) + +func NewFixedLimiter(conf LimitConfig) Limiter { + return &fixedLimiter{LimitConfig: conf} +} // BaseLimit is a mixin type for basic resource limits. type BaseLimit struct { @@ -77,13 +55,19 @@ type BaseLimit struct { ConnsInbound int ConnsOutbound int FD int + Memory int64 } -// MemoryLimit is a mixin type for memory limits -type MemoryLimit struct { - MemoryFraction float64 - MinMemory int64 - MaxMemory int64 +// BaseLimitIncrease is the increase per GB of system memory. +type BaseLimitIncrease struct { + Streams int + StreamsInbound int + StreamsOutbound int + Conns int + ConnsInbound int + ConnsOutbound int + Memory int64 + FDFraction float64 } func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { @@ -114,74 +98,62 @@ func (l *BaseLimit) GetFDLimit() int { return l.FD } -func (l *BasicLimiter) GetSystemLimits() Limit { - return l.SystemLimits +func (l *BaseLimit) GetMemoryLimit() int64 { + return l.Memory } -func (l *BasicLimiter) GetTransientLimits() Limit { - return l.TransientLimits +func (l *fixedLimiter) GetSystemLimits() Limit { + return &l.SystemLimit } -func (l *BasicLimiter) GetServiceLimits(svc string) Limit { +func (l *fixedLimiter) GetTransientLimits() Limit { + return &l.TransientLimit +} + +func (l *fixedLimiter) GetServiceLimits(svc string) Limit { sl, ok := l.ServiceLimits[svc] if !ok { - return l.DefaultServiceLimits + return &l.DefaultServiceLimit } - return sl + return &sl } -func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit { +func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit { pl, ok := l.ServicePeerLimits[svc] if !ok { - return l.DefaultServicePeerLimits + return &l.DefaultServicePeerLimit } - return pl + return &pl } -func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit { +func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit { pl, ok := l.ProtocolLimits[proto] if !ok { - return l.DefaultProtocolLimits + return &l.DefaultProtocolLimit } - return pl + return &pl } -func (l *BasicLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit { +func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit { pl, ok := l.ProtocolPeerLimits[proto] if !ok { - return l.DefaultProtocolPeerLimits + return &l.DefaultProtocolPeerLimit } - return pl + return &pl } -func (l *BasicLimiter) GetPeerLimits(p peer.ID) Limit { +func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit { pl, ok := l.PeerLimits[p] if !ok { - return l.DefaultPeerLimits + return &l.DefaultPeerLimit } - return pl + return &pl } -func (l *BasicLimiter) GetStreamLimits(p peer.ID) Limit { - return l.StreamLimits +func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit { + return &l.StreamLimit } -func (l *BasicLimiter) GetConnLimits() Limit { - return l.ConnLimits -} - -func (l *MemoryLimit) GetMemory(memoryCap int64) int64 { - return memoryLimit(memoryCap, l.MemoryFraction, l.MinMemory, l.MaxMemory) -} - -func memoryLimit(memoryCap int64, memFraction float64, minMemory, maxMemory int64) int64 { - memoryCap = int64(float64(memoryCap) * memFraction) - switch { - case memoryCap < minMemory: - return minMemory - case memoryCap > maxMemory: - return maxMemory - default: - return memoryCap - } +func (l *fixedLimiter) GetConnLimits() Limit { + return &l.ConnLimit } diff --git a/limit_config.go b/limit_config.go deleted file mode 100644 index 5be45bc..0000000 --- a/limit_config.go +++ /dev/null @@ -1,262 +0,0 @@ -package rcmgr - -import ( - "fmt" - - "github.com/libp2p/go-libp2p-core/peer" - "github.com/libp2p/go-libp2p-core/protocol" - - "github.com/pbnjay/memory" -) - -type BasicLimitConfig struct { - // either Memory is set for fixed memory limit - Memory int64 `json:",omitempty"` - // or the following 3 fields for computed memory limits - MinMemory int64 `json:",omitempty"` - MaxMemory int64 `json:",omitempty"` - MemoryFraction float64 `json:",omitempty"` - - StreamsInbound int - StreamsOutbound int - Streams int - - ConnsInbound int - ConnsOutbound int - Conns int - - FD int -} - -func (cfg *BasicLimitConfig) toLimit(base BaseLimit, mem MemoryLimit) (Limit, error) { - if cfg == nil { - m := mem.GetMemory(int64(memory.TotalMemory())) - return &StaticLimit{ - Memory: m, - BaseLimit: base, - }, nil - } - - if cfg.Streams > 0 { - base.Streams = cfg.Streams - } - if cfg.StreamsInbound > 0 { - base.StreamsInbound = cfg.StreamsInbound - } - if cfg.StreamsOutbound > 0 { - base.StreamsOutbound = cfg.StreamsOutbound - } - if cfg.Conns > 0 { - base.Conns = cfg.Conns - } - if cfg.ConnsInbound > 0 { - base.ConnsInbound = cfg.ConnsInbound - } - if cfg.ConnsOutbound > 0 { - base.ConnsOutbound = cfg.ConnsOutbound - } - if cfg.FD > 0 { - base.FD = cfg.FD - } - - switch { - case cfg.Memory > 0: - return &StaticLimit{ - Memory: cfg.Memory, - BaseLimit: base, - }, nil - default: - if cfg.MemoryFraction < 0 { - return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction) - } - if cfg.MemoryFraction > 0 { - mem.MemoryFraction = cfg.MemoryFraction - } - if cfg.MinMemory > 0 { - mem.MinMemory = cfg.MinMemory - } - if cfg.MaxMemory > 0 { - mem.MaxMemory = cfg.MaxMemory - } - - m := mem.GetMemory(int64(memory.TotalMemory())) - return &StaticLimit{ - Memory: m, - BaseLimit: base, - }, nil - } -} - -func (cfg *BasicLimitConfig) toLimitFixed(base BaseLimit, mem int64) (Limit, error) { - if cfg == nil { - return &StaticLimit{ - Memory: mem, - BaseLimit: base, - }, nil - } - - if cfg.Streams > 0 { - base.Streams = cfg.Streams - } - if cfg.StreamsInbound > 0 { - base.StreamsInbound = cfg.StreamsInbound - } - if cfg.StreamsOutbound > 0 { - base.StreamsOutbound = cfg.StreamsOutbound - } - if cfg.Conns > 0 { - base.Conns = cfg.Conns - } - if cfg.ConnsInbound > 0 { - base.ConnsInbound = cfg.ConnsInbound - } - if cfg.ConnsOutbound > 0 { - base.ConnsOutbound = cfg.ConnsOutbound - } - if cfg.FD > 0 { - base.FD = cfg.FD - } - - switch { - case cfg.Memory > 0: - return &StaticLimit{ - Memory: cfg.Memory, - BaseLimit: base, - }, nil - default: - if cfg.MemoryFraction > 0 || cfg.MinMemory > 0 || cfg.MaxMemory > 0 { - return nil, fmt.Errorf("cannot specify dynamic range for fixed memory limit") - } - return &StaticLimit{ - Memory: mem, - BaseLimit: base, - }, nil - } -} - -type BasicLimiterConfig struct { - System *BasicLimitConfig `json:",omitempty"` - Transient *BasicLimitConfig `json:",omitempty"` - - ServiceDefault *BasicLimitConfig `json:",omitempty"` - ServicePeerDefault *BasicLimitConfig `json:",omitempty"` - Service map[string]BasicLimitConfig `json:",omitempty"` - ServicePeer map[string]BasicLimitConfig `json:",omitempty"` - - ProtocolDefault *BasicLimitConfig `json:",omitempty"` - ProtocolPeerDefault *BasicLimitConfig `json:",omitempty"` - Protocol map[string]BasicLimitConfig `json:",omitempty"` - ProtocolPeer map[string]BasicLimitConfig `json:",omitempty"` - - PeerDefault *BasicLimitConfig `json:",omitempty"` - Peer map[string]BasicLimitConfig `json:",omitempty"` - - Conn *BasicLimitConfig `json:",omitempty"` - Stream *BasicLimitConfig `json:",omitempty"` -} - -func NewLimiter(cfg BasicLimiterConfig, defaults DefaultLimitConfig) (*BasicLimiter, error) { - limiter := new(BasicLimiter) - var err error - - limiter.SystemLimits, err = cfg.System.toLimit(defaults.SystemBaseLimit, defaults.SystemMemory) - if err != nil { - return nil, fmt.Errorf("invalid system limit: %w", err) - } - - limiter.TransientLimits, err = cfg.Transient.toLimit(defaults.TransientBaseLimit, defaults.TransientMemory) - if err != nil { - return nil, fmt.Errorf("invalid transient limit: %w", err) - } - - limiter.DefaultServiceLimits, err = cfg.ServiceDefault.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory) - if err != nil { - return nil, fmt.Errorf("invalid default service limit: %w", err) - } - - limiter.DefaultServicePeerLimits, err = cfg.ServicePeerDefault.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid default service peer limit: %w", err) - } - - if len(cfg.Service) > 0 { - limiter.ServiceLimits = make(map[string]Limit, len(cfg.Service)) - for svc, cfgLimit := range cfg.Service { - limiter.ServiceLimits[svc], err = cfgLimit.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory) - if err != nil { - return nil, fmt.Errorf("invalid service limit for %s: %w", svc, err) - } - } - } - - if len(cfg.ServicePeer) > 0 { - limiter.ServicePeerLimits = make(map[string]Limit, len(cfg.ServicePeer)) - for svc, cfgLimit := range cfg.ServicePeer { - limiter.ServicePeerLimits[svc], err = cfgLimit.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid service peer limit for %s: %w", svc, err) - } - } - } - - limiter.DefaultProtocolLimits, err = cfg.ProtocolDefault.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory) - if err != nil { - return nil, fmt.Errorf("invalid default protocol limit: %w", err) - } - - limiter.DefaultProtocolPeerLimits, err = cfg.ProtocolPeerDefault.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid default protocol peer limit: %w", err) - } - - if len(cfg.Protocol) > 0 { - limiter.ProtocolLimits = make(map[protocol.ID]Limit, len(cfg.Protocol)) - for p, cfgLimit := range cfg.Protocol { - limiter.ProtocolLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory) - if err != nil { - return nil, fmt.Errorf("invalid service limit for %s: %w", p, err) - } - } - } - - if len(cfg.ProtocolPeer) > 0 { - limiter.ProtocolPeerLimits = make(map[protocol.ID]Limit, len(cfg.ProtocolPeer)) - for p, cfgLimit := range cfg.ProtocolPeer { - limiter.ProtocolPeerLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid service peer limit for %s: %w", p, err) - } - } - } - - limiter.DefaultPeerLimits, err = cfg.PeerDefault.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid peer limit: %w", err) - } - - if len(cfg.Peer) > 0 { - limiter.PeerLimits = make(map[peer.ID]Limit, len(cfg.Peer)) - for p, cfgLimit := range cfg.Peer { - pid, err := peer.Decode(p) - if err != nil { - return nil, fmt.Errorf("invalid peer ID %s: %w", p, err) - } - limiter.PeerLimits[pid], err = cfgLimit.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory) - if err != nil { - return nil, fmt.Errorf("invalid peer limit for %s: %w", p, err) - } - } - } - - limiter.ConnLimits, err = cfg.Conn.toLimitFixed(defaults.ConnBaseLimit, defaults.ConnMemory) - if err != nil { - return nil, fmt.Errorf("invalid conn limit: %w", err) - } - - limiter.StreamLimits, err = cfg.Stream.toLimitFixed(defaults.StreamBaseLimit, defaults.StreamMemory) - if err != nil { - return nil, fmt.Errorf("invalid stream limit: %w", err) - } - - return limiter, nil -} diff --git a/limit_defaults.go b/limit_defaults.go index 72e51a8..21de7c3 100644 --- a/limit_defaults.go +++ b/limit_defaults.go @@ -1,148 +1,318 @@ package rcmgr -import "math" +import ( + "math" -// DefaultLimitConfig is a struct for configuring default limits. -type DefaultLimitConfig struct { - SystemBaseLimit BaseLimit - SystemMemory MemoryLimit + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) - TransientBaseLimit BaseLimit - TransientMemory MemoryLimit - - ServiceBaseLimit BaseLimit - ServiceMemory MemoryLimit - - ServicePeerBaseLimit BaseLimit - ServicePeerMemory MemoryLimit - - ProtocolBaseLimit BaseLimit - ProtocolMemory MemoryLimit - - ProtocolPeerBaseLimit BaseLimit - ProtocolPeerMemory MemoryLimit - - PeerBaseLimit BaseLimit - PeerMemory MemoryLimit - - ConnBaseLimit BaseLimit - ConnMemory int64 - - StreamBaseLimit BaseLimit - StreamMemory int64 +type baseLimitConfig struct { + BaseLimit BaseLimit + BaseLimitIncrease BaseLimitIncrease } -func (cfg *DefaultLimitConfig) WithSystemMemory(memFraction float64, minMemory, maxMemory int64) DefaultLimitConfig { - refactor := memFraction / cfg.SystemMemory.MemoryFraction - r := *cfg - r.SystemMemory.MemoryFraction = memFraction - r.SystemMemory.MinMemory = minMemory - r.SystemMemory.MaxMemory = maxMemory - r.TransientMemory.MemoryFraction *= refactor - r.ServiceMemory.MemoryFraction *= refactor - r.ServicePeerMemory.MemoryFraction *= refactor - r.ProtocolMemory.MemoryFraction *= refactor - r.ProtocolPeerMemory.MemoryFraction *= refactor - r.PeerMemory.MemoryFraction *= refactor - return r +// ScalingLimitConfig is a struct for configuring default limits. +// {}BaseLimit is the limits that apply for a minimal node (128 MB of memory for libp2p) and 256 file descriptors. +// {}LimitIncrease is the additional limit granted for every additional 1 GB of RAM. +type ScalingLimitConfig struct { + SystemBaseLimit BaseLimit + SystemLimitIncrease BaseLimitIncrease + + TransientBaseLimit BaseLimit + TransientLimitIncrease BaseLimitIncrease + + ServiceBaseLimit BaseLimit + ServiceLimitIncrease BaseLimitIncrease + ServiceLimits map[string]baseLimitConfig // use AddServiceLimit to modify + + ServicePeerBaseLimit BaseLimit + ServicePeerLimitIncrease BaseLimitIncrease + ServicePeerLimits map[string]baseLimitConfig // use AddServicePeerLimit to modify + + ProtocolBaseLimit BaseLimit + ProtocolLimitIncrease BaseLimitIncrease + ProtocolLimits map[protocol.ID]baseLimitConfig // use AddProtocolLimit to modify + + ProtocolPeerBaseLimit BaseLimit + ProtocolPeerLimitIncrease BaseLimitIncrease + ProtocolPeerLimits map[protocol.ID]baseLimitConfig // use AddProtocolPeerLimit to modify + + PeerBaseLimit BaseLimit + PeerLimitIncrease BaseLimitIncrease + PeerLimits map[peer.ID]baseLimitConfig // use AddPeerLimit to modify + + ConnBaseLimit BaseLimit + ConnLimitIncrease BaseLimitIncrease + + StreamBaseLimit BaseLimit + StreamLimitIncrease BaseLimitIncrease +} + +func (cfg *ScalingLimitConfig) AddServiceLimit(svc string, base BaseLimit, inc BaseLimitIncrease) { + if cfg.ServiceLimits == nil { + cfg.ServiceLimits = make(map[string]baseLimitConfig) + } + cfg.ServiceLimits[svc] = baseLimitConfig{ + BaseLimit: base, + BaseLimitIncrease: inc, + } +} + +func (cfg *ScalingLimitConfig) AddProtocolLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) { + if cfg.ProtocolLimits == nil { + cfg.ProtocolLimits = make(map[protocol.ID]baseLimitConfig) + } + cfg.ProtocolLimits[proto] = baseLimitConfig{ + BaseLimit: base, + BaseLimitIncrease: inc, + } +} + +func (cfg *ScalingLimitConfig) AddPeerLimit(p peer.ID, base BaseLimit, inc BaseLimitIncrease) { + if cfg.PeerLimits == nil { + cfg.PeerLimits = make(map[peer.ID]baseLimitConfig) + } + cfg.PeerLimits[p] = baseLimitConfig{ + BaseLimit: base, + BaseLimitIncrease: inc, + } +} + +func (cfg *ScalingLimitConfig) AddServicePeerLimit(svc string, base BaseLimit, inc BaseLimitIncrease) { + if cfg.ServicePeerLimits == nil { + cfg.ServicePeerLimits = make(map[string]baseLimitConfig) + } + cfg.ServicePeerLimits[svc] = baseLimitConfig{ + BaseLimit: base, + BaseLimitIncrease: inc, + } +} + +func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) { + if cfg.ProtocolPeerLimits == nil { + cfg.ProtocolPeerLimits = make(map[protocol.ID]baseLimitConfig) + } + cfg.ProtocolPeerLimits[proto] = baseLimitConfig{ + BaseLimit: base, + BaseLimitIncrease: inc, + } +} + +type LimitConfig struct { + SystemLimit BaseLimit + TransientLimit BaseLimit + + DefaultServiceLimit BaseLimit + ServiceLimits map[string]BaseLimit + + DefaultServicePeerLimit BaseLimit + ServicePeerLimits map[string]BaseLimit + + DefaultProtocolLimit BaseLimit + ProtocolLimits map[protocol.ID]BaseLimit + + DefaultProtocolPeerLimit BaseLimit + ProtocolPeerLimits map[protocol.ID]BaseLimit + + DefaultPeerLimit BaseLimit + PeerLimits map[peer.ID]BaseLimit + + ConnLimit BaseLimit + StreamLimit BaseLimit +} + +// Scale scales up a limit configuration. +// memory is the amount of memory that the stack is allowed to consume, +// for a full it's recommended to use 1/8 of the installed system memory. +// If memory is smaller than 128 MB, the base configuration will be used. +// +func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig { + var scaleFactor int + if memory > 128<<20 { + scaleFactor = int((memory - 128<<20) >> 20) + } + lc := LimitConfig{ + SystemLimit: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, scaleFactor, numFD), + TransientLimit: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, scaleFactor, numFD), + DefaultServiceLimit: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, scaleFactor, numFD), + DefaultServicePeerLimit: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, scaleFactor, numFD), + DefaultProtocolLimit: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, scaleFactor, numFD), + DefaultProtocolPeerLimit: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, scaleFactor, numFD), + DefaultPeerLimit: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, scaleFactor, numFD), + ConnLimit: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD), + StreamLimit: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD), + } + if cfg.ServiceLimits != nil { + lc.ServiceLimits = make(map[string]BaseLimit) + for svc, l := range cfg.ServiceLimits { + lc.ServiceLimits[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD) + } + } + if cfg.ProtocolLimits != nil { + lc.ProtocolLimits = make(map[protocol.ID]BaseLimit) + for proto, l := range cfg.ProtocolLimits { + lc.ProtocolLimits[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD) + } + } + if cfg.PeerLimits != nil { + lc.PeerLimits = make(map[peer.ID]BaseLimit) + for p, l := range cfg.PeerLimits { + lc.PeerLimits[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD) + } + } + if cfg.ServicePeerLimits != nil { + lc.ServicePeerLimits = make(map[string]BaseLimit) + for svc, l := range cfg.ServicePeerLimits { + lc.ServicePeerLimits[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD) + } + } + if cfg.ProtocolPeerLimits != nil { + lc.ProtocolPeerLimits = make(map[protocol.ID]BaseLimit) + for p, l := range cfg.ProtocolPeerLimits { + lc.ProtocolPeerLimits[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD) + } + } + return lc +} + +// factor is the number of MBs above the minimum (128 MB) +func scale(base BaseLimit, inc BaseLimitIncrease, factor int, numFD int) BaseLimit { + l := BaseLimit{ + StreamsInbound: base.StreamsInbound + (inc.StreamsInbound*factor)>>10, + StreamsOutbound: base.StreamsOutbound + (inc.StreamsOutbound*factor)>>10, + Streams: base.Streams + (inc.Streams*factor)>>10, + ConnsInbound: base.ConnsInbound + (inc.ConnsInbound*factor)>>10, + ConnsOutbound: base.ConnsOutbound + (inc.ConnsOutbound*factor)>>10, + Conns: base.Conns + (inc.Conns*factor)>>10, + Memory: base.Memory + (inc.Memory*int64(factor))>>10, + FD: base.FD, + } + if inc.FDFraction > 0 { + l.FD = int(inc.FDFraction * float64(numFD)) + } + return l } // DefaultLimits are the limits used by the default limiter constructors. -var DefaultLimits = DefaultLimitConfig{ +var DefaultLimits = ScalingLimitConfig{ SystemBaseLimit: BaseLimit{ - StreamsInbound: 4096, - StreamsOutbound: 16384, - Streams: 16384, - ConnsInbound: 256, - ConnsOutbound: 1024, - Conns: 1024, - FD: 512, + ConnsInbound: 64, + ConnsOutbound: 128, + Conns: 128, + StreamsInbound: 64 * 16, + StreamsOutbound: 128 * 16, + Streams: 128 * 16, + Memory: 128 << 20, + FD: 256, }, - SystemMemory: MemoryLimit{ - MemoryFraction: 0.125, - MinMemory: 128 << 20, - MaxMemory: 1 << 30, + SystemLimitIncrease: BaseLimitIncrease{ + ConnsInbound: 64, + ConnsOutbound: 128, + Conns: 128, + StreamsInbound: 64 * 16, + StreamsOutbound: 128 * 16, + Streams: 128 * 16, + Memory: 1 << 30, + FDFraction: 1, }, TransientBaseLimit: BaseLimit{ - StreamsInbound: 128, - StreamsOutbound: 512, - Streams: 512, ConnsInbound: 32, - ConnsOutbound: 128, - Conns: 128, - FD: 128, + ConnsOutbound: 64, + Conns: 64, + StreamsInbound: 128, + StreamsOutbound: 256, + Streams: 256, + Memory: 32 << 20, + FD: 64, }, - TransientMemory: MemoryLimit{ - MemoryFraction: 1, - MinMemory: 64 << 20, - MaxMemory: 64 << 20, + TransientLimitIncrease: BaseLimitIncrease{ + ConnsInbound: 16, + ConnsOutbound: 32, + Conns: 32, + StreamsInbound: 128, + StreamsOutbound: 256, + Streams: 256, + Memory: 128 << 20, + FDFraction: 0.25, }, ServiceBaseLimit: BaseLimit{ - StreamsInbound: 2048, - StreamsOutbound: 8192, - Streams: 8192, - }, - - ServiceMemory: MemoryLimit{ - MemoryFraction: 0.125 / 4, - MinMemory: 64 << 20, - MaxMemory: 256 << 20, - }, - - ServicePeerBaseLimit: BaseLimit{ - StreamsInbound: 256, - StreamsOutbound: 512, - Streams: 512, - }, - - ServicePeerMemory: MemoryLimit{ - MemoryFraction: 0.125 / 16, - MinMemory: 16 << 20, - MaxMemory: 64 << 20, - }, - - ProtocolBaseLimit: BaseLimit{ StreamsInbound: 1024, StreamsOutbound: 4096, Streams: 4096, + Memory: 64 << 20, }, - ProtocolMemory: MemoryLimit{ - MemoryFraction: 0.125 / 8, - MinMemory: 64 << 20, - MaxMemory: 128 << 20, + ServiceLimitIncrease: BaseLimitIncrease{ + StreamsInbound: 512, + StreamsOutbound: 2048, + Streams: 2048, + Memory: 128 << 20, + }, + + ServicePeerBaseLimit: BaseLimit{ + StreamsInbound: 128, + StreamsOutbound: 256, + Streams: 256, + Memory: 16 << 20, + }, + + ServicePeerLimitIncrease: BaseLimitIncrease{ + StreamsInbound: 4, + StreamsOutbound: 8, + Streams: 8, + Memory: 4 << 20, + }, + + ProtocolBaseLimit: BaseLimit{ + StreamsInbound: 512, + StreamsOutbound: 2048, + Streams: 2048, + Memory: 64 << 20, + }, + + ProtocolLimitIncrease: BaseLimitIncrease{ + StreamsInbound: 256, + StreamsOutbound: 512, + Streams: 512, + Memory: 164 << 20, }, ProtocolPeerBaseLimit: BaseLimit{ - StreamsInbound: 128, - StreamsOutbound: 256, - Streams: 512, + StreamsInbound: 64, + StreamsOutbound: 128, + Streams: 256, + Memory: 16 << 20, }, - ProtocolPeerMemory: MemoryLimit{ - MemoryFraction: 0.125 / 16, - MinMemory: 16 << 20, - MaxMemory: 64 << 20, + ProtocolPeerLimitIncrease: BaseLimitIncrease{ + StreamsInbound: 4, + StreamsOutbound: 8, + Streams: 16, + Memory: 4, }, PeerBaseLimit: BaseLimit{ - StreamsInbound: 512, - StreamsOutbound: 1024, - Streams: 1024, - ConnsInbound: 8, - ConnsOutbound: 16, - Conns: 16, - FD: 8, + ConnsInbound: 4, + ConnsOutbound: 8, + Conns: 8, + StreamsInbound: 256, + StreamsOutbound: 512, + Streams: 512, + Memory: 64 << 20, + FD: 4, }, - PeerMemory: MemoryLimit{ - MemoryFraction: 0.125 / 16, - MinMemory: 64 << 20, - MaxMemory: 128 << 20, + PeerLimitIncrease: BaseLimitIncrease{ + StreamsInbound: 128, + StreamsOutbound: 256, + Streams: 256, + Memory: 128 << 20, + FDFraction: 1.0 / 64, }, ConnBaseLimit: BaseLimit{ @@ -150,17 +320,15 @@ var DefaultLimits = DefaultLimitConfig{ ConnsOutbound: 1, Conns: 1, FD: 1, + Memory: 1 << 20, }, - ConnMemory: 1 << 20, - StreamBaseLimit: BaseLimit{ StreamsInbound: 1, StreamsOutbound: 1, Streams: 1, + Memory: 16 << 20, }, - - StreamMemory: 16 << 20, } var infiniteBaseLimit = BaseLimit{ @@ -171,33 +339,19 @@ var infiniteBaseLimit = BaseLimit{ ConnsInbound: math.MaxInt, ConnsOutbound: math.MaxInt, FD: math.MaxInt, -} - -var infiniteMemoryLimit = MemoryLimit{ - MemoryFraction: 1, - MinMemory: math.MaxInt64, - MaxMemory: math.MaxInt64, + Memory: math.MaxInt64, } // InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything. // Keep in mind that the operating system limits the number of file descriptors that an application can use. -var InfiniteLimits = DefaultLimitConfig{ - SystemBaseLimit: infiniteBaseLimit, - SystemMemory: infiniteMemoryLimit, - TransientBaseLimit: infiniteBaseLimit, - TransientMemory: infiniteMemoryLimit, - ServiceBaseLimit: infiniteBaseLimit, - ServiceMemory: infiniteMemoryLimit, - ServicePeerBaseLimit: infiniteBaseLimit, - ServicePeerMemory: infiniteMemoryLimit, - ProtocolBaseLimit: infiniteBaseLimit, - ProtocolMemory: infiniteMemoryLimit, - ProtocolPeerBaseLimit: infiniteBaseLimit, - ProtocolPeerMemory: infiniteMemoryLimit, - PeerBaseLimit: infiniteBaseLimit, - PeerMemory: infiniteMemoryLimit, - ConnBaseLimit: infiniteBaseLimit, - ConnMemory: math.MaxInt64, - StreamBaseLimit: infiniteBaseLimit, - StreamMemory: math.MaxInt64, +var InfiniteLimits = LimitConfig{ + SystemLimit: infiniteBaseLimit, + TransientLimit: infiniteBaseLimit, + DefaultServiceLimit: infiniteBaseLimit, + DefaultServicePeerLimit: infiniteBaseLimit, + DefaultProtocolLimit: infiniteBaseLimit, + DefaultProtocolPeerLimit: infiniteBaseLimit, + DefaultPeerLimit: infiniteBaseLimit, + ConnLimit: infiniteBaseLimit, + StreamLimit: infiniteBaseLimit, } diff --git a/limit_static.go b/limit_static.go deleted file mode 100644 index e96884a..0000000 --- a/limit_static.go +++ /dev/null @@ -1,138 +0,0 @@ -package rcmgr - -import ( - "github.com/pbnjay/memory" -) - -// StaticLimit is a limit with static values. -type StaticLimit struct { - BaseLimit - Memory int64 -} - -var _ Limit = (*StaticLimit)(nil) - -func (l *StaticLimit) GetMemoryLimit() int64 { - return l.Memory -} - -func (l *StaticLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit { - r := new(StaticLimit) - *r = *l - - r.Memory = int64(memFraction * float64(r.Memory)) - if r.Memory < minMemory { - r.Memory = minMemory - } else if r.Memory > maxMemory { - r.Memory = maxMemory - } - - return r -} - -func (l *StaticLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit { - r := new(StaticLimit) - *r = *l - - r.BaseLimit.StreamsInbound = numStreamsIn - r.BaseLimit.StreamsOutbound = numStreamsOut - r.BaseLimit.Streams = numStreams - - return r -} - -func (l *StaticLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit { - r := new(StaticLimit) - *r = *l - - r.BaseLimit.ConnsInbound = numConnsIn - r.BaseLimit.ConnsOutbound = numConnsOut - r.BaseLimit.Conns = numConns - - return r -} - -func (l *StaticLimit) WithFDLimit(numFD int) Limit { - r := new(StaticLimit) - *r = *l - - r.BaseLimit.FD = numFD - - return r -} - -// NewDefaultStaticLimiter creates a static limiter with default base limits and a system memory cap -// specified as a fraction of total system memory. The assigned memory will not be less than -// minMemory or more than maxMemory. -func NewDefaultStaticLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter { - return NewStaticLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory)) -} - -// NewDefaultFixedLimiter creates a static limiter with default base limits and a specified system -// memory cap. -func NewDefaultFixedLimiter(memoryCap int64) *BasicLimiter { - return NewStaticLimiter(DefaultLimits.WithSystemMemory(1, memoryCap, memoryCap)) -} - -// NewDefaultLimiter creates a static limiter with the default limits -func NewDefaultLimiter() *BasicLimiter { - return NewStaticLimiter(DefaultLimits) -} - -// NewStaticLimiter creates a static limiter using the specified system memory cap and default -// limit config. -func NewStaticLimiter(cfg DefaultLimitConfig) *BasicLimiter { - memoryCap := memoryLimit( - int64(memory.TotalMemory()), - cfg.SystemMemory.MemoryFraction, - cfg.SystemMemory.MinMemory, - cfg.SystemMemory.MaxMemory) - system := &StaticLimit{ - Memory: memoryCap, - BaseLimit: cfg.SystemBaseLimit, - } - transient := &StaticLimit{ - Memory: cfg.TransientMemory.GetMemory(memoryCap), - BaseLimit: cfg.TransientBaseLimit, - } - svc := &StaticLimit{ - Memory: cfg.ServiceMemory.GetMemory(memoryCap), - BaseLimit: cfg.ServiceBaseLimit, - } - svcPeer := &StaticLimit{ - Memory: cfg.ServicePeerMemory.GetMemory(memoryCap), - BaseLimit: cfg.ServicePeerBaseLimit, - } - proto := &StaticLimit{ - Memory: cfg.ProtocolMemory.GetMemory(memoryCap), - BaseLimit: cfg.ProtocolBaseLimit, - } - protoPeer := &StaticLimit{ - Memory: cfg.ProtocolPeerMemory.GetMemory(memoryCap), - BaseLimit: cfg.ProtocolPeerBaseLimit, - } - peer := &StaticLimit{ - Memory: cfg.PeerMemory.GetMemory(memoryCap), - BaseLimit: cfg.PeerBaseLimit, - } - conn := &StaticLimit{ - Memory: cfg.ConnMemory, - BaseLimit: cfg.ConnBaseLimit, - } - stream := &StaticLimit{ - Memory: cfg.StreamMemory, - BaseLimit: cfg.StreamBaseLimit, - } - - return &BasicLimiter{ - SystemLimits: system, - TransientLimits: transient, - DefaultServiceLimits: svc, - DefaultServicePeerLimits: svcPeer, - DefaultProtocolLimits: proto, - DefaultProtocolPeerLimits: protoPeer, - DefaultPeerLimits: peer, - ConnLimits: conn, - StreamLimits: stream, - } -} diff --git a/limit_test.go b/limit_test.go new file mode 100644 index 0000000..2ff92cc --- /dev/null +++ b/limit_test.go @@ -0,0 +1,77 @@ +package rcmgr + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestScaling(t *testing.T) { + base := BaseLimit{ + Streams: 100, + StreamsInbound: 200, + StreamsOutbound: 400, + Conns: 10, + ConnsInbound: 20, + ConnsOutbound: 40, + FD: 1, + Memory: 1 << 20, + } + + t.Run("no scaling if no increase is defined", func(t *testing.T) { + cfg := ScalingLimitConfig{ServiceBaseLimit: base} + scaled := cfg.Scale(8<<30, 100) + require.Equal(t, base, scaled.DefaultServiceLimit) + }) + + t.Run("scaling", func(t *testing.T) { + cfg := ScalingLimitConfig{ + TransientBaseLimit: base, + TransientLimitIncrease: BaseLimitIncrease{ + Streams: 1, + StreamsInbound: 2, + StreamsOutbound: 3, + Conns: 4, + ConnsInbound: 5, + ConnsOutbound: 6, + Memory: 7, + FDFraction: 0.5, + }, + } + scaled := cfg.Scale(128<<20+4<<30, 1000) + require.Equal(t, 500, scaled.TransientLimit.FD) + require.Equal(t, base.Streams+4, scaled.TransientLimit.Streams) + require.Equal(t, base.StreamsInbound+4*2, scaled.TransientLimit.StreamsInbound) + require.Equal(t, base.StreamsOutbound+4*3, scaled.TransientLimit.StreamsOutbound) + require.Equal(t, base.Conns+4*4, scaled.TransientLimit.Conns) + require.Equal(t, base.ConnsInbound+4*5, scaled.TransientLimit.ConnsInbound) + require.Equal(t, base.ConnsOutbound+4*6, scaled.TransientLimit.ConnsOutbound) + require.Equal(t, base.Memory+4*7, scaled.TransientLimit.Memory) + }) + + t.Run("scaling limits in maps", func(t *testing.T) { + cfg := ScalingLimitConfig{ + ServiceLimits: map[string]baseLimitConfig{ + "A": { + BaseLimit: BaseLimit{Streams: 10, Memory: 100, FD: 9}, + }, + "B": { + BaseLimit: BaseLimit{Streams: 20, Memory: 200, FD: 10}, + BaseLimitIncrease: BaseLimitIncrease{Streams: 2, Memory: 3, FDFraction: 0.4}, + }, + }, + } + scaled := cfg.Scale(128<<20+4<<30, 1000) + + require.Len(t, scaled.ServiceLimits, 2) + require.Contains(t, scaled.ServiceLimits, "A") + require.Equal(t, 10, scaled.ServiceLimits["A"].Streams) + require.Equal(t, int64(100), scaled.ServiceLimits["A"].Memory) + require.Equal(t, 9, scaled.ServiceLimits["A"].FD) + + require.Contains(t, scaled.ServiceLimits, "B") + require.Equal(t, 20+4*2, scaled.ServiceLimits["B"].Streams) + require.Equal(t, int64(200+4*3), scaled.ServiceLimits["B"].Memory) + require.Equal(t, 400, scaled.ServiceLimits["B"].FD) + }) +} diff --git a/rcmgr_test.go b/rcmgr_test.go index fdf8dbf..0d49206 100644 --- a/rcmgr_test.go +++ b/rcmgr_test.go @@ -20,167 +20,138 @@ func TestResourceManager(t *testing.T) { svcA := "A.svc" svcB := "B.svc" nmgr, err := NewResourceManager( - &BasicLimiter{ - SystemLimits: &StaticLimit{ - Memory: 16384, - BaseLimit: BaseLimit{ - StreamsInbound: 3, - StreamsOutbound: 3, - Streams: 6, - ConnsInbound: 3, - ConnsOutbound: 3, - Conns: 6, - FD: 2, - }, + NewFixedLimiter(LimitConfig{ + SystemLimit: BaseLimit{ + Memory: 16384, + StreamsInbound: 3, + StreamsOutbound: 3, + Streams: 6, + ConnsInbound: 3, + ConnsOutbound: 3, + Conns: 6, + FD: 2, }, - TransientLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 2, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 2, + TransientLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 2, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 2, + FD: 1, + }, + DefaultServiceLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 2, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 2, + FD: 1, + }, + DefaultServicePeerLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 5, + StreamsOutbound: 5, + Streams: 10, + }, + ServiceLimits: map[string]BaseLimit{ + svcA: { + Memory: 8192, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 4, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 4, + FD: 1, + }, + svcB: { + Memory: 8192, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 4, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 4, FD: 1, }, }, - DefaultServiceLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ + ServicePeerLimits: map[string]BaseLimit{ + svcB: { + Memory: 8192, StreamsInbound: 1, StreamsOutbound: 1, Streams: 2, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 2, + }, + }, + DefaultProtocolLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 2, + }, + ProtocolLimits: map[protocol.ID]BaseLimit{ + protoA: { + Memory: 8192, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + }, + }, + ProtocolPeerLimits: map[protocol.ID]BaseLimit{ + protoB: { + Memory: 8192, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 2, + }, + }, + DefaultPeerLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 2, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 2, + FD: 1, + }, + DefaultProtocolPeerLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 5, + StreamsOutbound: 5, + Streams: 10, + }, + PeerLimits: map[peer.ID]BaseLimit{ + peerA: { + Memory: 8192, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 4, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 4, FD: 1, }, }, - DefaultServicePeerLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 5, - StreamsOutbound: 5, - Streams: 10, - }, + ConnLimit: BaseLimit{ + Memory: 4096, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }, - ServiceLimits: map[string]Limit{ - svcA: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 4, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 4, - FD: 1, - }, - }, - svcB: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 4, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 4, - FD: 1, - }, - }, + StreamLimit: BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, }, - ServicePeerLimits: map[string]Limit{ - svcB: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 2, - }, - }, - }, - DefaultProtocolLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 2, - }, - }, - ProtocolLimits: map[protocol.ID]Limit{ - protoA: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - }, - }, - }, - ProtocolPeerLimits: map[protocol.ID]Limit{ - protoB: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 2, - }, - }, - }, - DefaultPeerLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 2, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 2, - FD: 1, - }, - }, - DefaultProtocolPeerLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 5, - StreamsOutbound: 5, - Streams: 10, - }, - }, - PeerLimits: map[peer.ID]Limit{ - peerA: &StaticLimit{ - Memory: 8192, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 4, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 4, - FD: 1, - }, - }, - }, - ConnLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, - }, - StreamLimits: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - }, - }, - }) + }), + ) if err != nil { t.Fatal(err) @@ -1006,10 +977,14 @@ func TestResourceManager(t *testing.T) { func TestResourceManagerWithAllowlist(t *testing.T) { peerA := test.RandPeerIDFatal(t) - limits := NewDefaultLimiter() - limits.SystemLimits = limits.SystemLimits.WithConnLimit(0, 0, 0) - limits.TransientLimits = limits.SystemLimits.WithConnLimit(0, 0, 0) - rcmgr, err := NewResourceManager(limits, WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{ + limits := DefaultLimits.Scale(1<<30, 100) + limits.SystemLimit.Conns = 0 + limits.SystemLimit.ConnsInbound = 0 + limits.SystemLimit.ConnsOutbound = 0 + limits.TransientLimit.Conns = 0 + limits.TransientLimit.ConnsInbound = 0 + limits.TransientLimit.ConnsOutbound = 0 + rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{ multiaddr.StringCast("/ip4/1.2.3.4"), multiaddr.StringCast("/ip4/4.3.2.1/p2p/" + peerA.String()), })) @@ -1021,9 +996,18 @@ func TestResourceManagerWithAllowlist(t *testing.T) { // Setup allowlist. TODO, replace this with a config once config changes are in r := rcmgr.(*resourceManager) - r.allowlistedSystem = newSystemScope(limits.GetSystemLimits().WithConnLimit(2, 1, 2), r, "allowlistedSystem") + sysLimit := limits.SystemLimit + sysLimit.Conns = 2 + sysLimit.ConnsInbound = 2 + sysLimit.ConnsOutbound = 1 + r.allowlistedSystem = newSystemScope(&sysLimit, r, "allowlistedSystem") r.allowlistedSystem.IncRef() - r.allowlistedTransient = newTransientScope(limits.GetTransientLimits().WithConnLimit(1, 1, 1), r, "allowlistedTransient", r.allowlistedSystem.resourceScope) + + transLimit := limits.TransientLimit + transLimit.Conns = 1 + transLimit.ConnsInbound = 1 + transLimit.ConnsOutbound = 1 + r.allowlistedTransient = newTransientScope(&transLimit, r, "allowlistedTransient", r.allowlistedSystem.resourceScope) r.allowlistedTransient.IncRef() } diff --git a/scope_test.go b/scope_test.go index 98440fa..3743068 100644 --- a/scope_test.go +++ b/scope_test.go @@ -30,17 +30,15 @@ func checkResources(t *testing.T, rc *resources, st network.ScopeStat) { } func TestResources(t *testing.T) { - rc := resources{limit: &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, + rc := resources{limit: &BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }} checkResources(t, &rc, network.ScopeStat{}) @@ -245,17 +243,15 @@ func TestResources(t *testing.T) { func TestResourceScopeSimple(t *testing.T) { s := newResourceScope( - &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, + &BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }, nil, "test", nil, nil, ) @@ -379,17 +375,15 @@ func testResourceScopeBasic(t *testing.T, s *resourceScope) { func TestResourceScopeTxnBasic(t *testing.T) { s := newResourceScope( - &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, + &BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }, nil, "test", nil, nil, ) @@ -416,17 +410,15 @@ func TestResourceScopeTxnBasic(t *testing.T) { func TestResourceScopeTxnZombie(t *testing.T) { s := newResourceScope( - &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, + &BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }, nil, "test", nil, nil, ) @@ -460,17 +452,15 @@ func TestResourceScopeTxnZombie(t *testing.T) { func TestResourceScopeTxnTree(t *testing.T) { s := newResourceScope( - &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 1, - StreamsOutbound: 1, - Streams: 1, - ConnsInbound: 1, - ConnsOutbound: 1, - Conns: 1, - FD: 1, - }, + &BaseLimit{ + Memory: 4096, + StreamsInbound: 1, + StreamsOutbound: 1, + Streams: 1, + ConnsInbound: 1, + ConnsOutbound: 1, + Conns: 1, + FD: 1, }, nil, "test", nil, nil, ) @@ -571,92 +561,80 @@ func TestResourceScopeDAG(t *testing.T) { // \ // ------> s6 s1 := newResourceScope( - &StaticLimit{ - Memory: 4096, - BaseLimit: BaseLimit{ - StreamsInbound: 4, - StreamsOutbound: 4, - Streams: 4, - ConnsInbound: 4, - ConnsOutbound: 4, - Conns: 4, - FD: 4, - }, + &BaseLimit{ + Memory: 4096, + StreamsInbound: 4, + StreamsOutbound: 4, + Streams: 4, + ConnsInbound: 4, + ConnsOutbound: 4, + Conns: 4, + FD: 4, }, nil, "test", nil, nil, ) s2 := newResourceScope( - &StaticLimit{ - Memory: 2048, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 2, - FD: 2, - }, + &BaseLimit{ + Memory: 2048, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 2, + FD: 2, }, []*resourceScope{s1}, "test", nil, nil, ) s3 := newResourceScope( - &StaticLimit{ - Memory: 2048, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 2, - FD: 2, - }, + &BaseLimit{ + Memory: 2048, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 2, + FD: 2, }, []*resourceScope{s1}, "test", nil, nil, ) s4 := newResourceScope( - &StaticLimit{ - Memory: 2048, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 2, - FD: 2, - }, + &BaseLimit{ + Memory: 2048, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 2, + FD: 2, }, []*resourceScope{s2, s3, s1}, "test", nil, nil, ) s5 := newResourceScope( - &StaticLimit{ - Memory: 2048, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 2, - FD: 2, - }, + &BaseLimit{ + Memory: 2048, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 2, + FD: 2, }, []*resourceScope{s2, s1}, "test", nil, nil, ) s6 := newResourceScope( - &StaticLimit{ - Memory: 2048, - BaseLimit: BaseLimit{ - StreamsInbound: 2, - StreamsOutbound: 2, - Streams: 2, - ConnsInbound: 2, - ConnsOutbound: 2, - Conns: 2, - FD: 2, - }, + &BaseLimit{ + Memory: 2048, + StreamsInbound: 2, + StreamsOutbound: 2, + Streams: 2, + ConnsInbound: 2, + ConnsOutbound: 2, + Conns: 2, + FD: 2, }, []*resourceScope{s3, s1}, "test", nil, nil, ) @@ -1095,39 +1073,27 @@ func TestResourceScopeDAGTxn(t *testing.T) { // \ // ------> s6 s1 := newResourceScope( - &StaticLimit{ - Memory: 8192, - }, + &BaseLimit{Memory: 8192}, nil, "test", nil, nil, ) s2 := newResourceScope( - &StaticLimit{ - Memory: 4096 + 2048, - }, + &BaseLimit{Memory: 4096 + 2048}, []*resourceScope{s1}, "test", nil, nil, ) s3 := newResourceScope( - &StaticLimit{ - Memory: 4096 + 2048, - }, + &BaseLimit{Memory: 4096 + 2048}, []*resourceScope{s1}, "test", nil, nil, ) s4 := newResourceScope( - &StaticLimit{ - Memory: 4096 + 1024, - }, + &BaseLimit{Memory: 4096 + 1024}, []*resourceScope{s2, s3, s1}, "test", nil, nil, ) s5 := newResourceScope( - &StaticLimit{ - Memory: 4096 + 1024, - }, + &BaseLimit{Memory: 4096 + 1024}, []*resourceScope{s2, s1}, "test", nil, nil, ) s6 := newResourceScope( - &StaticLimit{ - Memory: 4096 + 1024, - }, + &BaseLimit{Memory: 4096 + 1024}, []*resourceScope{s3, s1}, "test", nil, nil, )