add support for per service peer limits

This commit is contained in:
vyzo 2022-01-08 22:04:07 +02:00
parent 05a6764a69
commit 5487b6bdac
2 changed files with 75 additions and 14 deletions

View File

@ -36,6 +36,7 @@ type Limiter interface {
GetSystemLimits() Limit
GetTransientLimits() Limit
GetServiceLimits(svc string) Limit
GetServicePeerLimits(svc string) Limit
GetProtocolLimits(proto protocol.ID) Limit
GetPeerLimits(p peer.ID) Limit
GetStreamLimits(p peer.ID) Limit
@ -48,6 +49,7 @@ type BasicLimiter struct {
TransientLimits Limit
DefaultServiceLimits Limit
ServiceLimits map[string]Limit
SercicePeerLimits map[string]Limit
DefaultProtocolLimits Limit
ProtocolLimits map[protocol.ID]Limit
DefaultPeerLimits Limit
@ -103,6 +105,10 @@ func (l *BasicLimiter) GetServiceLimits(svc string) Limit {
return sl
}
func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit {
return l.SercicePeerLimits[svc]
}
func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit {
pl, ok := l.ProtocolLimits[proto]
if !ok {

View File

@ -46,8 +46,10 @@ var _ network.ResourceScope = (*transientScope)(nil)
type serviceScope struct {
*resourceScope
name string
system *systemScope
name string
rcmgr *resourceManager
peers map[peer.ID]*resourceScope
}
var _ network.ServiceScope = (*serviceScope)(nil)
@ -55,8 +57,8 @@ var _ network.ServiceScope = (*serviceScope)(nil)
type protocolScope struct {
*resourceScope
proto protocol.ID
system *systemScope
proto protocol.ID
rcmgr *resourceManager
}
var _ network.ProtocolScope = (*protocolScope)(nil)
@ -151,7 +153,7 @@ func (r *resourceManager) getServiceScope(svc string) *serviceScope {
s, ok := r.svc[svc]
if !ok {
s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r.system)
s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r)
r.svc[svc] = s
}
@ -165,7 +167,7 @@ func (r *resourceManager) getProtocolScope(proto protocol.ID) *protocolScope {
s, ok := r.proto[proto]
if !ok {
s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r.system)
s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r)
r.proto[proto] = s
}
@ -247,12 +249,22 @@ func (r *resourceManager) gc() {
}
}
var deadPeers []peer.ID
for p, s := range r.peer {
if s.IsUnused() {
s.Done()
delete(r.peer, p)
deadPeers = append(deadPeers, p)
}
}
for _, s := range r.svc {
s.Lock()
for _, p := range deadPeers {
delete(s.peers, p)
}
s.Unlock()
}
}
func newSystemScope(limit Limit) *systemScope {
@ -268,19 +280,19 @@ func newTransientScope(limit Limit, system *systemScope) *transientScope {
}
}
func newServiceScope(name string, limit Limit, system *systemScope) *serviceScope {
func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, fmt.Sprintf("service.%s", name)),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service.%s", name)),
name: name,
system: system,
rcmgr: rcmgr,
}
}
func newProtocolScope(proto protocol.ID, limit Limit, system *systemScope) *protocolScope {
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, fmt.Sprintf("protocol.%s", proto)),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol.%s", proto)),
proto: proto,
system: system,
rcmgr: rcmgr,
}
}
@ -314,6 +326,32 @@ func (s *serviceScope) Name() string {
return s.name
}
func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.Lock()
defer s.Unlock()
ps, ok := s.peers[p]
if ok {
ps.IncRef()
return ps
}
l := s.rcmgr.limits.GetServicePeerLimits(s.name)
if l == nil {
return nil
}
if s.peers == nil {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name))
s.peers[p] = ps
ps.IncRef()
return ps
}
func (s *protocolScope) Protocol() protocol.ID {
return s.proto
}
@ -423,6 +461,18 @@ func (s *streamScope) SetService(svc string) error {
return err
}
// get the per peer service scope constraint, if any
peerSvcScope := s.svc.getPeerScope(s.peer.peer)
if peerSvcScope != nil {
if err := peerSvcScope.ReserveForChild(stat); err != nil {
s.svc.ReleaseForChild(stat)
s.svc.DecRef()
s.svc = nil
peerSvcScope.DecRef()
return err
}
}
// remove resources from the protocol
s.proto.ReleaseForChild(stat)
s.proto.DecRef() // removed from constraints
@ -430,9 +480,14 @@ func (s *streamScope) SetService(svc string) error {
// update constraints
constraints := []*resourceScope{
s.peer.resourceScope,
s.svc.resourceScope,
s.rcmgr.system.resourceScope,
}
if peerSvcScope != nil {
constraints = append(constraints, peerSvcScope)
}
constraints = append(constraints, s.svc.resourceScope, s.rcmgr.system.resourceScope)
s.resourceScope.constraints = constraints
return nil