From 5487b6bdac2a519e0d33022b4bbfed26d2112755 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sat, 8 Jan 2022 22:04:07 +0200 Subject: [PATCH] add support for per service peer limits --- limit.go | 6 ++++ rcmgr.go | 83 ++++++++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 75 insertions(+), 14 deletions(-) diff --git a/limit.go b/limit.go index c3a0deb..3d805de 100644 --- a/limit.go +++ b/limit.go @@ -36,6 +36,7 @@ type Limiter interface { GetSystemLimits() Limit GetTransientLimits() Limit GetServiceLimits(svc string) Limit + GetServicePeerLimits(svc string) Limit GetProtocolLimits(proto protocol.ID) Limit GetPeerLimits(p peer.ID) Limit GetStreamLimits(p peer.ID) Limit @@ -48,6 +49,7 @@ type BasicLimiter struct { TransientLimits Limit DefaultServiceLimits Limit ServiceLimits map[string]Limit + SercicePeerLimits map[string]Limit DefaultProtocolLimits Limit ProtocolLimits map[protocol.ID]Limit DefaultPeerLimits Limit @@ -103,6 +105,10 @@ func (l *BasicLimiter) GetServiceLimits(svc string) Limit { return sl } +func (l *BasicLimiter) GetServicePeerLimits(svc string) Limit { + return l.SercicePeerLimits[svc] +} + func (l *BasicLimiter) GetProtocolLimits(proto protocol.ID) Limit { pl, ok := l.ProtocolLimits[proto] if !ok { diff --git a/rcmgr.go b/rcmgr.go index 6615162..f155438 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -46,8 +46,10 @@ var _ network.ResourceScope = (*transientScope)(nil) type serviceScope struct { *resourceScope - name string - system *systemScope + name string + rcmgr *resourceManager + + peers map[peer.ID]*resourceScope } var _ network.ServiceScope = (*serviceScope)(nil) @@ -55,8 +57,8 @@ var _ network.ServiceScope = (*serviceScope)(nil) type protocolScope struct { *resourceScope - proto protocol.ID - system *systemScope + proto protocol.ID + rcmgr *resourceManager } var _ network.ProtocolScope = (*protocolScope)(nil) @@ -151,7 +153,7 @@ func (r *resourceManager) getServiceScope(svc string) *serviceScope { s, ok := r.svc[svc] if !ok { - s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r.system) + s = newServiceScope(svc, r.limits.GetServiceLimits(svc), r) r.svc[svc] = s } @@ -165,7 +167,7 @@ func (r *resourceManager) getProtocolScope(proto protocol.ID) *protocolScope { s, ok := r.proto[proto] if !ok { - s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r.system) + s = newProtocolScope(proto, r.limits.GetProtocolLimits(proto), r) r.proto[proto] = s } @@ -247,12 +249,22 @@ func (r *resourceManager) gc() { } } + var deadPeers []peer.ID for p, s := range r.peer { if s.IsUnused() { s.Done() delete(r.peer, p) + deadPeers = append(deadPeers, p) } } + + for _, s := range r.svc { + s.Lock() + for _, p := range deadPeers { + delete(s.peers, p) + } + s.Unlock() + } } func newSystemScope(limit Limit) *systemScope { @@ -268,19 +280,19 @@ func newTransientScope(limit Limit, system *systemScope) *transientScope { } } -func newServiceScope(name string, limit Limit, system *systemScope) *serviceScope { +func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope { return &serviceScope{ - resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, fmt.Sprintf("service.%s", name)), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service.%s", name)), name: name, - system: system, + rcmgr: rcmgr, } } -func newProtocolScope(proto protocol.ID, limit Limit, system *systemScope) *protocolScope { +func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope { return &protocolScope{ - resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, fmt.Sprintf("protocol.%s", proto)), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol.%s", proto)), proto: proto, - system: system, + rcmgr: rcmgr, } } @@ -314,6 +326,32 @@ func (s *serviceScope) Name() string { return s.name } +func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope { + s.Lock() + defer s.Unlock() + + ps, ok := s.peers[p] + if ok { + ps.IncRef() + return ps + } + + l := s.rcmgr.limits.GetServicePeerLimits(s.name) + if l == nil { + return nil + } + + if s.peers == nil { + s.peers = make(map[peer.ID]*resourceScope) + } + + ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name)) + s.peers[p] = ps + + ps.IncRef() + return ps +} + func (s *protocolScope) Protocol() protocol.ID { return s.proto } @@ -423,6 +461,18 @@ func (s *streamScope) SetService(svc string) error { return err } + // get the per peer service scope constraint, if any + peerSvcScope := s.svc.getPeerScope(s.peer.peer) + if peerSvcScope != nil { + if err := peerSvcScope.ReserveForChild(stat); err != nil { + s.svc.ReleaseForChild(stat) + s.svc.DecRef() + s.svc = nil + peerSvcScope.DecRef() + return err + } + } + // remove resources from the protocol s.proto.ReleaseForChild(stat) s.proto.DecRef() // removed from constraints @@ -430,9 +480,14 @@ func (s *streamScope) SetService(svc string) error { // update constraints constraints := []*resourceScope{ s.peer.resourceScope, - s.svc.resourceScope, - s.rcmgr.system.resourceScope, } + + if peerSvcScope != nil { + constraints = append(constraints, peerSvcScope) + } + + constraints = append(constraints, s.svc.resourceScope, s.rcmgr.system.resourceScope) + s.resourceScope.constraints = constraints return nil