Merge pull request #48 from libp2p/rework-limits

rewrite limits to allow auto-scaling
This commit is contained in:
Marten Seemann 2022-07-02 06:14:47 -07:00 committed by GitHub
commit d0a1694030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 946 additions and 1219 deletions

6
go.mod
View File

@ -7,8 +7,9 @@ require (
github.com/libp2p/go-libp2p-core v0.19.0 github.com/libp2p/go-libp2p-core v0.19.0
github.com/multiformats/go-multiaddr v0.6.0 github.com/multiformats/go-multiaddr v0.6.0
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.8.0
go.opencensus.io v0.23.0 go.opencensus.io v0.23.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
) )
require ( require (
@ -38,6 +39,5 @@ require (
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.1 // indirect go.uber.org/zap v1.19.1 // indirect
golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf // indirect
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c // indirect gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
) )

8
go.sum
View File

@ -133,10 +133,13 @@ github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+
github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
@ -238,7 +241,8 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

216
limit.go
View File

@ -1,6 +1,9 @@
package rcmgr package rcmgr
import ( import (
"encoding/json"
"io"
"github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/protocol"
@ -20,19 +23,6 @@ type Limit interface {
GetConnTotalLimit() int GetConnTotalLimit() int
// GetFDLimit returns the file descriptor limit. // GetFDLimit returns the file descriptor limit.
GetFDLimit() int GetFDLimit() int
// WithMemoryLimit creates a copy of this limit object, with memory limit adjusted to
// the specified memFraction of its current value, bounded by minMemory and maxMemory.
WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit
// WithStreamLimit creates a copy of this limit object, with stream limits adjusted
// as specified.
WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit
// WithConnLimit creates a copy of this limit object, with connetion limits adjusted
// as specified.
WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit
// WithFDLimit creates a copy of this limit object, with file descriptor limits adjusted
// as specified
WithFDLimit(numFD int) Limit
} }
// Limiter is the interface for providing limits to the resource manager. // Limiter is the interface for providing limits to the resource manager.
@ -48,25 +38,41 @@ type Limiter interface {
GetConnLimits() Limit GetConnLimits() Limit
} }
// BasicLimiter is a limiter with fixed limits. // NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration,
type BasicLimiter struct { // using the default limits for fallback.
SystemLimits Limit func NewDefaultLimiterFromJSON(in io.Reader) (Limiter, error) {
TransientLimits Limit return NewLimiterFromJSON(in, DefaultLimits.AutoScale())
DefaultServiceLimits Limit
DefaultServicePeerLimits Limit
ServiceLimits map[string]Limit
ServicePeerLimits map[string]Limit
DefaultProtocolLimits Limit
DefaultProtocolPeerLimits Limit
ProtocolLimits map[protocol.ID]Limit
ProtocolPeerLimits map[protocol.ID]Limit
DefaultPeerLimits Limit
PeerLimits map[peer.ID]Limit
ConnLimits Limit
StreamLimits Limit
} }
var _ Limiter = (*BasicLimiter)(nil) // NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults LimitConfig) (Limiter, error) {
cfg, err := readLimiterConfigFromJSON(in, defaults)
if err != nil {
return nil, err
}
return &fixedLimiter{cfg}, nil
}
func readLimiterConfigFromJSON(in io.Reader, defaults LimitConfig) (LimitConfig, error) {
var cfg LimitConfig
if err := json.NewDecoder(in).Decode(&cfg); err != nil {
return LimitConfig{}, err
}
cfg.Apply(defaults)
return cfg, nil
}
// fixedLimiter is a limiter with fixed limits.
type fixedLimiter struct {
LimitConfig
}
var _ Limiter = (*fixedLimiter)(nil)
func NewFixedLimiter(conf LimitConfig) Limiter {
log.Debugw("initializing new limiter with config", "limits", conf)
return &fixedLimiter{LimitConfig: conf}
}
// BaseLimit is a mixin type for basic resource limits. // BaseLimit is a mixin type for basic resource limits.
type BaseLimit struct { type BaseLimit struct {
@ -77,13 +83,77 @@ type BaseLimit struct {
ConnsInbound int ConnsInbound int
ConnsOutbound int ConnsOutbound int
FD int FD int
Memory int64
} }
// MemoryLimit is a mixin type for memory limits // Apply overwrites all zero-valued limits with the values of l2
type MemoryLimit struct { // Must not use a pointer receiver.
MemoryFraction float64 func (l *BaseLimit) Apply(l2 BaseLimit) {
MinMemory int64 if l.Streams == 0 {
MaxMemory int64 l.Streams = l2.Streams
}
if l.StreamsInbound == 0 {
l.StreamsInbound = l2.StreamsInbound
}
if l.StreamsOutbound == 0 {
l.StreamsOutbound = l2.StreamsOutbound
}
if l.Conns == 0 {
l.Conns = l2.Conns
}
if l.ConnsInbound == 0 {
l.ConnsInbound = l2.ConnsInbound
}
if l.ConnsOutbound == 0 {
l.ConnsOutbound = l2.ConnsOutbound
}
if l.Memory == 0 {
l.Memory = l2.Memory
}
if l.FD == 0 {
l.FD = l2.FD
}
}
// BaseLimitIncrease is the increase per GB of system memory.
type BaseLimitIncrease struct {
Streams int
StreamsInbound int
StreamsOutbound int
Conns int
ConnsInbound int
ConnsOutbound int
Memory int64
FDFraction float64
}
// Apply overwrites all zero-valued limits with the values of l2
// Must not use a pointer receiver.
func (l *BaseLimitIncrease) Apply(l2 BaseLimitIncrease) {
if l.Streams == 0 {
l.Streams = l2.Streams
}
if l.StreamsInbound == 0 {
l.StreamsInbound = l2.StreamsInbound
}
if l.StreamsOutbound == 0 {
l.StreamsOutbound = l2.StreamsOutbound
}
if l.Conns == 0 {
l.Conns = l2.Conns
}
if l.ConnsInbound == 0 {
l.ConnsInbound = l2.ConnsInbound
}
if l.ConnsOutbound == 0 {
l.ConnsOutbound = l2.ConnsOutbound
}
if l.Memory == 0 {
l.Memory = l2.Memory
}
if l.FDFraction == 0 {
l.FDFraction = l2.FDFraction
}
} }
func (l *BaseLimit) GetStreamLimit(dir network.Direction) int { func (l *BaseLimit) GetStreamLimit(dir network.Direction) int {
@ -114,74 +184,62 @@ func (l *BaseLimit) GetFDLimit() int {
return l.FD return l.FD
} }
func (l *BasicLimiter) GetSystemLimits() Limit { func (l *BaseLimit) GetMemoryLimit() int64 {
return l.SystemLimits return l.Memory
} }
func (l *BasicLimiter) GetTransientLimits() Limit { func (l *fixedLimiter) GetSystemLimits() Limit {
return l.TransientLimits return &l.System
} }
func (l *BasicLimiter) GetServiceLimits(svc string) Limit { func (l *fixedLimiter) GetTransientLimits() Limit {
sl, ok := l.ServiceLimits[svc] return &l.Transient
}
func (l *fixedLimiter) GetServiceLimits(svc string) Limit {
sl, ok := l.Service[svc]
if !ok { if !ok {
return l.DefaultServiceLimits return &l.ServiceDefault
} }
return sl return &sl
} }
func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit { func (l *fixedLimiter) GetServicePeerLimits(svc string) Limit {
pl, ok := l.ServicePeerLimits[svc] pl, ok := l.ServicePeer[svc]
if !ok { if !ok {
return l.DefaultServicePeerLimits return &l.ServicePeerDefault
} }
return pl return &pl
} }
func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit { func (l *fixedLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolLimits[proto] pl, ok := l.Protocol[proto]
if !ok { if !ok {
return l.DefaultProtocolLimits return &l.ProtocolDefault
} }
return pl return &pl
} }
func (l *BasicLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit { func (l *fixedLimiter) GetProtocolPeerLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolPeerLimits[proto] pl, ok := l.ProtocolPeer[proto]
if !ok { if !ok {
return l.DefaultProtocolPeerLimits return &l.ProtocolPeerDefault
} }
return pl return &pl
} }
func (l *BasicLimiter) GetPeerLimits(p peer.ID) Limit { func (l *fixedLimiter) GetPeerLimits(p peer.ID) Limit {
pl, ok := l.PeerLimits[p] pl, ok := l.Peer[p]
if !ok { if !ok {
return l.DefaultPeerLimits return &l.PeerDefault
} }
return pl return &pl
} }
func (l *BasicLimiter) GetStreamLimits(p peer.ID) Limit { func (l *fixedLimiter) GetStreamLimits(_ peer.ID) Limit {
return l.StreamLimits return &l.Stream
} }
func (l *BasicLimiter) GetConnLimits() Limit { func (l *fixedLimiter) GetConnLimits() Limit {
return l.ConnLimits return &l.Conn
}
func (l *MemoryLimit) GetMemory(memoryCap int64) int64 {
return memoryLimit(memoryCap, l.MemoryFraction, l.MinMemory, l.MaxMemory)
}
func memoryLimit(memoryCap int64, memFraction float64, minMemory, maxMemory int64) int64 {
memoryCap = int64(float64(memoryCap) * memFraction)
switch {
case memoryCap < minMemory:
return minMemory
case memoryCap > maxMemory:
return maxMemory
default:
return memoryCap
}
} }

View File

@ -1,309 +0,0 @@
package rcmgr
import (
"encoding/json"
"fmt"
"io"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/pbnjay/memory"
)
type BasicLimitConfig struct {
// if true, then a dynamic limit is used
Dynamic bool `json:",omitempty"`
// either Memory is set for fixed memory limit
Memory int64 `json:",omitempty"`
// or the following 3 fields for computed memory limits
MinMemory int64 `json:",omitempty"`
MaxMemory int64 `json:",omitempty"`
MemoryFraction float64 `json:",omitempty"`
StreamsInbound int
StreamsOutbound int
Streams int
ConnsInbound int
ConnsOutbound int
Conns int
FD int
}
func (cfg *BasicLimitConfig) toLimit(base BaseLimit, mem MemoryLimit) (Limit, error) {
if cfg == nil {
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
case cfg.Dynamic:
if cfg.MemoryFraction < 0 {
return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction)
}
if cfg.MemoryFraction > 0 {
mem.MemoryFraction = cfg.MemoryFraction
}
if cfg.MinMemory > 0 {
mem.MinMemory = cfg.MinMemory
}
if cfg.MaxMemory > 0 {
mem.MaxMemory = cfg.MaxMemory
}
return &DynamicLimit{
MemoryLimit: mem,
BaseLimit: base,
}, nil
default:
if cfg.MemoryFraction < 0 {
return nil, fmt.Errorf("negative memory fraction: %f", cfg.MemoryFraction)
}
if cfg.MemoryFraction > 0 {
mem.MemoryFraction = cfg.MemoryFraction
}
if cfg.MinMemory > 0 {
mem.MinMemory = cfg.MinMemory
}
if cfg.MaxMemory > 0 {
mem.MaxMemory = cfg.MaxMemory
}
m := mem.GetMemory(int64(memory.TotalMemory()))
return &StaticLimit{
Memory: m,
BaseLimit: base,
}, nil
}
}
func (cfg *BasicLimitConfig) toLimitFixed(base BaseLimit, mem int64) (Limit, error) {
if cfg == nil {
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
if cfg.Streams > 0 {
base.Streams = cfg.Streams
}
if cfg.StreamsInbound > 0 {
base.StreamsInbound = cfg.StreamsInbound
}
if cfg.StreamsOutbound > 0 {
base.StreamsOutbound = cfg.StreamsOutbound
}
if cfg.Conns > 0 {
base.Conns = cfg.Conns
}
if cfg.ConnsInbound > 0 {
base.ConnsInbound = cfg.ConnsInbound
}
if cfg.ConnsOutbound > 0 {
base.ConnsOutbound = cfg.ConnsOutbound
}
if cfg.FD > 0 {
base.FD = cfg.FD
}
switch {
case cfg.Memory > 0:
return &StaticLimit{
Memory: cfg.Memory,
BaseLimit: base,
}, nil
case cfg.Dynamic:
return nil, fmt.Errorf("cannot specify dynamic limit for fixed memory limit")
default:
if cfg.MemoryFraction > 0 || cfg.MinMemory > 0 || cfg.MaxMemory > 0 {
return nil, fmt.Errorf("cannot specify dynamic range for fixed memory limit")
}
return &StaticLimit{
Memory: mem,
BaseLimit: base,
}, nil
}
}
type BasicLimiterConfig struct {
System *BasicLimitConfig `json:",omitempty"`
Transient *BasicLimitConfig `json:",omitempty"`
ServiceDefault *BasicLimitConfig `json:",omitempty"`
ServicePeerDefault *BasicLimitConfig `json:",omitempty"`
Service map[string]BasicLimitConfig `json:",omitempty"`
ServicePeer map[string]BasicLimitConfig `json:",omitempty"`
ProtocolDefault *BasicLimitConfig `json:",omitempty"`
ProtocolPeerDefault *BasicLimitConfig `json:",omitempty"`
Protocol map[string]BasicLimitConfig `json:",omitempty"`
ProtocolPeer map[string]BasicLimitConfig `json:",omitempty"`
PeerDefault *BasicLimitConfig `json:",omitempty"`
Peer map[string]BasicLimitConfig `json:",omitempty"`
Conn *BasicLimitConfig `json:",omitempty"`
Stream *BasicLimitConfig `json:",omitempty"`
}
// NewDefaultLimiterFromJSON creates a new limiter by parsing a json configuration,
// using the default limits for fallback.
func NewDefaultLimiterFromJSON(in io.Reader) (*BasicLimiter, error) {
return NewLimiterFromJSON(in, DefaultLimits)
}
// NewLimiterFromJSON creates a new limiter by parsing a json configuration.
func NewLimiterFromJSON(in io.Reader, defaults DefaultLimitConfig) (*BasicLimiter, error) {
jin := json.NewDecoder(in)
var cfg BasicLimiterConfig
if err := jin.Decode(&cfg); err != nil {
return nil, err
}
return NewLimiter(cfg, defaults)
}
func NewLimiter(cfg BasicLimiterConfig, defaults DefaultLimitConfig) (*BasicLimiter, error) {
limiter := new(BasicLimiter)
var err error
limiter.SystemLimits, err = cfg.System.toLimit(defaults.SystemBaseLimit, defaults.SystemMemory)
if err != nil {
return nil, fmt.Errorf("invalid system limit: %w", err)
}
limiter.TransientLimits, err = cfg.Transient.toLimit(defaults.TransientBaseLimit, defaults.TransientMemory)
if err != nil {
return nil, fmt.Errorf("invalid transient limit: %w", err)
}
limiter.DefaultServiceLimits, err = cfg.ServiceDefault.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invalid default service limit: %w", err)
}
limiter.DefaultServicePeerLimits, err = cfg.ServicePeerDefault.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid default service peer limit: %w", err)
}
if len(cfg.Service) > 0 {
limiter.ServiceLimits = make(map[string]Limit, len(cfg.Service))
for svc, cfgLimit := range cfg.Service {
limiter.ServiceLimits[svc], err = cfgLimit.toLimit(defaults.ServiceBaseLimit, defaults.ServiceMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", svc, err)
}
}
}
if len(cfg.ServicePeer) > 0 {
limiter.ServicePeerLimits = make(map[string]Limit, len(cfg.ServicePeer))
for svc, cfgLimit := range cfg.ServicePeer {
limiter.ServicePeerLimits[svc], err = cfgLimit.toLimit(defaults.ServicePeerBaseLimit, defaults.ServicePeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", svc, err)
}
}
}
limiter.DefaultProtocolLimits, err = cfg.ProtocolDefault.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invalid default protocol limit: %w", err)
}
limiter.DefaultProtocolPeerLimits, err = cfg.ProtocolPeerDefault.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid default protocol peer limit: %w", err)
}
if len(cfg.Protocol) > 0 {
limiter.ProtocolLimits = make(map[protocol.ID]Limit, len(cfg.Protocol))
for p, cfgLimit := range cfg.Protocol {
limiter.ProtocolLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolBaseLimit, defaults.ProtocolMemory)
if err != nil {
return nil, fmt.Errorf("invalid service limit for %s: %w", p, err)
}
}
}
if len(cfg.ProtocolPeer) > 0 {
limiter.ProtocolPeerLimits = make(map[protocol.ID]Limit, len(cfg.ProtocolPeer))
for p, cfgLimit := range cfg.ProtocolPeer {
limiter.ProtocolPeerLimits[protocol.ID(p)], err = cfgLimit.toLimit(defaults.ProtocolPeerBaseLimit, defaults.ProtocolPeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid service peer limit for %s: %w", p, err)
}
}
}
limiter.DefaultPeerLimits, err = cfg.PeerDefault.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit: %w", err)
}
if len(cfg.Peer) > 0 {
limiter.PeerLimits = make(map[peer.ID]Limit, len(cfg.Peer))
for p, cfgLimit := range cfg.Peer {
pid, err := peer.Decode(p)
if err != nil {
return nil, fmt.Errorf("invalid peer ID %s: %w", p, err)
}
limiter.PeerLimits[pid], err = cfgLimit.toLimit(defaults.PeerBaseLimit, defaults.PeerMemory)
if err != nil {
return nil, fmt.Errorf("invalid peer limit for %s: %w", p, err)
}
}
}
limiter.ConnLimits, err = cfg.Conn.toLimitFixed(defaults.ConnBaseLimit, defaults.ConnMemory)
if err != nil {
return nil, fmt.Errorf("invalid conn limit: %w", err)
}
limiter.StreamLimits, err = cfg.Stream.toLimitFixed(defaults.StreamBaseLimit, defaults.StreamMemory)
if err != nil {
return nil, fmt.Errorf("invalid stream limit: %w", err)
}
return limiter, nil
}

View File

@ -4,121 +4,50 @@ import (
"os" "os"
"testing" "testing"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func withMemoryLimit(l BaseLimit, m int64) BaseLimit {
l2 := l
l2.Memory = m
return l2
}
func TestLimitConfigParser(t *testing.T) { func TestLimitConfigParser(t *testing.T) {
in, err := os.Open("limit_config_test.json") in, err := os.Open("limit_config_test.json")
require.NoError(t, err) require.NoError(t, err)
defer in.Close() defer in.Close()
limiter, err := NewDefaultLimiterFromJSON(in) DefaultLimits.AddServiceLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
DefaultLimits.AddProtocolPeerLimit("C", DefaultLimits.ServiceBaseLimit, BaseLimitIncrease{})
defaults := DefaultLimits.AutoScale()
cfg, err := readLimiterConfigFromJSON(in, defaults)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, require.Equal(t, int64(65536), cfg.System.Memory)
&DynamicLimit{ require.Equal(t, defaults.System.Streams, cfg.System.Streams)
MemoryLimit: MemoryLimit{ require.Equal(t, defaults.System.StreamsInbound, cfg.System.StreamsInbound)
MinMemory: 16384, require.Equal(t, defaults.System.StreamsOutbound, cfg.System.StreamsOutbound)
MaxMemory: 65536, require.Equal(t, 16, cfg.System.Conns)
MemoryFraction: 0.125, require.Equal(t, 8, cfg.System.ConnsInbound)
}, require.Equal(t, 16, cfg.System.ConnsOutbound)
BaseLimit: BaseLimit{ require.Equal(t, 16, cfg.System.FD)
Streams: 64,
StreamsInbound: 32,
StreamsOutbound: 48,
Conns: 16,
ConnsInbound: 8,
ConnsOutbound: 16,
FD: 16,
},
},
limiter.SystemLimits)
require.Equal(t, require.Equal(t, defaults.Transient, cfg.Transient)
&StaticLimit{ require.Equal(t, int64(8765), cfg.ServiceDefault.Memory)
Memory: 4096,
BaseLimit: DefaultLimits.TransientBaseLimit,
},
limiter.TransientLimits)
require.Equal(t, require.Contains(t, cfg.Service, "A")
&StaticLimit{ require.Equal(t, withMemoryLimit(cfg.ServiceDefault, 8192), cfg.Service["A"])
Memory: 8192, require.Contains(t, cfg.Service, "B")
BaseLimit: DefaultLimits.ServiceBaseLimit, require.Equal(t, cfg.ServiceDefault, cfg.Service["B"])
}, require.Contains(t, cfg.Service, "C")
limiter.DefaultServiceLimits) require.Equal(t, defaults.Service["C"], cfg.Service["C"])
require.Equal(t,
&StaticLimit{
Memory: 2048,
BaseLimit: DefaultLimits.ServicePeerBaseLimit,
},
limiter.DefaultServicePeerLimits)
require.Equal(t, 1, len(limiter.ServiceLimits))
require.Equal(t,
&StaticLimit{
Memory: 8192,
BaseLimit: DefaultLimits.ServiceBaseLimit,
},
limiter.ServiceLimits["A"])
require.Equal(t, 1, len(limiter.ServicePeerLimits))
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.ServicePeerBaseLimit,
},
limiter.ServicePeerLimits["A"])
require.Equal(t,
&StaticLimit{
Memory: 2048,
BaseLimit: DefaultLimits.ProtocolBaseLimit,
},
limiter.DefaultProtocolLimits)
require.Equal(t,
&StaticLimit{
Memory: 1024,
BaseLimit: DefaultLimits.ProtocolPeerBaseLimit,
},
limiter.DefaultProtocolPeerLimits)
require.Equal(t, 1, len(limiter.ProtocolLimits))
require.Equal(t,
&StaticLimit{
Memory: 8192,
BaseLimit: DefaultLimits.ProtocolBaseLimit,
},
limiter.ProtocolLimits["/A"])
require.Equal(t, 1, len(limiter.ProtocolPeerLimits))
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.ProtocolPeerBaseLimit,
},
limiter.ProtocolPeerLimits["/A"])
require.Equal(t,
&StaticLimit{
Memory: 4096,
BaseLimit: DefaultLimits.PeerBaseLimit,
},
limiter.DefaultPeerLimits)
require.Equal(t,
&StaticLimit{
Memory: 1 << 20,
BaseLimit: DefaultLimits.ConnBaseLimit,
},
limiter.ConnLimits)
require.Equal(t,
&StaticLimit{
Memory: 16 << 20,
BaseLimit: DefaultLimits.StreamBaseLimit,
},
limiter.StreamLimits)
require.Equal(t, int64(4096), cfg.PeerDefault.Memory)
peerID, err := peer.Decode("12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS")
require.NoError(t, err)
require.Contains(t, cfg.Peer, peerID)
require.Equal(t, int64(4097), cfg.Peer[peerID].Memory)
} }

View File

@ -1,32 +1,22 @@
{ {
"System": { "System": {
"Dynamic": true, "Memory": 65536,
"MinMemory": 16384,
"MaxMemory": 65536,
"MemoryFraction": 0.125,
"Streams": 64,
"StreamsInbound": 32,
"StreamsOutbound": 48,
"Conns": 16, "Conns": 16,
"ConnsInbound": 8, "ConnsInbound": 8,
"ConnsOutbound": 16, "ConnsOutbound": 16,
"FD": 16 "FD": 16
}, },
"Transient": {
"MinMemory": 1024,
"MaxMemory": 4096,
"MemoryFraction": 0.03125
},
"ServiceDefault": { "ServiceDefault": {
"Memory": 8192 "Memory": 8765
},
"ServicePeerDefault": {
"Memory": 2048
}, },
"Service": { "Service": {
"A": { "A": {
"Memory": 8192 "Memory": 8192
} },
"B": {}
},
"ServicePeerDefault": {
"Memory": 2048
}, },
"ServicePeer": { "ServicePeer": {
"A": { "A": {
@ -44,17 +34,12 @@
"Memory": 8192 "Memory": 8192
} }
}, },
"ProtocolPeer": {
"/A": {
"Memory": 4096
}
},
"PeerDefault": { "PeerDefault": {
"Memory": 4096 "Memory": 4096
}, },
"Peer": { "Peer": {
"12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": { "12D3KooWPFH2Bx2tPfw6RLxN8k2wh47GRXgkt9yrAHU37zFwHWzS": {
"Memory": 4096 "Memory": 4097
} }
} }
} }

View File

@ -1,148 +1,424 @@
package rcmgr package rcmgr
import "math" import (
"math"
// DefaultLimitConfig is a struct for configuring default limits. "github.com/libp2p/go-libp2p-core/peer"
type DefaultLimitConfig struct { "github.com/libp2p/go-libp2p-core/protocol"
SystemBaseLimit BaseLimit
SystemMemory MemoryLimit
TransientBaseLimit BaseLimit "github.com/pbnjay/memory"
TransientMemory MemoryLimit )
ServiceBaseLimit BaseLimit type baseLimitConfig struct {
ServiceMemory MemoryLimit BaseLimit BaseLimit
BaseLimitIncrease BaseLimitIncrease
ServicePeerBaseLimit BaseLimit
ServicePeerMemory MemoryLimit
ProtocolBaseLimit BaseLimit
ProtocolMemory MemoryLimit
ProtocolPeerBaseLimit BaseLimit
ProtocolPeerMemory MemoryLimit
PeerBaseLimit BaseLimit
PeerMemory MemoryLimit
ConnBaseLimit BaseLimit
ConnMemory int64
StreamBaseLimit BaseLimit
StreamMemory int64
} }
func (cfg *DefaultLimitConfig) WithSystemMemory(memFraction float64, minMemory, maxMemory int64) DefaultLimitConfig { // ScalingLimitConfig is a struct for configuring default limits.
refactor := memFraction / cfg.SystemMemory.MemoryFraction // {}BaseLimit is the limits that Apply for a minimal node (128 MB of memory for libp2p) and 256 file descriptors.
r := *cfg // {}LimitIncrease is the additional limit granted for every additional 1 GB of RAM.
r.SystemMemory.MemoryFraction = memFraction type ScalingLimitConfig struct {
r.SystemMemory.MinMemory = minMemory SystemBaseLimit BaseLimit
r.SystemMemory.MaxMemory = maxMemory SystemLimitIncrease BaseLimitIncrease
r.TransientMemory.MemoryFraction *= refactor
r.ServiceMemory.MemoryFraction *= refactor TransientBaseLimit BaseLimit
r.ServicePeerMemory.MemoryFraction *= refactor TransientLimitIncrease BaseLimitIncrease
r.ProtocolMemory.MemoryFraction *= refactor
r.ProtocolPeerMemory.MemoryFraction *= refactor ServiceBaseLimit BaseLimit
r.PeerMemory.MemoryFraction *= refactor ServiceLimitIncrease BaseLimitIncrease
return r ServiceLimits map[string]baseLimitConfig // use AddServiceLimit to modify
ServicePeerBaseLimit BaseLimit
ServicePeerLimitIncrease BaseLimitIncrease
ServicePeerLimits map[string]baseLimitConfig // use AddServicePeerLimit to modify
ProtocolBaseLimit BaseLimit
ProtocolLimitIncrease BaseLimitIncrease
ProtocolLimits map[protocol.ID]baseLimitConfig // use AddProtocolLimit to modify
ProtocolPeerBaseLimit BaseLimit
ProtocolPeerLimitIncrease BaseLimitIncrease
ProtocolPeerLimits map[protocol.ID]baseLimitConfig // use AddProtocolPeerLimit to modify
PeerBaseLimit BaseLimit
PeerLimitIncrease BaseLimitIncrease
PeerLimits map[peer.ID]baseLimitConfig // use AddPeerLimit to modify
ConnBaseLimit BaseLimit
ConnLimitIncrease BaseLimitIncrease
StreamBaseLimit BaseLimit
StreamLimitIncrease BaseLimitIncrease
}
func (cfg *ScalingLimitConfig) AddServiceLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ServiceLimits == nil {
cfg.ServiceLimits = make(map[string]baseLimitConfig)
}
cfg.ServiceLimits[svc] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddProtocolLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ProtocolLimits == nil {
cfg.ProtocolLimits = make(map[protocol.ID]baseLimitConfig)
}
cfg.ProtocolLimits[proto] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddPeerLimit(p peer.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.PeerLimits == nil {
cfg.PeerLimits = make(map[peer.ID]baseLimitConfig)
}
cfg.PeerLimits[p] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddServicePeerLimit(svc string, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ServicePeerLimits == nil {
cfg.ServicePeerLimits = make(map[string]baseLimitConfig)
}
cfg.ServicePeerLimits[svc] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
func (cfg *ScalingLimitConfig) AddProtocolPeerLimit(proto protocol.ID, base BaseLimit, inc BaseLimitIncrease) {
if cfg.ProtocolPeerLimits == nil {
cfg.ProtocolPeerLimits = make(map[protocol.ID]baseLimitConfig)
}
cfg.ProtocolPeerLimits[proto] = baseLimitConfig{
BaseLimit: base,
BaseLimitIncrease: inc,
}
}
type LimitConfig struct {
System BaseLimit `json:",omitempty"`
Transient BaseLimit `json:",omitempty"`
ServiceDefault BaseLimit `json:",omitempty"`
Service map[string]BaseLimit `json:",omitempty"`
ServicePeerDefault BaseLimit `json:",omitempty"`
ServicePeer map[string]BaseLimit `json:",omitempty"`
ProtocolDefault BaseLimit `json:",omitempty"`
Protocol map[protocol.ID]BaseLimit `json:",omitempty"`
ProtocolPeerDefault BaseLimit `json:",omitempty"`
ProtocolPeer map[protocol.ID]BaseLimit `json:",omitempty"`
PeerDefault BaseLimit `json:",omitempty"`
Peer map[peer.ID]BaseLimit `json:",omitempty"`
Conn BaseLimit `json:",omitempty"`
Stream BaseLimit `json:",omitempty"`
}
func (cfg *LimitConfig) Apply(c LimitConfig) {
cfg.System.Apply(c.System)
cfg.Transient.Apply(c.Transient)
cfg.ServiceDefault.Apply(c.ServiceDefault)
cfg.ProtocolDefault.Apply(c.ProtocolDefault)
cfg.ProtocolPeerDefault.Apply(c.ProtocolPeerDefault)
cfg.PeerDefault.Apply(c.PeerDefault)
cfg.Conn.Apply(c.Conn)
cfg.Stream.Apply(c.Stream)
// TODO: the following could be solved a lot nicer, if only we could use generics
for s, l := range cfg.Service {
r := cfg.ServiceDefault
if l2, ok := c.Service[s]; ok {
r = l2
}
l.Apply(r)
cfg.Service[s] = l
}
if c.Service != nil && cfg.Service == nil {
cfg.Service = make(map[string]BaseLimit)
}
for s, l := range c.Service {
if _, ok := cfg.Service[s]; !ok {
cfg.Service[s] = l
}
}
for s, l := range cfg.ServicePeer {
r := cfg.ServicePeerDefault
if l2, ok := c.ServicePeer[s]; ok {
r = l2
}
l.Apply(r)
cfg.ServicePeer[s] = l
}
if c.ServicePeer != nil && cfg.ServicePeer == nil {
cfg.ServicePeer = make(map[string]BaseLimit)
}
for s, l := range c.ServicePeer {
if _, ok := cfg.ServicePeer[s]; !ok {
cfg.ServicePeer[s] = l
}
}
for s, l := range cfg.Protocol {
r := cfg.ProtocolDefault
if l2, ok := c.Protocol[s]; ok {
r = l2
}
l.Apply(r)
cfg.Protocol[s] = l
}
if c.Protocol != nil && cfg.Protocol == nil {
cfg.Protocol = make(map[protocol.ID]BaseLimit)
}
for s, l := range c.Protocol {
if _, ok := cfg.Protocol[s]; !ok {
cfg.Protocol[s] = l
}
}
for s, l := range cfg.ProtocolPeer {
r := cfg.ProtocolPeerDefault
if l2, ok := c.ProtocolPeer[s]; ok {
r = l2
}
l.Apply(r)
cfg.ProtocolPeer[s] = l
}
if c.ProtocolPeer != nil && cfg.ProtocolPeer == nil {
cfg.ProtocolPeer = make(map[protocol.ID]BaseLimit)
}
for s, l := range c.ProtocolPeer {
if _, ok := cfg.ProtocolPeer[s]; !ok {
cfg.ProtocolPeer[s] = l
}
}
for s, l := range cfg.Peer {
r := cfg.PeerDefault
if l2, ok := c.Peer[s]; ok {
r = l2
}
l.Apply(r)
cfg.Peer[s] = l
}
if c.Peer != nil && cfg.Peer == nil {
cfg.Peer = make(map[peer.ID]BaseLimit)
}
for s, l := range c.Peer {
if _, ok := cfg.Peer[s]; !ok {
cfg.Peer[s] = l
}
}
}
// Scale scales up a limit configuration.
// memory is the amount of memory that the stack is allowed to consume,
// for a full it's recommended to use 1/8 of the installed system memory.
// If memory is smaller than 128 MB, the base configuration will be used.
//
func (cfg *ScalingLimitConfig) Scale(memory int64, numFD int) LimitConfig {
var scaleFactor int
if memory > 128<<20 {
scaleFactor = int((memory - 128<<20) >> 20)
}
lc := LimitConfig{
System: scale(cfg.SystemBaseLimit, cfg.SystemLimitIncrease, scaleFactor, numFD),
Transient: scale(cfg.TransientBaseLimit, cfg.TransientLimitIncrease, scaleFactor, numFD),
ServiceDefault: scale(cfg.ServiceBaseLimit, cfg.ServiceLimitIncrease, scaleFactor, numFD),
ServicePeerDefault: scale(cfg.ServicePeerBaseLimit, cfg.ServicePeerLimitIncrease, scaleFactor, numFD),
ProtocolDefault: scale(cfg.ProtocolBaseLimit, cfg.ProtocolLimitIncrease, scaleFactor, numFD),
ProtocolPeerDefault: scale(cfg.ProtocolPeerBaseLimit, cfg.ProtocolPeerLimitIncrease, scaleFactor, numFD),
PeerDefault: scale(cfg.PeerBaseLimit, cfg.PeerLimitIncrease, scaleFactor, numFD),
Conn: scale(cfg.ConnBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD),
Stream: scale(cfg.StreamBaseLimit, cfg.ConnLimitIncrease, scaleFactor, numFD),
}
if cfg.ServiceLimits != nil {
lc.Service = make(map[string]BaseLimit)
for svc, l := range cfg.ServiceLimits {
lc.Service[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolLimits != nil {
lc.Protocol = make(map[protocol.ID]BaseLimit)
for proto, l := range cfg.ProtocolLimits {
lc.Protocol[proto] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.PeerLimits != nil {
lc.Peer = make(map[peer.ID]BaseLimit)
for p, l := range cfg.PeerLimits {
lc.Peer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ServicePeerLimits != nil {
lc.ServicePeer = make(map[string]BaseLimit)
for svc, l := range cfg.ServicePeerLimits {
lc.ServicePeer[svc] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
if cfg.ProtocolPeerLimits != nil {
lc.ProtocolPeer = make(map[protocol.ID]BaseLimit)
for p, l := range cfg.ProtocolPeerLimits {
lc.ProtocolPeer[p] = scale(l.BaseLimit, l.BaseLimitIncrease, scaleFactor, numFD)
}
}
return lc
}
func (cfg *ScalingLimitConfig) AutoScale() LimitConfig {
return cfg.Scale(
int64(memory.TotalMemory())/8,
getNumFDs()/2,
)
}
// factor is the number of MBs above the minimum (128 MB)
func scale(base BaseLimit, inc BaseLimitIncrease, factor int, numFD int) BaseLimit {
l := BaseLimit{
StreamsInbound: base.StreamsInbound + (inc.StreamsInbound*factor)>>10,
StreamsOutbound: base.StreamsOutbound + (inc.StreamsOutbound*factor)>>10,
Streams: base.Streams + (inc.Streams*factor)>>10,
ConnsInbound: base.ConnsInbound + (inc.ConnsInbound*factor)>>10,
ConnsOutbound: base.ConnsOutbound + (inc.ConnsOutbound*factor)>>10,
Conns: base.Conns + (inc.Conns*factor)>>10,
Memory: base.Memory + (inc.Memory*int64(factor))>>10,
FD: base.FD,
}
if inc.FDFraction > 0 && numFD > 0 {
l.FD = int(inc.FDFraction * float64(numFD))
}
return l
} }
// DefaultLimits are the limits used by the default limiter constructors. // DefaultLimits are the limits used by the default limiter constructors.
var DefaultLimits = DefaultLimitConfig{ var DefaultLimits = ScalingLimitConfig{
SystemBaseLimit: BaseLimit{ SystemBaseLimit: BaseLimit{
StreamsInbound: 4096, ConnsInbound: 64,
StreamsOutbound: 16384, ConnsOutbound: 128,
Streams: 16384, Conns: 128,
ConnsInbound: 256, StreamsInbound: 64 * 16,
ConnsOutbound: 1024, StreamsOutbound: 128 * 16,
Conns: 1024, Streams: 128 * 16,
FD: 512, Memory: 128 << 20,
FD: 256,
}, },
SystemMemory: MemoryLimit{ SystemLimitIncrease: BaseLimitIncrease{
MemoryFraction: 0.125, ConnsInbound: 64,
MinMemory: 128 << 20, ConnsOutbound: 128,
MaxMemory: 1 << 30, Conns: 128,
StreamsInbound: 64 * 16,
StreamsOutbound: 128 * 16,
Streams: 128 * 16,
Memory: 1 << 30,
FDFraction: 1,
}, },
TransientBaseLimit: BaseLimit{ TransientBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 512,
Streams: 512,
ConnsInbound: 32, ConnsInbound: 32,
ConnsOutbound: 128, ConnsOutbound: 64,
Conns: 128, Conns: 64,
FD: 128, StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 32 << 20,
FD: 64,
}, },
TransientMemory: MemoryLimit{ TransientLimitIncrease: BaseLimitIncrease{
MemoryFraction: 1, ConnsInbound: 16,
MinMemory: 64 << 20, ConnsOutbound: 32,
MaxMemory: 64 << 20, Conns: 32,
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 128 << 20,
FDFraction: 0.25,
}, },
ServiceBaseLimit: BaseLimit{ ServiceBaseLimit: BaseLimit{
StreamsInbound: 2048,
StreamsOutbound: 8192,
Streams: 8192,
},
ServiceMemory: MemoryLimit{
MemoryFraction: 0.125 / 4,
MinMemory: 64 << 20,
MaxMemory: 256 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
},
ServicePeerMemory: MemoryLimit{
MemoryFraction: 0.125 / 16,
MinMemory: 16 << 20,
MaxMemory: 64 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 1024, StreamsInbound: 1024,
StreamsOutbound: 4096, StreamsOutbound: 4096,
Streams: 4096, Streams: 4096,
Memory: 64 << 20,
}, },
ProtocolMemory: MemoryLimit{ ServiceLimitIncrease: BaseLimitIncrease{
MemoryFraction: 0.125 / 8, StreamsInbound: 512,
MinMemory: 64 << 20, StreamsOutbound: 2048,
MaxMemory: 128 << 20, Streams: 2048,
Memory: 128 << 20,
},
ServicePeerBaseLimit: BaseLimit{
StreamsInbound: 128,
StreamsOutbound: 256,
Streams: 256,
Memory: 16 << 20,
},
ServicePeerLimitIncrease: BaseLimitIncrease{
StreamsInbound: 4,
StreamsOutbound: 8,
Streams: 8,
Memory: 4 << 20,
},
ProtocolBaseLimit: BaseLimit{
StreamsInbound: 512,
StreamsOutbound: 2048,
Streams: 2048,
Memory: 64 << 20,
},
ProtocolLimitIncrease: BaseLimitIncrease{
StreamsInbound: 256,
StreamsOutbound: 512,
Streams: 512,
Memory: 164 << 20,
}, },
ProtocolPeerBaseLimit: BaseLimit{ ProtocolPeerBaseLimit: BaseLimit{
StreamsInbound: 128, StreamsInbound: 64,
StreamsOutbound: 256, StreamsOutbound: 128,
Streams: 512, Streams: 256,
Memory: 16 << 20,
}, },
ProtocolPeerMemory: MemoryLimit{ ProtocolPeerLimitIncrease: BaseLimitIncrease{
MemoryFraction: 0.125 / 16, StreamsInbound: 4,
MinMemory: 16 << 20, StreamsOutbound: 8,
MaxMemory: 64 << 20, Streams: 16,
Memory: 4,
}, },
PeerBaseLimit: BaseLimit{ PeerBaseLimit: BaseLimit{
StreamsInbound: 512, ConnsInbound: 4,
StreamsOutbound: 1024, ConnsOutbound: 8,
Streams: 1024, Conns: 8,
ConnsInbound: 8, StreamsInbound: 256,
ConnsOutbound: 16, StreamsOutbound: 512,
Conns: 16, Streams: 512,
FD: 8, Memory: 64 << 20,
FD: 4,
}, },
PeerMemory: MemoryLimit{ PeerLimitIncrease: BaseLimitIncrease{
MemoryFraction: 0.125 / 16, StreamsInbound: 128,
MinMemory: 64 << 20, StreamsOutbound: 256,
MaxMemory: 128 << 20, Streams: 256,
Memory: 128 << 20,
FDFraction: 1.0 / 64,
}, },
ConnBaseLimit: BaseLimit{ ConnBaseLimit: BaseLimit{
@ -150,17 +426,15 @@ var DefaultLimits = DefaultLimitConfig{
ConnsOutbound: 1, ConnsOutbound: 1,
Conns: 1, Conns: 1,
FD: 1, FD: 1,
Memory: 1 << 20,
}, },
ConnMemory: 1 << 20,
StreamBaseLimit: BaseLimit{ StreamBaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
Memory: 16 << 20,
}, },
StreamMemory: 16 << 20,
} }
var infiniteBaseLimit = BaseLimit{ var infiniteBaseLimit = BaseLimit{
@ -171,33 +445,19 @@ var infiniteBaseLimit = BaseLimit{
ConnsInbound: math.MaxInt, ConnsInbound: math.MaxInt,
ConnsOutbound: math.MaxInt, ConnsOutbound: math.MaxInt,
FD: math.MaxInt, FD: math.MaxInt,
} Memory: math.MaxInt64,
var infiniteMemoryLimit = MemoryLimit{
MemoryFraction: 1,
MinMemory: math.MaxInt64,
MaxMemory: math.MaxInt64,
} }
// InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything. // InfiniteLimits are a limiter configuration that uses infinite limits, thus effectively not limiting anything.
// Keep in mind that the operating system limits the number of file descriptors that an application can use. // Keep in mind that the operating system limits the number of file descriptors that an application can use.
var InfiniteLimits = DefaultLimitConfig{ var InfiniteLimits = LimitConfig{
SystemBaseLimit: infiniteBaseLimit, System: infiniteBaseLimit,
SystemMemory: infiniteMemoryLimit, Transient: infiniteBaseLimit,
TransientBaseLimit: infiniteBaseLimit, ServiceDefault: infiniteBaseLimit,
TransientMemory: infiniteMemoryLimit, ServicePeerDefault: infiniteBaseLimit,
ServiceBaseLimit: infiniteBaseLimit, ProtocolDefault: infiniteBaseLimit,
ServiceMemory: infiniteMemoryLimit, ProtocolPeerDefault: infiniteBaseLimit,
ServicePeerBaseLimit: infiniteBaseLimit, PeerDefault: infiniteBaseLimit,
ServicePeerMemory: infiniteMemoryLimit, Conn: infiniteBaseLimit,
ProtocolBaseLimit: infiniteBaseLimit, Stream: infiniteBaseLimit,
ProtocolMemory: infiniteMemoryLimit,
ProtocolPeerBaseLimit: infiniteBaseLimit,
ProtocolPeerMemory: infiniteMemoryLimit,
PeerBaseLimit: infiniteBaseLimit,
PeerMemory: infiniteMemoryLimit,
ConnBaseLimit: infiniteBaseLimit,
ConnMemory: math.MaxInt64,
StreamBaseLimit: infiniteBaseLimit,
StreamMemory: math.MaxInt64,
} }

View File

@ -1,128 +0,0 @@
package rcmgr
import (
"runtime"
"github.com/pbnjay/memory"
)
// DynamicLimit is a limit with dynamic memory values, based on available (free) memory
type DynamicLimit struct {
BaseLimit
MemoryLimit
}
var _ Limit = (*DynamicLimit)(nil)
func (l *DynamicLimit) GetMemoryLimit() int64 {
freemem := memory.FreeMemory()
// account for memory retained by the runtime that is actually free
// HeapInuse - HeapAlloc is the memory available in allocator spans
// HeapIdle - HeapReleased is memory held by the runtime that could be returned to the OS
var memstat runtime.MemStats
runtime.ReadMemStats(&memstat)
freemem += (memstat.HeapInuse - memstat.HeapAlloc) + (memstat.HeapIdle - memstat.HeapReleased)
return l.MemoryLimit.GetMemory(int64(freemem))
}
func (l *DynamicLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit {
r := new(DynamicLimit)
*r = *l
r.MemoryLimit.MemoryFraction *= memFraction
r.MemoryLimit.MinMemory = minMemory
r.MemoryLimit.MaxMemory = maxMemory
return r
}
func (l *DynamicLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.StreamsInbound = numStreamsIn
r.BaseLimit.StreamsOutbound = numStreamsOut
r.BaseLimit.Streams = numStreams
return r
}
func (l *DynamicLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.ConnsInbound = numConnsIn
r.BaseLimit.ConnsOutbound = numConnsOut
r.BaseLimit.Conns = numConns
return r
}
func (l *DynamicLimit) WithFDLimit(numFD int) Limit {
r := new(DynamicLimit)
*r = *l
r.BaseLimit.FD = numFD
return r
}
// NewDefaultDynamicLimiter creates a limiter with default limits and a memory cap
// dynamically computed based on available memory.
func NewDefaultDynamicLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter {
return NewDynamicLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory))
}
// NewDynamicLimiter crates a dynamic limiter with the specified defaults
func NewDynamicLimiter(cfg DefaultLimitConfig) *BasicLimiter {
system := &DynamicLimit{
MemoryLimit: cfg.SystemMemory,
BaseLimit: cfg.SystemBaseLimit,
}
transient := &DynamicLimit{
MemoryLimit: cfg.TransientMemory,
BaseLimit: cfg.TransientBaseLimit,
}
svc := &DynamicLimit{
MemoryLimit: cfg.ServiceMemory,
BaseLimit: cfg.ServiceBaseLimit,
}
svcPeer := &DynamicLimit{
MemoryLimit: cfg.ServicePeerMemory,
BaseLimit: cfg.ServicePeerBaseLimit,
}
proto := &DynamicLimit{
MemoryLimit: cfg.ProtocolMemory,
BaseLimit: cfg.ProtocolBaseLimit,
}
protoPeer := &DynamicLimit{
MemoryLimit: cfg.ProtocolPeerMemory,
BaseLimit: cfg.ProtocolPeerBaseLimit,
}
peer := &DynamicLimit{
MemoryLimit: cfg.PeerMemory,
BaseLimit: cfg.PeerBaseLimit,
}
conn := &StaticLimit{
Memory: cfg.ConnMemory,
BaseLimit: cfg.ConnBaseLimit,
}
stream := &StaticLimit{
Memory: cfg.StreamMemory,
BaseLimit: cfg.StreamBaseLimit,
}
return &BasicLimiter{
SystemLimits: system,
TransientLimits: transient,
DefaultServiceLimits: svc,
DefaultServicePeerLimits: svcPeer,
DefaultProtocolLimits: proto,
DefaultProtocolPeerLimits: protoPeer,
DefaultPeerLimits: peer,
ConnLimits: conn,
StreamLimits: stream,
}
}

View File

@ -1,138 +0,0 @@
package rcmgr
import (
"github.com/pbnjay/memory"
)
// StaticLimit is a limit with static values.
type StaticLimit struct {
BaseLimit
Memory int64
}
var _ Limit = (*StaticLimit)(nil)
func (l *StaticLimit) GetMemoryLimit() int64 {
return l.Memory
}
func (l *StaticLimit) WithMemoryLimit(memFraction float64, minMemory, maxMemory int64) Limit {
r := new(StaticLimit)
*r = *l
r.Memory = int64(memFraction * float64(r.Memory))
if r.Memory < minMemory {
r.Memory = minMemory
} else if r.Memory > maxMemory {
r.Memory = maxMemory
}
return r
}
func (l *StaticLimit) WithStreamLimit(numStreamsIn, numStreamsOut, numStreams int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.StreamsInbound = numStreamsIn
r.BaseLimit.StreamsOutbound = numStreamsOut
r.BaseLimit.Streams = numStreams
return r
}
func (l *StaticLimit) WithConnLimit(numConnsIn, numConnsOut, numConns int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.ConnsInbound = numConnsIn
r.BaseLimit.ConnsOutbound = numConnsOut
r.BaseLimit.Conns = numConns
return r
}
func (l *StaticLimit) WithFDLimit(numFD int) Limit {
r := new(StaticLimit)
*r = *l
r.BaseLimit.FD = numFD
return r
}
// NewDefaultStaticLimiter creates a static limiter with default base limits and a system memory cap
// specified as a fraction of total system memory. The assigned memory will not be less than
// minMemory or more than maxMemory.
func NewDefaultStaticLimiter(memFraction float64, minMemory, maxMemory int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(memFraction, minMemory, maxMemory))
}
// NewDefaultFixedLimiter creates a static limiter with default base limits and a specified system
// memory cap.
func NewDefaultFixedLimiter(memoryCap int64) *BasicLimiter {
return NewStaticLimiter(DefaultLimits.WithSystemMemory(1, memoryCap, memoryCap))
}
// NewDefaultLimiter creates a static limiter with the default limits
func NewDefaultLimiter() *BasicLimiter {
return NewStaticLimiter(DefaultLimits)
}
// NewStaticLimiter creates a static limiter using the specified system memory cap and default
// limit config.
func NewStaticLimiter(cfg DefaultLimitConfig) *BasicLimiter {
memoryCap := memoryLimit(
int64(memory.TotalMemory()),
cfg.SystemMemory.MemoryFraction,
cfg.SystemMemory.MinMemory,
cfg.SystemMemory.MaxMemory)
system := &StaticLimit{
Memory: memoryCap,
BaseLimit: cfg.SystemBaseLimit,
}
transient := &StaticLimit{
Memory: cfg.TransientMemory.GetMemory(memoryCap),
BaseLimit: cfg.TransientBaseLimit,
}
svc := &StaticLimit{
Memory: cfg.ServiceMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServiceBaseLimit,
}
svcPeer := &StaticLimit{
Memory: cfg.ServicePeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ServicePeerBaseLimit,
}
proto := &StaticLimit{
Memory: cfg.ProtocolMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolBaseLimit,
}
protoPeer := &StaticLimit{
Memory: cfg.ProtocolPeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.ProtocolPeerBaseLimit,
}
peer := &StaticLimit{
Memory: cfg.PeerMemory.GetMemory(memoryCap),
BaseLimit: cfg.PeerBaseLimit,
}
conn := &StaticLimit{
Memory: cfg.ConnMemory,
BaseLimit: cfg.ConnBaseLimit,
}
stream := &StaticLimit{
Memory: cfg.StreamMemory,
BaseLimit: cfg.StreamBaseLimit,
}
return &BasicLimiter{
SystemLimits: system,
TransientLimits: transient,
DefaultServiceLimits: svc,
DefaultServicePeerLimits: svcPeer,
DefaultProtocolLimits: proto,
DefaultProtocolPeerLimits: protoPeer,
DefaultPeerLimits: peer,
ConnLimits: conn,
StreamLimits: stream,
}
}

88
limit_test.go Normal file
View File

@ -0,0 +1,88 @@
package rcmgr
import (
"runtime"
"testing"
"github.com/stretchr/testify/require"
)
func TestFileDescriptorCounting(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("can't read file descriptors on Windows")
}
n := getNumFDs()
require.NotZero(t, n)
require.Less(t, n, int(1e6))
}
func TestScaling(t *testing.T) {
base := BaseLimit{
Streams: 100,
StreamsInbound: 200,
StreamsOutbound: 400,
Conns: 10,
ConnsInbound: 20,
ConnsOutbound: 40,
FD: 1,
Memory: 1 << 20,
}
t.Run("no scaling if no increase is defined", func(t *testing.T) {
cfg := ScalingLimitConfig{ServiceBaseLimit: base}
scaled := cfg.Scale(8<<30, 100)
require.Equal(t, base, scaled.ServiceDefault)
})
t.Run("scaling", func(t *testing.T) {
cfg := ScalingLimitConfig{
TransientBaseLimit: base,
TransientLimitIncrease: BaseLimitIncrease{
Streams: 1,
StreamsInbound: 2,
StreamsOutbound: 3,
Conns: 4,
ConnsInbound: 5,
ConnsOutbound: 6,
Memory: 7,
FDFraction: 0.5,
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Equal(t, 500, scaled.Transient.FD)
require.Equal(t, base.Streams+4, scaled.Transient.Streams)
require.Equal(t, base.StreamsInbound+4*2, scaled.Transient.StreamsInbound)
require.Equal(t, base.StreamsOutbound+4*3, scaled.Transient.StreamsOutbound)
require.Equal(t, base.Conns+4*4, scaled.Transient.Conns)
require.Equal(t, base.ConnsInbound+4*5, scaled.Transient.ConnsInbound)
require.Equal(t, base.ConnsOutbound+4*6, scaled.Transient.ConnsOutbound)
require.Equal(t, base.Memory+4*7, scaled.Transient.Memory)
})
t.Run("scaling limits in maps", func(t *testing.T) {
cfg := ScalingLimitConfig{
ServiceLimits: map[string]baseLimitConfig{
"A": {
BaseLimit: BaseLimit{Streams: 10, Memory: 100, FD: 9},
},
"B": {
BaseLimit: BaseLimit{Streams: 20, Memory: 200, FD: 10},
BaseLimitIncrease: BaseLimitIncrease{Streams: 2, Memory: 3, FDFraction: 0.4},
},
},
}
scaled := cfg.Scale(128<<20+4<<30, 1000)
require.Len(t, scaled.Service, 2)
require.Contains(t, scaled.Service, "A")
require.Equal(t, 10, scaled.Service["A"].Streams)
require.Equal(t, int64(100), scaled.Service["A"].Memory)
require.Equal(t, 9, scaled.Service["A"].FD)
require.Contains(t, scaled.Service, "B")
require.Equal(t, 20+4*2, scaled.Service["B"].Streams)
require.Equal(t, int64(200+4*3), scaled.Service["B"].Memory)
require.Equal(t, 400, scaled.Service["B"].FD)
})
}

View File

@ -20,10 +20,9 @@ func TestResourceManager(t *testing.T) {
svcA := "A.svc" svcA := "A.svc"
svcB := "B.svc" svcB := "B.svc"
nmgr, err := NewResourceManager( nmgr, err := NewResourceManager(
&BasicLimiter{ NewFixedLimiter(LimitConfig{
SystemLimits: &StaticLimit{ System: BaseLimit{
Memory: 16384, Memory: 16384,
BaseLimit: BaseLimit{
StreamsInbound: 3, StreamsInbound: 3,
StreamsOutbound: 3, StreamsOutbound: 3,
Streams: 6, Streams: 6,
@ -32,10 +31,8 @@ func TestResourceManager(t *testing.T) {
Conns: 6, Conns: 6,
FD: 2, FD: 2,
}, },
}, Transient: BaseLimit{
TransientLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
@ -44,10 +41,8 @@ func TestResourceManager(t *testing.T) {
Conns: 2, Conns: 2,
FD: 1, FD: 1,
}, },
}, ServiceDefault: BaseLimit{
DefaultServiceLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
@ -56,19 +51,25 @@ func TestResourceManager(t *testing.T) {
Conns: 2, Conns: 2,
FD: 1, FD: 1,
}, },
}, ServicePeerDefault: BaseLimit{
DefaultServicePeerLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5, StreamsInbound: 5,
StreamsOutbound: 5, StreamsOutbound: 5,
Streams: 10, Streams: 10,
}, },
}, Service: map[string]BaseLimit{
ServiceLimits: map[string]Limit{ svcA: {
svcA: &StaticLimit{ Memory: 8192,
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
svcB: {
Memory: 8192, Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 4, Streams: 4,
@ -78,60 +79,38 @@ func TestResourceManager(t *testing.T) {
FD: 1, FD: 1,
}, },
}, },
svcB: &StaticLimit{ ServicePeer: map[string]BaseLimit{
svcB: {
Memory: 8192, Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2,
StreamsOutbound: 2,
Streams: 4,
ConnsInbound: 2,
ConnsOutbound: 2,
Conns: 4,
FD: 1,
},
},
},
ServicePeerLimits: map[string]Limit{
svcB: &StaticLimit{
Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
}, },
}, },
}, ProtocolDefault: BaseLimit{
DefaultProtocolLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
}, },
}, Protocol: map[protocol.ID]BaseLimit{
ProtocolLimits: map[protocol.ID]Limit{ protoA: {
protoA: &StaticLimit{
Memory: 8192, Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
}, },
}, },
}, ProtocolPeer: map[protocol.ID]BaseLimit{
ProtocolPeerLimits: map[protocol.ID]Limit{ protoB: {
protoB: &StaticLimit{
Memory: 8192, Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
}, },
}, },
}, PeerDefault: BaseLimit{
DefaultPeerLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 2, Streams: 2,
@ -140,19 +119,15 @@ func TestResourceManager(t *testing.T) {
Conns: 2, Conns: 2,
FD: 1, FD: 1,
}, },
}, ProtocolPeerDefault: BaseLimit{
DefaultProtocolPeerLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 5, StreamsInbound: 5,
StreamsOutbound: 5, StreamsOutbound: 5,
Streams: 10, Streams: 10,
}, },
}, Peer: map[peer.ID]BaseLimit{
PeerLimits: map[peer.ID]Limit{ peerA: {
peerA: &StaticLimit{
Memory: 8192, Memory: 8192,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 4, Streams: 4,
@ -162,25 +137,21 @@ func TestResourceManager(t *testing.T) {
FD: 1, FD: 1,
}, },
}, },
}, Conn: BaseLimit{
ConnLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
ConnsInbound: 1, ConnsInbound: 1,
ConnsOutbound: 1, ConnsOutbound: 1,
Conns: 1, Conns: 1,
FD: 1, FD: 1,
}, },
}, Stream: BaseLimit{
StreamLimits: &StaticLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
}, },
}, }),
}) )
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -1006,10 +977,14 @@ func TestResourceManager(t *testing.T) {
func TestResourceManagerWithAllowlist(t *testing.T) { func TestResourceManagerWithAllowlist(t *testing.T) {
peerA := test.RandPeerIDFatal(t) peerA := test.RandPeerIDFatal(t)
limits := NewDefaultLimiter() limits := DefaultLimits.Scale(1<<30, 100)
limits.SystemLimits = limits.SystemLimits.WithConnLimit(0, 0, 0) limits.System.Conns = 0
limits.TransientLimits = limits.SystemLimits.WithConnLimit(0, 0, 0) limits.System.ConnsInbound = 0
rcmgr, err := NewResourceManager(limits, WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{ limits.System.ConnsOutbound = 0
limits.Transient.Conns = 0
limits.Transient.ConnsInbound = 0
limits.Transient.ConnsOutbound = 0
rcmgr, err := NewResourceManager(NewFixedLimiter(limits), WithAllowlistedMultiaddrs([]multiaddr.Multiaddr{
multiaddr.StringCast("/ip4/1.2.3.4"), multiaddr.StringCast("/ip4/1.2.3.4"),
multiaddr.StringCast("/ip4/4.3.2.1/p2p/" + peerA.String()), multiaddr.StringCast("/ip4/4.3.2.1/p2p/" + peerA.String()),
})) }))
@ -1021,9 +996,18 @@ func TestResourceManagerWithAllowlist(t *testing.T) {
// Setup allowlist. TODO, replace this with a config once config changes are in // Setup allowlist. TODO, replace this with a config once config changes are in
r := rcmgr.(*resourceManager) r := rcmgr.(*resourceManager)
r.allowlistedSystem = newSystemScope(limits.GetSystemLimits().WithConnLimit(2, 1, 2), r, "allowlistedSystem") sysLimit := limits.System
sysLimit.Conns = 2
sysLimit.ConnsInbound = 2
sysLimit.ConnsOutbound = 1
r.allowlistedSystem = newSystemScope(&sysLimit, r, "allowlistedSystem")
r.allowlistedSystem.IncRef() r.allowlistedSystem.IncRef()
r.allowlistedTransient = newTransientScope(limits.GetTransientLimits().WithConnLimit(1, 1, 1), r, "allowlistedTransient", r.allowlistedSystem.resourceScope)
transLimit := limits.Transient
transLimit.Conns = 1
transLimit.ConnsInbound = 1
transLimit.ConnsOutbound = 1
r.allowlistedTransient = newTransientScope(&transLimit, r, "allowlistedTransient", r.allowlistedSystem.resourceScope)
r.allowlistedTransient.IncRef() r.allowlistedTransient.IncRef()
} }

View File

@ -30,9 +30,8 @@ func checkResources(t *testing.T, rc *resources, st network.ScopeStat) {
} }
func TestResources(t *testing.T) { func TestResources(t *testing.T) {
rc := resources{limit: &StaticLimit{ rc := resources{limit: &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
@ -40,7 +39,6 @@ func TestResources(t *testing.T) {
ConnsOutbound: 1, ConnsOutbound: 1,
Conns: 1, Conns: 1,
FD: 1, FD: 1,
},
}} }}
checkResources(t, &rc, network.ScopeStat{}) checkResources(t, &rc, network.ScopeStat{})
@ -245,9 +243,8 @@ func TestResources(t *testing.T) {
func TestResourceScopeSimple(t *testing.T) { func TestResourceScopeSimple(t *testing.T) {
s := newResourceScope( s := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
@ -256,7 +253,6 @@ func TestResourceScopeSimple(t *testing.T) {
Conns: 1, Conns: 1,
FD: 1, FD: 1,
}, },
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
@ -379,9 +375,8 @@ func testResourceScopeBasic(t *testing.T, s *resourceScope) {
func TestResourceScopeTxnBasic(t *testing.T) { func TestResourceScopeTxnBasic(t *testing.T) {
s := newResourceScope( s := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
@ -390,7 +385,6 @@ func TestResourceScopeTxnBasic(t *testing.T) {
Conns: 1, Conns: 1,
FD: 1, FD: 1,
}, },
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
@ -416,9 +410,8 @@ func TestResourceScopeTxnBasic(t *testing.T) {
func TestResourceScopeTxnZombie(t *testing.T) { func TestResourceScopeTxnZombie(t *testing.T) {
s := newResourceScope( s := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
@ -427,7 +420,6 @@ func TestResourceScopeTxnZombie(t *testing.T) {
Conns: 1, Conns: 1,
FD: 1, FD: 1,
}, },
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
@ -460,9 +452,8 @@ func TestResourceScopeTxnZombie(t *testing.T) {
func TestResourceScopeTxnTree(t *testing.T) { func TestResourceScopeTxnTree(t *testing.T) {
s := newResourceScope( s := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 1, StreamsInbound: 1,
StreamsOutbound: 1, StreamsOutbound: 1,
Streams: 1, Streams: 1,
@ -471,7 +462,6 @@ func TestResourceScopeTxnTree(t *testing.T) {
Conns: 1, Conns: 1,
FD: 1, FD: 1,
}, },
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
@ -571,9 +561,8 @@ func TestResourceScopeDAG(t *testing.T) {
// \ // \
// ------> s6 // ------> s6
s1 := newResourceScope( s1 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 4096, Memory: 4096,
BaseLimit: BaseLimit{
StreamsInbound: 4, StreamsInbound: 4,
StreamsOutbound: 4, StreamsOutbound: 4,
Streams: 4, Streams: 4,
@ -582,13 +571,11 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 4, Conns: 4,
FD: 4, FD: 4,
}, },
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
s2 := newResourceScope( s2 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 2048, Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
@ -597,13 +584,11 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 2, Conns: 2,
FD: 2, FD: 2,
}, },
},
[]*resourceScope{s1}, "test", nil, nil, []*resourceScope{s1}, "test", nil, nil,
) )
s3 := newResourceScope( s3 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 2048, Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
@ -612,13 +597,11 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 2, Conns: 2,
FD: 2, FD: 2,
}, },
},
[]*resourceScope{s1}, "test", nil, nil, []*resourceScope{s1}, "test", nil, nil,
) )
s4 := newResourceScope( s4 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 2048, Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
@ -627,13 +610,11 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 2, Conns: 2,
FD: 2, FD: 2,
}, },
},
[]*resourceScope{s2, s3, s1}, "test", nil, nil, []*resourceScope{s2, s3, s1}, "test", nil, nil,
) )
s5 := newResourceScope( s5 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 2048, Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
@ -642,13 +623,11 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 2, Conns: 2,
FD: 2, FD: 2,
}, },
},
[]*resourceScope{s2, s1}, "test", nil, nil, []*resourceScope{s2, s1}, "test", nil, nil,
) )
s6 := newResourceScope( s6 := newResourceScope(
&StaticLimit{ &BaseLimit{
Memory: 2048, Memory: 2048,
BaseLimit: BaseLimit{
StreamsInbound: 2, StreamsInbound: 2,
StreamsOutbound: 2, StreamsOutbound: 2,
Streams: 2, Streams: 2,
@ -657,7 +636,6 @@ func TestResourceScopeDAG(t *testing.T) {
Conns: 2, Conns: 2,
FD: 2, FD: 2,
}, },
},
[]*resourceScope{s3, s1}, "test", nil, nil, []*resourceScope{s3, s1}, "test", nil, nil,
) )
@ -1095,39 +1073,27 @@ func TestResourceScopeDAGTxn(t *testing.T) {
// \ // \
// ------> s6 // ------> s6
s1 := newResourceScope( s1 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 8192},
Memory: 8192,
},
nil, "test", nil, nil, nil, "test", nil, nil,
) )
s2 := newResourceScope( s2 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 4096 + 2048},
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test", nil, nil, []*resourceScope{s1}, "test", nil, nil,
) )
s3 := newResourceScope( s3 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 4096 + 2048},
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test", nil, nil, []*resourceScope{s1}, "test", nil, nil,
) )
s4 := newResourceScope( s4 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 4096 + 1024},
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s3, s1}, "test", nil, nil, []*resourceScope{s2, s3, s1}, "test", nil, nil,
) )
s5 := newResourceScope( s5 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 4096 + 1024},
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s1}, "test", nil, nil, []*resourceScope{s2, s1}, "test", nil, nil,
) )
s6 := newResourceScope( s6 := newResourceScope(
&StaticLimit{ &BaseLimit{Memory: 4096 + 1024},
Memory: 4096 + 1024,
},
[]*resourceScope{s3, s1}, "test", nil, nil, []*resourceScope{s3, s1}, "test", nil, nil,
) )

11
sys_not_unix.go Normal file
View File

@ -0,0 +1,11 @@
//go:build !linux && !darwin
package rcmgr
import "runtime"
// TODO: figure out how to get the number of file descriptors on Windows and other systems
func getNumFDs() int {
log.Warnf("cannot determine number of file descriptors on %s", runtime.GOOS)
return 0
}

17
sys_unix.go Normal file
View File

@ -0,0 +1,17 @@
//go:build linux || darwin
// +build linux darwin
package rcmgr
import (
"golang.org/x/sys/unix"
)
func getNumFDs() int {
var l unix.Rlimit
if err := unix.Getrlimit(unix.RLIMIT_NOFILE, &l); err != nil {
log.Errorw("failed to get fd limit", "error", err)
return 0
}
return int(l.Cur)
}