flesh out resource manager

This commit is contained in:
vyzo 2021-12-23 17:46:08 +02:00
parent 431ca69df7
commit cff880ac60
2 changed files with 77 additions and 16 deletions

View File

@ -2,6 +2,8 @@ package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
type Limit interface {
@ -10,3 +12,13 @@ type Limit interface {
GetConnLimit(network.Direction) int
GetFDLimit() int
}
type Limiter interface {
GetSystemLimits() Limit
GetTransientLimits() Limit
GetServiceLimits(svc string) Limit
GetProtocolLimits(proto protocol.ID) Limit
GetPeerLimits(p peer.ID) Limit
GetStreamLimits(p peer.ID) Limit
GetConnLimits() Limit
}

View File

@ -2,6 +2,7 @@ package rcmgr
import (
"fmt"
"sync"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
@ -9,10 +10,19 @@ import (
)
type ResourceManager struct {
limits Limiter
system *SystemScope
transient *TransientScope
mx sync.Mutex
svc map[string]*ServiceScope
proto map[protocol.ID]*ProtocolScope
peer map[peer.ID]*PeerScope
}
var _ network.ResourceManager = (*ResourceManager)(nil)
type SystemScope struct {
*ResourceScope
}
@ -83,33 +93,67 @@ type StreamScope struct {
var _ network.StreamScope = (*StreamScope)(nil)
func (r *ResourceManager) GetSystem() network.ResourceScope {
return r.system
}
func (r *ResourceManager) GetTransient() network.ResourceScope {
return r.transient
}
func (r *ResourceManager) GetService(srv string) network.ServiceScope {
return r.getServiceScope(srv)
}
func (r *ResourceManager) GetProtocol(proto protocol.ID) network.ProtocolScope {
return r.getProtocolScope(proto)
}
func (r *ResourceManager) GetPeer(p peer.ID) network.PeerScope {
return r.getPeerScope(p)
}
func (r *ResourceManager) getProtocolScope(proto protocol.ID) *ProtocolScope {
// TODO
return nil
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.proto[proto]
if !ok {
s = NewProtocolScope(proto, r.limits.GetProtocolLimits(proto), r.system)
r.proto[proto] = s
}
return s
}
func (r *ResourceManager) getServiceScope(svc string) *ServiceScope {
// TODO
return nil
r.mx.Lock()
defer r.mx.Unlock()
s, ok := r.svc[svc]
if !ok {
s = NewServiceScope(svc, r.limits.GetServiceLimits(svc), r.system)
r.svc[svc] = s
}
return s
}
func (r *ResourceManager) getPeerScope(p peer.ID) *PeerScope {
// TODO
return nil
}
r.mx.Lock()
defer r.mx.Unlock()
func (r *ResourceManager) getConnLimit() Limit {
// TODO
return nil
}
s, ok := r.peer[p]
if !ok {
s = NewPeerScope(p, r.limits.GetPeerLimits(p), r)
r.peer[p] = s
}
func (r *ResourceManager) getStreamLimit(p peer.ID) Limit {
// TODO
return nil
return s
}
func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnectionScope, error) {
conn := NewConnectionScope(dir, usefd, r.getConnLimit(), r)
conn := NewConnectionScope(dir, usefd, r.limits.GetConnLimits(), r)
if err := conn.AddConn(dir); err != nil {
return nil, err
@ -125,6 +169,11 @@ func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (net
return conn, nil
}
func (r *ResourceManager) Close() error {
// TODO
return nil
}
func NewSystemScope(limit Limit) *SystemScope {
return &SystemScope{
ResourceScope: NewResourceScope(limit, nil),
@ -199,7 +248,7 @@ func (s *PeerScope) Peer() peer.ID {
}
func (s *PeerScope) OpenStream(dir network.Direction) (network.StreamScope, error) {
stream := NewStreamScope(dir, s.rcmgr.getStreamLimit(s.peer), s)
stream := NewStreamScope(dir, s.rcmgr.limits.GetStreamLimits(s.peer), s)
err := stream.AddStream(dir)
if err != nil {
return nil, err