flesh out concrete scopes

This commit is contained in:
vyzo 2021-12-23 15:28:14 +02:00
parent 01e64d31ec
commit 0ca5ac15d9
3 changed files with 536 additions and 54 deletions

View File

@ -1,7 +1,12 @@
package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
)
type Limit interface {
GetMemoryLimit() int64
GetStreamLimit() int
GetConnLimit() int
GetStreamLimit(network.Direction) int
GetConnLimit(network.Direction) int
GetFDLimit() int
}

363
rcmgr.go Normal file
View File

@ -0,0 +1,363 @@
package rcmgr
import (
"fmt"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
type ResourceManager struct {
system *SystemScope
transient *TransientScope
}
type SystemScope struct {
*ResourceScope
}
var _ network.ResourceScope = (*SystemScope)(nil)
type TransientScope struct {
*ResourceScope
system *SystemScope
}
var _ network.ResourceScope = (*TransientScope)(nil)
type ServiceScope struct {
*ResourceScope
name string
system *SystemScope
}
var _ network.ServiceScope = (*ServiceScope)(nil)
type ProtocolScope struct {
*ResourceScope
proto protocol.ID
system *SystemScope
}
var _ network.ProtocolScope = (*ProtocolScope)(nil)
type PeerScope struct {
*ResourceScope
peer peer.ID
rcmgr *ResourceManager
system *SystemScope
transient *TransientScope
}
var _ network.PeerScope = (*PeerScope)(nil)
type ConnectionScope struct {
*ResourceScope
dir network.Direction
rcmgr *ResourceManager
system *SystemScope
transient *TransientScope
peer *PeerScope
}
var _ network.ConnectionScope = (*ConnectionScope)(nil)
type StreamScope struct {
*ResourceScope
dir network.Direction
rcmgr *ResourceManager
system *SystemScope
transient *TransientScope
peer *PeerScope
svc *ServiceScope
proto *ProtocolScope
}
var _ network.StreamScope = (*StreamScope)(nil)
func (r *ResourceManager) getProtocolScope(proto protocol.ID) *ProtocolScope {
// TODO
return nil
}
func (r *ResourceManager) getServiceScope(svc string) *ServiceScope {
// TODO
return nil
}
func (r *ResourceManager) getPeerScope(p peer.ID) *PeerScope {
// TODO
return nil
}
func (r *ResourceManager) getConnLimit() Limit {
// TODO
return nil
}
func (r *ResourceManager) getStreamLimit(p peer.ID) Limit {
// TODO
return nil
}
func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnectionScope, error) {
conn := NewConnectionScope(dir, r.getConnLimit(), r)
if err := conn.AddConn(dir); err != nil {
return nil, err
}
if err := conn.AddFD(1); err != nil {
conn.RemoveStream(dir)
return nil, err
}
return conn, nil
}
func NewSystemScope(limit Limit) *SystemScope {
return &SystemScope{
ResourceScope: NewResourceScope(limit, nil),
}
}
func NewTransientScope(limit Limit, system *SystemScope) *TransientScope {
return &TransientScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}),
system: system,
}
}
func NewServiceScope(name string, limit Limit, system *SystemScope) *ServiceScope {
return &ServiceScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}),
name: name,
system: system,
}
}
func NewProtocolScope(proto protocol.ID, limit Limit, system *SystemScope) *ProtocolScope {
return &ProtocolScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}),
proto: proto,
system: system,
}
}
func NewPeerScope(p peer.ID, limit Limit, rcmgr *ResourceManager) *PeerScope {
return &PeerScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{rcmgr.system.ResourceScope}),
peer: p,
rcmgr: rcmgr,
system: rcmgr.system,
transient: rcmgr.transient,
}
}
func NewConnectionScope(dir network.Direction, limit Limit, rcmgr *ResourceManager) *ConnectionScope {
return &ConnectionScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{rcmgr.transient.ResourceScope, rcmgr.system.ResourceScope}),
dir: dir,
rcmgr: rcmgr,
system: rcmgr.system,
transient: rcmgr.transient,
}
}
func NewStreamScope(dir network.Direction, limit Limit, peer *PeerScope) *StreamScope {
return &StreamScope{
ResourceScope: NewResourceScope(limit, []*ResourceScope{peer.ResourceScope, peer.transient.ResourceScope, peer.system.ResourceScope}),
dir: dir,
rcmgr: peer.rcmgr,
system: peer.system,
transient: peer.transient,
peer: peer,
}
}
func (s *ServiceScope) Name() string {
return s.name
}
func (s *ProtocolScope) Protocol() protocol.ID {
return s.proto
}
func (s *PeerScope) Peer() peer.ID {
return s.peer
}
func (s *PeerScope) OpenStream(dir network.Direction) (network.StreamScope, error) {
stream := NewStreamScope(dir, s.rcmgr.getStreamLimit(s.peer), s)
err := stream.AddStream(dir)
if err != nil {
return nil, err
}
return stream, nil
}
func (s *ConnectionScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
return s.peer
}
func (s *ConnectionScope) SetPeer(p peer.ID) error {
s.Lock()
defer s.Unlock()
if s.peer != nil {
return fmt.Errorf("connection scope already attached to a peer")
}
s.peer = s.rcmgr.getPeerScope(p)
// juggle resources from transient scope to peer scope
mem := s.ResourceScope.rc.memory
var incount, outcount int
if s.dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err := s.peer.ReserveMemoryForChild(mem); err != nil {
return err
}
if err := s.peer.AddConnForChild(incount, outcount); err != nil {
s.peer.ReleaseMemoryForChild(mem)
return err
}
if err := s.peer.AddFDForChild(1); err != nil {
s.peer.ReleaseMemoryForChild(mem)
s.peer.RemoveConnForChild(incount, outcount)
return err
}
s.transient.ReleaseMemoryForChild(mem)
s.transient.RemoveConnForChild(incount, outcount)
s.transient.RemoveFDForChild(1)
// update constraints
constraints := []*ResourceScope{
s.peer.ResourceScope,
s.system.ResourceScope,
}
s.ResourceScope.constraints = constraints
return nil
}
func (s *StreamScope) ProtocolScope() network.ProtocolScope {
s.Lock()
defer s.Unlock()
return s.proto
}
func (s *StreamScope) SetProtocol(proto protocol.ID) error {
s.Lock()
defer s.Unlock()
if s.proto != nil {
return fmt.Errorf("stream scope already attached to a protocol")
}
s.proto = s.rcmgr.getProtocolScope(proto)
// juggle resources from transient scope to protocol scope
mem := s.ResourceScope.rc.memory
var incount, outcount int
if s.dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err := s.proto.ReserveMemoryForChild(mem); err != nil {
return err
}
if err := s.proto.AddStreamForChild(incount, outcount); err != nil {
s.proto.ReleaseMemoryForChild(mem)
return err
}
s.transient.ReleaseMemoryForChild(mem)
s.transient.RemoveStreamForChild(incount, outcount)
// update constraints
constraints := []*ResourceScope{
s.peer.ResourceScope,
s.proto.ResourceScope,
}
if s.svc != nil {
constraints = append(constraints, s.svc.ResourceScope)
}
constraints = append(constraints, s.system.ResourceScope)
s.ResourceScope.constraints = constraints
return nil
}
func (s *StreamScope) ServiceScope() network.ServiceScope {
s.Lock()
defer s.Unlock()
return s.svc
}
func (s *StreamScope) SetService(svc string) error {
s.Lock()
defer s.Unlock()
if s.proto == nil {
return fmt.Errorf("stream scope not attached to a protocol")
}
if s.svc != nil {
return fmt.Errorf("stream scope already attached to a service")
}
s.svc = s.rcmgr.getServiceScope(svc)
// reserve resources in service
mem := s.ResourceScope.rc.memory
var incount, outcount int
if s.dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err := s.svc.ReserveMemoryForChild(mem); err != nil {
return err
}
if err := s.svc.AddStreamForChild(incount, outcount); err != nil {
s.svc.ReleaseMemoryForChild(mem)
return err
}
// update constraints
constraints := []*ResourceScope{
s.peer.ResourceScope,
s.proto.ResourceScope,
s.svc.ResourceScope,
s.system.ResourceScope,
}
s.ResourceScope.constraints = constraints
return nil
}
func (s *StreamScope) PeerScope() network.PeerScope {
s.Lock()
defer s.Unlock()
return s.peer
}

218
scope.go
View File

@ -10,11 +10,14 @@ import (
// Basic resource mamagement.
type Resources struct {
limit Limit
nconns int
nstreams int
memory int64
buffers map[interface{}][]byte
limit Limit
nconnsIn, nconnsOut int
nstreamsIn, nstreamsOut int
nfd int
memory int64
buffers map[interface{}][]byte
}
// DAG ResourceScopes.
@ -46,7 +49,7 @@ func NewResourceScope(limit Limit, constraints []*ResourceScope) *ResourceScope
}
// Resources implementation
func (rc *Resources) checkMemory(rsvp int) error {
func (rc *Resources) checkMemory(rsvp int64) error {
// overflow check; this also has the side-effect that we cannot reserve negative memory.
newmem := rc.memory + int64(rsvp)
if newmem < rc.memory {
@ -68,7 +71,7 @@ func (rc *Resources) releaseBuffers() {
}
}
func (rc *Resources) reserveMemory(size int) error {
func (rc *Resources) reserveMemory(size int64) error {
if err := rc.checkMemory(size); err != nil {
return err
}
@ -87,7 +90,7 @@ func (rc *Resources) releaseMemory(size int64) {
}
func (rc *Resources) getBuffer(size int) ([]byte, error) {
if err := rc.checkMemory(size); err != nil {
if err := rc.checkMemory(int64(size)); err != nil {
return nil, err
}
@ -101,7 +104,7 @@ func (rc *Resources) getBuffer(size int) ([]byte, error) {
func (rc *Resources) growBuffer(oldbuf []byte, newsize int) ([]byte, error) {
grow := newsize - len(oldbuf)
if err := rc.checkMemory(grow); err != nil {
if err := rc.checkMemory(int64(grow)); err != nil {
return nil, err
}
@ -127,45 +130,72 @@ func (rc *Resources) releaseBuffer(buf []byte) {
pool.Put(buf)
}
func (rc *Resources) addStream(count int) error {
if rc.nstreams+count > rc.limit.GetStreamLimit() {
func (rc *Resources) addStream(incount, outcount int) error {
if incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded)
}
if outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded)
}
rc.nstreams += count
rc.nstreamsIn += incount
rc.nstreamsOut += outcount
return nil
}
func (rc *Resources) removeStream(count int) {
rc.nstreams -= count
func (rc *Resources) removeStream(incount, outcount int) {
rc.nstreamsIn -= incount
rc.nstreamsOut -= outcount
if rc.nstreams < 0 {
if rc.nstreamsIn < 0 || rc.nstreamsOut < 0 {
panic("BUG: too many streams released")
}
}
func (rc *Resources) addConn(count int) error {
if rc.nconns+count > rc.limit.GetConnLimit() {
func (rc *Resources) addConn(incount, outcount int) error {
if incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded)
}
if outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded)
}
rc.nconns += count
rc.nconnsIn += incount
rc.nconnsOut += outcount
return nil
}
func (rc *Resources) removeConn(count int) {
rc.nconns -= count
func (rc *Resources) removeConn(incount, outcount int) {
rc.nconnsIn -= incount
rc.nconnsOut -= outcount
if rc.nconns < 0 {
if rc.nconnsIn < 0 || rc.nconnsOut < 0 {
panic("BUG: too many connections released")
}
}
func (rc *Resources) addFD(count int) error {
if rc.nfd+count > rc.limit.GetFDLimit() {
return fmt.Errorf("cannot reserve file descriptor: %w", ErrResourceLimitExceeded)
}
rc.nfd += count
return nil
}
func (rc *Resources) removeFD(count int) {
rc.nfd -= count
if rc.nfd < 0 {
panic("BUG: too many file descriptors released")
}
}
func (rc *Resources) stat() network.ScopeStat {
return network.ScopeStat{
Memory: rc.memory,
NumConns: rc.nconns,
NumStreams: rc.nstreams,
NumConns: rc.nconnsIn + rc.nconnsOut,
NumStreams: rc.nstreamsIn + rc.nstreamsOut,
}
}
@ -178,7 +208,7 @@ func (s *ResourceScope) ReserveMemory(size int) error {
return ErrResourceScopeClosed
}
if err := s.rc.reserveMemory(size); err != nil {
if err := s.rc.reserveMemory(int64(size)); err != nil {
return err
}
@ -194,7 +224,7 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error {
var reserved int
var err error
for _, cst := range s.constraints {
if err = cst.ReserveMemoryForChild(size); err != nil {
if err = cst.ReserveMemoryForChild(int64(size)); err != nil {
break
}
reserved++
@ -210,7 +240,7 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error {
return err
}
func (s *ResourceScope) ReserveMemoryForChild(size int) error {
func (s *ResourceScope) ReserveMemoryForChild(size int64) error {
s.Lock()
defer s.Unlock()
@ -302,7 +332,7 @@ func (s *ResourceScope) ReleaseBuffer(buf []byte) {
}
}
func (s *ResourceScope) AddStream(count int) error {
func (s *ResourceScope) AddStream(dir network.Direction) error {
s.Lock()
defer s.Unlock()
@ -310,14 +340,26 @@ func (s *ResourceScope) AddStream(count int) error {
return ErrResourceScopeClosed
}
if err := s.rc.addStream(count); err != nil {
var incount, outcount int
if dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err := s.rc.addStream(incount, outcount); err != nil {
return err
}
var err error
var reserved int
for _, cst := range s.constraints {
if err = cst.AddStreamForChild(count); err != nil {
var incount, outcount int
if dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err = cst.AddStreamForChild(incount, outcount); err != nil {
break
}
reserved++
@ -325,37 +367,44 @@ func (s *ResourceScope) AddStream(count int) error {
if err != nil {
for _, cst := range s.constraints[:reserved] {
cst.RemoveStreamForChild(count)
cst.RemoveStreamForChild(incount, outcount)
}
}
return err
}
func (s *ResourceScope) AddStreamForChild(count int) error {
func (s *ResourceScope) AddStreamForChild(incount, outcount int) error {
s.Lock()
defer s.Unlock()
return s.rc.addStream(count)
return s.rc.addStream(incount, outcount)
}
func (s *ResourceScope) RemoveStream(count int) {
func (s *ResourceScope) RemoveStream(dir network.Direction) {
s.Lock()
defer s.Unlock()
s.rc.removeStream(count)
var incount, outcount int
if dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
s.rc.removeStream(incount, outcount)
for _, cst := range s.constraints {
cst.RemoveStreamForChild(count)
cst.RemoveStreamForChild(incount, outcount)
}
}
func (s *ResourceScope) RemoveStreamForChild(count int) {
func (s *ResourceScope) RemoveStreamForChild(incount, outcount int) {
s.Lock()
defer s.Unlock()
s.rc.removeStream(count)
s.rc.removeStream(incount, outcount)
}
func (s *ResourceScope) AddConn(count int) error {
func (s *ResourceScope) AddConn(dir network.Direction) error {
s.Lock()
defer s.Unlock()
@ -363,14 +412,20 @@ func (s *ResourceScope) AddConn(count int) error {
return ErrResourceScopeClosed
}
if err := s.rc.addConn(count); err != nil {
var incount, outcount int
if dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
if err := s.rc.addConn(incount, outcount); err != nil {
return err
}
var err error
var reserved int
for _, cst := range s.constraints {
if err = cst.AddConnForChild(count); err != nil {
if err = cst.AddConnForChild(incount, outcount); err != nil {
break
}
reserved++
@ -378,34 +433,89 @@ func (s *ResourceScope) AddConn(count int) error {
if err != nil {
for _, cst := range s.constraints[:reserved] {
cst.RemoveConnForChild(count)
cst.RemoveConnForChild(incount, outcount)
}
}
return err
}
func (s *ResourceScope) AddConnForChild(count int) error {
func (s *ResourceScope) AddConnForChild(incount, outcount int) error {
s.Lock()
defer s.Unlock()
return s.rc.addConn(count)
return s.rc.addConn(incount, outcount)
}
func (s *ResourceScope) RemoveConn(count int) {
func (s *ResourceScope) RemoveConn(dir network.Direction) {
s.Lock()
defer s.Unlock()
s.rc.removeConn(count)
var incount, outcount int
if dir == network.DirInbound {
incount = 1
} else {
outcount = 1
}
s.rc.removeConn(incount, outcount)
for _, cst := range s.constraints {
cst.RemoveConnForChild(count)
cst.RemoveConnForChild(incount, outcount)
}
}
func (s *ResourceScope) RemoveConnForChild(count int) {
func (s *ResourceScope) RemoveConnForChild(incount, outcount int) {
s.Lock()
defer s.Unlock()
s.rc.removeConn(count)
s.rc.removeConn(incount, outcount)
}
func (s *ResourceScope) AddFD(count int) error {
s.Lock()
defer s.Unlock()
if err := s.rc.addFD(count); err != nil {
return err
}
var err error
var reserved int
for _, cst := range s.constraints {
if err = cst.AddFDForChild(count); err != nil {
break
}
reserved++
}
if err != nil {
for _, cst := range s.constraints[:reserved] {
cst.RemoveFDForChild(count)
}
}
return err
}
func (s *ResourceScope) AddFDForChild(count int) error {
s.Lock()
defer s.Unlock()
return s.rc.addFD(count)
}
func (s *ResourceScope) RemoveFD(count int) {
s.Lock()
defer s.Unlock()
s.rc.removeFD(count)
for _, cst := range s.constraints {
cst.RemoveFDForChild(count)
}
}
func (s *ResourceScope) RemoveFDForChild(count int) {
s.Lock()
defer s.Unlock()
s.rc.removeFD(count)
}
func (s *ResourceScope) Done() {
@ -418,14 +528,18 @@ func (s *ResourceScope) Done() {
for _, cst := range s.constraints {
cst.ReleaseMemoryForChild(s.rc.memory)
cst.RemoveStreamForChild(s.rc.nstreams)
cst.RemoveConnForChild(s.rc.nconns)
cst.RemoveStreamForChild(s.rc.nstreamsIn, s.rc.nstreamsOut)
cst.RemoveConnForChild(s.rc.nconnsIn, s.rc.nconnsOut)
cst.RemoveFDForChild(s.rc.nfd)
}
s.rc.releaseBuffers()
s.rc.nstreams = 0
s.rc.nconns = 0
s.rc.nstreamsIn = 0
s.rc.nstreamsOut = 0
s.rc.nconnsIn = 0
s.rc.nconnsOut = 0
s.rc.nfd = 0
s.rc.memory = 0
s.rc.buffers = nil