diff --git a/limit.go b/limit.go index 0034bdd..8d3bedc 100644 --- a/limit.go +++ b/limit.go @@ -2,6 +2,8 @@ package rcmgr import ( "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" ) type Limit interface { @@ -10,3 +12,13 @@ type Limit interface { GetConnLimit(network.Direction) int GetFDLimit() int } + +type Limiter interface { + GetSystemLimits() Limit + GetTransientLimits() Limit + GetServiceLimits(svc string) Limit + GetProtocolLimits(proto protocol.ID) Limit + GetPeerLimits(p peer.ID) Limit + GetStreamLimits(p peer.ID) Limit + GetConnLimits() Limit +} diff --git a/rcmgr.go b/rcmgr.go index 96c3a5b..faf1a3c 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -2,6 +2,7 @@ package rcmgr import ( "fmt" + "sync" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -9,10 +10,19 @@ import ( ) type ResourceManager struct { + limits Limiter + system *SystemScope transient *TransientScope + + mx sync.Mutex + svc map[string]*ServiceScope + proto map[protocol.ID]*ProtocolScope + peer map[peer.ID]*PeerScope } +var _ network.ResourceManager = (*ResourceManager)(nil) + type SystemScope struct { *ResourceScope } @@ -83,33 +93,67 @@ type StreamScope struct { var _ network.StreamScope = (*StreamScope)(nil) +func (r *ResourceManager) GetSystem() network.ResourceScope { + return r.system +} + +func (r *ResourceManager) GetTransient() network.ResourceScope { + return r.transient +} + +func (r *ResourceManager) GetService(srv string) network.ServiceScope { + return r.getServiceScope(srv) +} + +func (r *ResourceManager) GetProtocol(proto protocol.ID) network.ProtocolScope { + return r.getProtocolScope(proto) +} + +func (r *ResourceManager) GetPeer(p peer.ID) network.PeerScope { + return r.getPeerScope(p) +} + func (r *ResourceManager) getProtocolScope(proto protocol.ID) *ProtocolScope { - // TODO - return nil + r.mx.Lock() + defer r.mx.Unlock() + + s, ok := r.proto[proto] + if !ok { + s = NewProtocolScope(proto, r.limits.GetProtocolLimits(proto), r.system) + r.proto[proto] = s + } + + return s } func (r *ResourceManager) getServiceScope(svc string) *ServiceScope { - // TODO - return nil + r.mx.Lock() + defer r.mx.Unlock() + + s, ok := r.svc[svc] + if !ok { + s = NewServiceScope(svc, r.limits.GetServiceLimits(svc), r.system) + r.svc[svc] = s + } + + return s } func (r *ResourceManager) getPeerScope(p peer.ID) *PeerScope { - // TODO - return nil -} + r.mx.Lock() + defer r.mx.Unlock() -func (r *ResourceManager) getConnLimit() Limit { - // TODO - return nil -} + s, ok := r.peer[p] + if !ok { + s = NewPeerScope(p, r.limits.GetPeerLimits(p), r) + r.peer[p] = s + } -func (r *ResourceManager) getStreamLimit(p peer.ID) Limit { - // TODO - return nil + return s } func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnectionScope, error) { - conn := NewConnectionScope(dir, usefd, r.getConnLimit(), r) + conn := NewConnectionScope(dir, usefd, r.limits.GetConnLimits(), r) if err := conn.AddConn(dir); err != nil { return nil, err @@ -125,6 +169,11 @@ func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (net return conn, nil } +func (r *ResourceManager) Close() error { + // TODO + return nil +} + func NewSystemScope(limit Limit) *SystemScope { return &SystemScope{ ResourceScope: NewResourceScope(limit, nil), @@ -199,7 +248,7 @@ func (s *PeerScope) Peer() peer.ID { } func (s *PeerScope) OpenStream(dir network.Direction) (network.StreamScope, error) { - stream := NewStreamScope(dir, s.rcmgr.getStreamLimit(s.peer), s) + stream := NewStreamScope(dir, s.rcmgr.limits.GetStreamLimits(s.peer), s) err := stream.AddStream(dir) if err != nil { return nil, err