diff --git a/limit.go b/limit.go index 9b5f826..0034bdd 100644 --- a/limit.go +++ b/limit.go @@ -1,7 +1,12 @@ package rcmgr +import ( + "github.com/libp2p/go-libp2p-core/network" +) + type Limit interface { GetMemoryLimit() int64 - GetStreamLimit() int - GetConnLimit() int + GetStreamLimit(network.Direction) int + GetConnLimit(network.Direction) int + GetFDLimit() int } diff --git a/rcmgr.go b/rcmgr.go new file mode 100644 index 0000000..eb7dafa --- /dev/null +++ b/rcmgr.go @@ -0,0 +1,363 @@ +package rcmgr + +import ( + "fmt" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +type ResourceManager struct { + system *SystemScope + transient *TransientScope +} + +type SystemScope struct { + *ResourceScope +} + +var _ network.ResourceScope = (*SystemScope)(nil) + +type TransientScope struct { + *ResourceScope + + system *SystemScope +} + +var _ network.ResourceScope = (*TransientScope)(nil) + +type ServiceScope struct { + *ResourceScope + + name string + system *SystemScope +} + +var _ network.ServiceScope = (*ServiceScope)(nil) + +type ProtocolScope struct { + *ResourceScope + + proto protocol.ID + system *SystemScope +} + +var _ network.ProtocolScope = (*ProtocolScope)(nil) + +type PeerScope struct { + *ResourceScope + + peer peer.ID + rcmgr *ResourceManager + system *SystemScope + transient *TransientScope +} + +var _ network.PeerScope = (*PeerScope)(nil) + +type ConnectionScope struct { + *ResourceScope + + dir network.Direction + rcmgr *ResourceManager + system *SystemScope + transient *TransientScope + peer *PeerScope +} + +var _ network.ConnectionScope = (*ConnectionScope)(nil) + +type StreamScope struct { + *ResourceScope + + dir network.Direction + rcmgr *ResourceManager + system *SystemScope + transient *TransientScope + peer *PeerScope + svc *ServiceScope + proto *ProtocolScope +} + +var _ network.StreamScope = (*StreamScope)(nil) + +func (r *ResourceManager) getProtocolScope(proto protocol.ID) *ProtocolScope { + // TODO + return nil +} + +func (r *ResourceManager) getServiceScope(svc string) *ServiceScope { + // TODO + return nil +} + +func (r *ResourceManager) getPeerScope(p peer.ID) *PeerScope { + // TODO + return nil +} + +func (r *ResourceManager) getConnLimit() Limit { + // TODO + return nil +} + +func (r *ResourceManager) getStreamLimit(p peer.ID) Limit { + // TODO + return nil +} + +func (r *ResourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnectionScope, error) { + conn := NewConnectionScope(dir, r.getConnLimit(), r) + + if err := conn.AddConn(dir); err != nil { + return nil, err + } + + if err := conn.AddFD(1); err != nil { + conn.RemoveStream(dir) + return nil, err + } + + return conn, nil +} + +func NewSystemScope(limit Limit) *SystemScope { + return &SystemScope{ + ResourceScope: NewResourceScope(limit, nil), + } +} + +func NewTransientScope(limit Limit, system *SystemScope) *TransientScope { + return &TransientScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}), + system: system, + } +} + +func NewServiceScope(name string, limit Limit, system *SystemScope) *ServiceScope { + return &ServiceScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}), + name: name, + system: system, + } +} + +func NewProtocolScope(proto protocol.ID, limit Limit, system *SystemScope) *ProtocolScope { + return &ProtocolScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{system.ResourceScope}), + proto: proto, + system: system, + } +} + +func NewPeerScope(p peer.ID, limit Limit, rcmgr *ResourceManager) *PeerScope { + return &PeerScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{rcmgr.system.ResourceScope}), + peer: p, + rcmgr: rcmgr, + system: rcmgr.system, + transient: rcmgr.transient, + } +} + +func NewConnectionScope(dir network.Direction, limit Limit, rcmgr *ResourceManager) *ConnectionScope { + return &ConnectionScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{rcmgr.transient.ResourceScope, rcmgr.system.ResourceScope}), + dir: dir, + rcmgr: rcmgr, + system: rcmgr.system, + transient: rcmgr.transient, + } +} + +func NewStreamScope(dir network.Direction, limit Limit, peer *PeerScope) *StreamScope { + return &StreamScope{ + ResourceScope: NewResourceScope(limit, []*ResourceScope{peer.ResourceScope, peer.transient.ResourceScope, peer.system.ResourceScope}), + dir: dir, + rcmgr: peer.rcmgr, + system: peer.system, + transient: peer.transient, + peer: peer, + } +} + +func (s *ServiceScope) Name() string { + return s.name +} + +func (s *ProtocolScope) Protocol() protocol.ID { + return s.proto +} + +func (s *PeerScope) Peer() peer.ID { + return s.peer +} + +func (s *PeerScope) OpenStream(dir network.Direction) (network.StreamScope, error) { + stream := NewStreamScope(dir, s.rcmgr.getStreamLimit(s.peer), s) + err := stream.AddStream(dir) + if err != nil { + return nil, err + } + + return stream, nil +} + +func (s *ConnectionScope) PeerScope() network.PeerScope { + s.Lock() + defer s.Unlock() + return s.peer +} + +func (s *ConnectionScope) SetPeer(p peer.ID) error { + s.Lock() + defer s.Unlock() + + if s.peer != nil { + return fmt.Errorf("connection scope already attached to a peer") + } + s.peer = s.rcmgr.getPeerScope(p) + + // juggle resources from transient scope to peer scope + mem := s.ResourceScope.rc.memory + + var incount, outcount int + if s.dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + + if err := s.peer.ReserveMemoryForChild(mem); err != nil { + return err + } + if err := s.peer.AddConnForChild(incount, outcount); err != nil { + s.peer.ReleaseMemoryForChild(mem) + return err + } + if err := s.peer.AddFDForChild(1); err != nil { + s.peer.ReleaseMemoryForChild(mem) + s.peer.RemoveConnForChild(incount, outcount) + return err + } + + s.transient.ReleaseMemoryForChild(mem) + s.transient.RemoveConnForChild(incount, outcount) + s.transient.RemoveFDForChild(1) + + // update constraints + constraints := []*ResourceScope{ + s.peer.ResourceScope, + s.system.ResourceScope, + } + s.ResourceScope.constraints = constraints + + return nil +} + +func (s *StreamScope) ProtocolScope() network.ProtocolScope { + s.Lock() + defer s.Unlock() + return s.proto +} + +func (s *StreamScope) SetProtocol(proto protocol.ID) error { + s.Lock() + defer s.Unlock() + + if s.proto != nil { + return fmt.Errorf("stream scope already attached to a protocol") + } + + s.proto = s.rcmgr.getProtocolScope(proto) + + // juggle resources from transient scope to protocol scope + mem := s.ResourceScope.rc.memory + + var incount, outcount int + if s.dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + + if err := s.proto.ReserveMemoryForChild(mem); err != nil { + return err + } + if err := s.proto.AddStreamForChild(incount, outcount); err != nil { + s.proto.ReleaseMemoryForChild(mem) + return err + } + + s.transient.ReleaseMemoryForChild(mem) + s.transient.RemoveStreamForChild(incount, outcount) + + // update constraints + constraints := []*ResourceScope{ + s.peer.ResourceScope, + s.proto.ResourceScope, + } + if s.svc != nil { + constraints = append(constraints, s.svc.ResourceScope) + } + constraints = append(constraints, s.system.ResourceScope) + s.ResourceScope.constraints = constraints + + return nil +} + +func (s *StreamScope) ServiceScope() network.ServiceScope { + s.Lock() + defer s.Unlock() + return s.svc +} + +func (s *StreamScope) SetService(svc string) error { + s.Lock() + defer s.Unlock() + + if s.proto == nil { + return fmt.Errorf("stream scope not attached to a protocol") + } + if s.svc != nil { + return fmt.Errorf("stream scope already attached to a service") + } + + s.svc = s.rcmgr.getServiceScope(svc) + + // reserve resources in service + mem := s.ResourceScope.rc.memory + + var incount, outcount int + if s.dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + + if err := s.svc.ReserveMemoryForChild(mem); err != nil { + return err + } + if err := s.svc.AddStreamForChild(incount, outcount); err != nil { + s.svc.ReleaseMemoryForChild(mem) + return err + } + + // update constraints + constraints := []*ResourceScope{ + s.peer.ResourceScope, + s.proto.ResourceScope, + s.svc.ResourceScope, + s.system.ResourceScope, + } + s.ResourceScope.constraints = constraints + + return nil +} + +func (s *StreamScope) PeerScope() network.PeerScope { + s.Lock() + defer s.Unlock() + return s.peer +} diff --git a/scope.go b/scope.go index 52153c6..99042af 100644 --- a/scope.go +++ b/scope.go @@ -10,11 +10,14 @@ import ( // Basic resource mamagement. type Resources struct { - limit Limit - nconns int - nstreams int - memory int64 - buffers map[interface{}][]byte + limit Limit + + nconnsIn, nconnsOut int + nstreamsIn, nstreamsOut int + nfd int + + memory int64 + buffers map[interface{}][]byte } // DAG ResourceScopes. @@ -46,7 +49,7 @@ func NewResourceScope(limit Limit, constraints []*ResourceScope) *ResourceScope } // Resources implementation -func (rc *Resources) checkMemory(rsvp int) error { +func (rc *Resources) checkMemory(rsvp int64) error { // overflow check; this also has the side-effect that we cannot reserve negative memory. newmem := rc.memory + int64(rsvp) if newmem < rc.memory { @@ -68,7 +71,7 @@ func (rc *Resources) releaseBuffers() { } } -func (rc *Resources) reserveMemory(size int) error { +func (rc *Resources) reserveMemory(size int64) error { if err := rc.checkMemory(size); err != nil { return err } @@ -87,7 +90,7 @@ func (rc *Resources) releaseMemory(size int64) { } func (rc *Resources) getBuffer(size int) ([]byte, error) { - if err := rc.checkMemory(size); err != nil { + if err := rc.checkMemory(int64(size)); err != nil { return nil, err } @@ -101,7 +104,7 @@ func (rc *Resources) getBuffer(size int) ([]byte, error) { func (rc *Resources) growBuffer(oldbuf []byte, newsize int) ([]byte, error) { grow := newsize - len(oldbuf) - if err := rc.checkMemory(grow); err != nil { + if err := rc.checkMemory(int64(grow)); err != nil { return nil, err } @@ -127,45 +130,72 @@ func (rc *Resources) releaseBuffer(buf []byte) { pool.Put(buf) } -func (rc *Resources) addStream(count int) error { - if rc.nstreams+count > rc.limit.GetStreamLimit() { +func (rc *Resources) addStream(incount, outcount int) error { + if incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound) { + return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded) + } + if outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound) { return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded) } - rc.nstreams += count + rc.nstreamsIn += incount + rc.nstreamsOut += outcount return nil } -func (rc *Resources) removeStream(count int) { - rc.nstreams -= count +func (rc *Resources) removeStream(incount, outcount int) { + rc.nstreamsIn -= incount + rc.nstreamsOut -= outcount - if rc.nstreams < 0 { + if rc.nstreamsIn < 0 || rc.nstreamsOut < 0 { panic("BUG: too many streams released") } } -func (rc *Resources) addConn(count int) error { - if rc.nconns+count > rc.limit.GetConnLimit() { +func (rc *Resources) addConn(incount, outcount int) error { + if incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound) { + return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded) + } + if outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound) { return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded) } - rc.nconns += count + rc.nconnsIn += incount + rc.nconnsOut += outcount return nil } -func (rc *Resources) removeConn(count int) { - rc.nconns -= count +func (rc *Resources) removeConn(incount, outcount int) { + rc.nconnsIn -= incount + rc.nconnsOut -= outcount - if rc.nconns < 0 { + if rc.nconnsIn < 0 || rc.nconnsOut < 0 { panic("BUG: too many connections released") } } +func (rc *Resources) addFD(count int) error { + if rc.nfd+count > rc.limit.GetFDLimit() { + return fmt.Errorf("cannot reserve file descriptor: %w", ErrResourceLimitExceeded) + } + + rc.nfd += count + return nil +} + +func (rc *Resources) removeFD(count int) { + rc.nfd -= count + + if rc.nfd < 0 { + panic("BUG: too many file descriptors released") + } +} + func (rc *Resources) stat() network.ScopeStat { return network.ScopeStat{ Memory: rc.memory, - NumConns: rc.nconns, - NumStreams: rc.nstreams, + NumConns: rc.nconnsIn + rc.nconnsOut, + NumStreams: rc.nstreamsIn + rc.nstreamsOut, } } @@ -178,7 +208,7 @@ func (s *ResourceScope) ReserveMemory(size int) error { return ErrResourceScopeClosed } - if err := s.rc.reserveMemory(size); err != nil { + if err := s.rc.reserveMemory(int64(size)); err != nil { return err } @@ -194,7 +224,7 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error { var reserved int var err error for _, cst := range s.constraints { - if err = cst.ReserveMemoryForChild(size); err != nil { + if err = cst.ReserveMemoryForChild(int64(size)); err != nil { break } reserved++ @@ -210,7 +240,7 @@ func (s *ResourceScope) reserveMemoryForConstraints(size int) error { return err } -func (s *ResourceScope) ReserveMemoryForChild(size int) error { +func (s *ResourceScope) ReserveMemoryForChild(size int64) error { s.Lock() defer s.Unlock() @@ -302,7 +332,7 @@ func (s *ResourceScope) ReleaseBuffer(buf []byte) { } } -func (s *ResourceScope) AddStream(count int) error { +func (s *ResourceScope) AddStream(dir network.Direction) error { s.Lock() defer s.Unlock() @@ -310,14 +340,26 @@ func (s *ResourceScope) AddStream(count int) error { return ErrResourceScopeClosed } - if err := s.rc.addStream(count); err != nil { + var incount, outcount int + if dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + if err := s.rc.addStream(incount, outcount); err != nil { return err } var err error var reserved int for _, cst := range s.constraints { - if err = cst.AddStreamForChild(count); err != nil { + var incount, outcount int + if dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + if err = cst.AddStreamForChild(incount, outcount); err != nil { break } reserved++ @@ -325,37 +367,44 @@ func (s *ResourceScope) AddStream(count int) error { if err != nil { for _, cst := range s.constraints[:reserved] { - cst.RemoveStreamForChild(count) + cst.RemoveStreamForChild(incount, outcount) } } return err } -func (s *ResourceScope) AddStreamForChild(count int) error { +func (s *ResourceScope) AddStreamForChild(incount, outcount int) error { s.Lock() defer s.Unlock() - return s.rc.addStream(count) + return s.rc.addStream(incount, outcount) } -func (s *ResourceScope) RemoveStream(count int) { +func (s *ResourceScope) RemoveStream(dir network.Direction) { s.Lock() defer s.Unlock() - s.rc.removeStream(count) + var incount, outcount int + if dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + + s.rc.removeStream(incount, outcount) for _, cst := range s.constraints { - cst.RemoveStreamForChild(count) + cst.RemoveStreamForChild(incount, outcount) } } -func (s *ResourceScope) RemoveStreamForChild(count int) { +func (s *ResourceScope) RemoveStreamForChild(incount, outcount int) { s.Lock() defer s.Unlock() - s.rc.removeStream(count) + s.rc.removeStream(incount, outcount) } -func (s *ResourceScope) AddConn(count int) error { +func (s *ResourceScope) AddConn(dir network.Direction) error { s.Lock() defer s.Unlock() @@ -363,14 +412,20 @@ func (s *ResourceScope) AddConn(count int) error { return ErrResourceScopeClosed } - if err := s.rc.addConn(count); err != nil { + var incount, outcount int + if dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + if err := s.rc.addConn(incount, outcount); err != nil { return err } var err error var reserved int for _, cst := range s.constraints { - if err = cst.AddConnForChild(count); err != nil { + if err = cst.AddConnForChild(incount, outcount); err != nil { break } reserved++ @@ -378,34 +433,89 @@ func (s *ResourceScope) AddConn(count int) error { if err != nil { for _, cst := range s.constraints[:reserved] { - cst.RemoveConnForChild(count) + cst.RemoveConnForChild(incount, outcount) } } return err } -func (s *ResourceScope) AddConnForChild(count int) error { +func (s *ResourceScope) AddConnForChild(incount, outcount int) error { s.Lock() defer s.Unlock() - return s.rc.addConn(count) + return s.rc.addConn(incount, outcount) } -func (s *ResourceScope) RemoveConn(count int) { +func (s *ResourceScope) RemoveConn(dir network.Direction) { s.Lock() defer s.Unlock() - s.rc.removeConn(count) + var incount, outcount int + if dir == network.DirInbound { + incount = 1 + } else { + outcount = 1 + } + + s.rc.removeConn(incount, outcount) for _, cst := range s.constraints { - cst.RemoveConnForChild(count) + cst.RemoveConnForChild(incount, outcount) } } -func (s *ResourceScope) RemoveConnForChild(count int) { +func (s *ResourceScope) RemoveConnForChild(incount, outcount int) { s.Lock() defer s.Unlock() - s.rc.removeConn(count) + s.rc.removeConn(incount, outcount) +} + +func (s *ResourceScope) AddFD(count int) error { + s.Lock() + defer s.Unlock() + + if err := s.rc.addFD(count); err != nil { + return err + } + + var err error + var reserved int + for _, cst := range s.constraints { + if err = cst.AddFDForChild(count); err != nil { + break + } + reserved++ + } + + if err != nil { + for _, cst := range s.constraints[:reserved] { + cst.RemoveFDForChild(count) + } + } + + return err +} + +func (s *ResourceScope) AddFDForChild(count int) error { + s.Lock() + defer s.Unlock() + return s.rc.addFD(count) +} + +func (s *ResourceScope) RemoveFD(count int) { + s.Lock() + defer s.Unlock() + + s.rc.removeFD(count) + for _, cst := range s.constraints { + cst.RemoveFDForChild(count) + } +} + +func (s *ResourceScope) RemoveFDForChild(count int) { + s.Lock() + defer s.Unlock() + s.rc.removeFD(count) } func (s *ResourceScope) Done() { @@ -418,14 +528,18 @@ func (s *ResourceScope) Done() { for _, cst := range s.constraints { cst.ReleaseMemoryForChild(s.rc.memory) - cst.RemoveStreamForChild(s.rc.nstreams) - cst.RemoveConnForChild(s.rc.nconns) + cst.RemoveStreamForChild(s.rc.nstreamsIn, s.rc.nstreamsOut) + cst.RemoveConnForChild(s.rc.nconnsIn, s.rc.nconnsOut) + cst.RemoveFDForChild(s.rc.nfd) } s.rc.releaseBuffers() - s.rc.nstreams = 0 - s.rc.nconns = 0 + s.rc.nstreamsIn = 0 + s.rc.nstreamsOut = 0 + s.rc.nconnsIn = 0 + s.rc.nconnsOut = 0 + s.rc.nfd = 0 s.rc.memory = 0 s.rc.buffers = nil