diff --git a/limit.go b/limit.go index 48327ca..9b5f826 100644 --- a/limit.go +++ b/limit.go @@ -2,4 +2,6 @@ package rcmgr type Limit interface { GetMemoryLimit() int64 + GetStreamLimit() int + GetConnLimit() int } diff --git a/scope.go b/scope.go index e7c5089..8225653 100644 --- a/scope.go +++ b/scope.go @@ -8,17 +8,31 @@ import ( "github.com/libp2p/go-libp2p-core/network" ) -type ResourceScope struct { - sync.Mutex +// Basic resource mamagement. +type BasicResourceScope struct { + limit Limit + nconns int + nstreams int + memory int64 + buffers map[interface{}][]byte +} - limit Limit - memory int64 - buffers map[interface{}][]byte +// DAG ResourceScopes. A +// BasicResourceScope accounts for the node usage, constraints signify +// the dependencies that constrains resource usage. +type ResourceScope struct { + mx sync.Mutex + rc *BasicResourceScope + done bool + + constraints []*ResourceScope } var _ network.ResourceScope = (*ResourceScope)(nil) +var _ network.TransactionalScope = (*ResourceScope)(nil) -func (rc *ResourceScope) checkMemory(rsvp int) error { +// BasicResourceScope implementation +func (rc *BasicResourceScope) checkMemory(rsvp int) error { // overflow check; this also has the side-effect that we cannot reserve negative memory. newmem := rc.memory + int64(rsvp) if newmem < rc.memory { @@ -33,21 +47,14 @@ func (rc *ResourceScope) checkMemory(rsvp int) error { return nil } -func (rc *ResourceScope) releaseBuffers() { +func (rc *BasicResourceScope) releaseBuffers() { for key, buf := range rc.buffers { pool.Put(buf) delete(rc.buffers, key) } } -func (rc *ResourceScope) ReserveMemory(size int) error { - if rc == nil { - return nil - } - - rc.Lock() - defer rc.Unlock() - +func (rc *BasicResourceScope) reserveMemory(size int) error { if err := rc.checkMemory(size); err != nil { return err } @@ -56,15 +63,8 @@ func (rc *ResourceScope) ReserveMemory(size int) error { return nil } -func (rc *ResourceScope) ReleaseMemory(size int) { - if rc == nil { - return - } - - rc.Lock() - defer rc.Unlock() - - rc.memory -= int64(size) +func (rc *BasicResourceScope) releaseMemory(size int64) { + rc.memory -= size // sanity check for bugs upstream if rc.memory < 0 { @@ -72,14 +72,7 @@ func (rc *ResourceScope) ReleaseMemory(size int) { } } -func (rc *ResourceScope) GetBuffer(size int) ([]byte, error) { - if rc == nil { - return make([]byte, size), nil - } - - rc.Lock() - defer rc.Unlock() - +func (rc *BasicResourceScope) getBuffer(size int) ([]byte, error) { if err := rc.checkMemory(size); err != nil { return nil, err } @@ -92,16 +85,7 @@ func (rc *ResourceScope) GetBuffer(size int) ([]byte, error) { return buf, nil } -func (rc *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) { - if rc == nil { - newbuf := make([]byte, newsize) - copy(newbuf, oldbuf) - return newbuf, nil - } - - rc.Lock() - defer rc.Unlock() - +func (rc *BasicResourceScope) growBuffer(oldbuf []byte, newsize int) ([]byte, error) { grow := newsize - len(oldbuf) if err := rc.checkMemory(grow); err != nil { return nil, err @@ -117,10 +101,7 @@ func (rc *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) return newbuf, nil } -func (rc *ResourceScope) ReleaseBuffer(buf []byte) { - rc.Lock() - defer rc.Unlock() - +func (rc *BasicResourceScope) releaseBuffer(buf []byte) { rc.memory -= int64(len(buf)) // sanity check for bugs upstream @@ -132,13 +113,313 @@ func (rc *ResourceScope) ReleaseBuffer(buf []byte) { pool.Put(buf) } -func (rc *ResourceScope) Stat() network.ScopeStat { - if rc == nil { - return network.ScopeStat{} +func (rc *BasicResourceScope) addStream(count int) error { + if rc.nstreams+count > rc.limit.GetStreamLimit() { + return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded) } - rc.Lock() - defer rc.Unlock() - - return network.ScopeStat{Memory: rc.memory} + rc.nstreams += count + return nil +} + +func (rc *BasicResourceScope) removeStream(count int) { + rc.nstreams -= count + + if rc.nstreams < 0 { + panic("BUG: too many streams released") + } +} + +func (rc *BasicResourceScope) addConn(count int) error { + if rc.nconns+count > rc.limit.GetConnLimit() { + return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded) + } + + rc.nconns += count + return nil +} + +func (rc *BasicResourceScope) removeConn(count int) { + rc.nconns -= count + + if rc.nconns < 0 { + panic("BUG: too many connections released") + } +} + +func (rc *BasicResourceScope) stat() network.ScopeStat { + return network.ScopeStat{ + Memory: rc.memory, + NumConns: rc.nconns, + NumStreams: rc.nstreams, + } +} + +// ResourceScope implementation +func (s *ResourceScope) ReserveMemory(size int) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return ErrResourceScopeClosed + } + + if err := s.rc.reserveMemory(size); err != nil { + return err + } + + if err := s.reserveMemoryForConstraints(size); err != nil { + s.rc.releaseMemory(int64(size)) + return err + } + + return nil +} + +func (s *ResourceScope) reserveMemoryForConstraints(size int) error { + var reserved int + var err error + for _, cst := range s.constraints { + if err = cst.ReserveMemoryForChild(size); err != nil { + break + } + reserved++ + } + + if err != nil { + // we failed because of a constraint; undo memory reservations + for _, cst := range s.constraints[:reserved] { + cst.ReleaseMemoryForChild(int64(size)) + } + } + + return err +} + +func (s *ResourceScope) ReserveMemoryForChild(size int) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return ErrResourceScopeClosed + } + + return s.rc.reserveMemory(size) +} + +func (s *ResourceScope) ReleaseMemory(size int) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return + } + + s.rc.releaseMemory(int64(size)) + for _, cst := range s.constraints { + cst.ReleaseMemoryForChild(int64(size)) + } +} + +func (s *ResourceScope) ReleaseMemoryForChild(size int64) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return + } + + s.rc.releaseMemory(size) +} + +func (s *ResourceScope) GetBuffer(size int) ([]byte, error) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return nil, ErrResourceScopeClosed + } + + buf, err := s.rc.getBuffer(size) + if err != nil { + return nil, err + } + + if err := s.reserveMemoryForConstraints(size); err != nil { + s.rc.releaseBuffer(buf) + return nil, err + } + + return buf, err +} + +func (s *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return nil, ErrResourceScopeClosed + } + + buf, err := s.rc.growBuffer(oldbuf, newsize) + if err != nil { + return nil, err + } + + if err := s.reserveMemoryForConstraints(newsize - len(oldbuf)); err != nil { + s.rc.releaseBuffer(buf) + return nil, err + } + + return buf, err +} + +func (s *ResourceScope) ReleaseBuffer(buf []byte) { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return + } + + s.rc.releaseBuffer(buf) + for _, cst := range s.constraints { + cst.ReleaseMemoryForChild(int64(len(buf))) + } +} + +func (s *ResourceScope) AddStream(count int) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return ErrResourceScopeClosed + } + + if err := s.rc.addStream(count); err != nil { + return err + } + + var err error + var reserved int + for _, cst := range s.constraints { + if err = cst.AddStreamForChild(count); err != nil { + break + } + reserved++ + } + + if err != nil { + for _, cst := range s.constraints[:reserved] { + cst.RemoveStreamForChild(count) + } + } + + return err +} + +func (s *ResourceScope) AddStreamForChild(count int) error { + s.mx.Lock() + defer s.mx.Unlock() + + return s.rc.addStream(count) +} + +func (s *ResourceScope) RemoveStream(count int) { + s.mx.Lock() + defer s.mx.Unlock() + + s.rc.removeStream(count) + for _, cst := range s.constraints { + cst.RemoveStreamForChild(count) + } +} + +func (s *ResourceScope) RemoveStreamForChild(count int) { + s.mx.Lock() + defer s.mx.Unlock() + s.rc.removeStream(count) +} + +func (s *ResourceScope) AddConn(count int) error { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return ErrResourceScopeClosed + } + + if err := s.rc.addConn(count); err != nil { + return err + } + + var err error + var reserved int + for _, cst := range s.constraints { + if err = cst.AddConnForChild(count); err != nil { + break + } + reserved++ + } + + if err != nil { + for _, cst := range s.constraints[:reserved] { + cst.RemoveConnForChild(count) + } + } + + return err +} + +func (s *ResourceScope) AddConnForChild(count int) error { + s.mx.Lock() + defer s.mx.Unlock() + + return s.rc.addConn(count) +} + +func (s *ResourceScope) RemoveConn(count int) { + s.mx.Lock() + defer s.mx.Unlock() + + s.rc.removeConn(count) + for _, cst := range s.constraints { + cst.RemoveConnForChild(count) + } +} + +func (s *ResourceScope) RemoveConnForChild(count int) { + s.mx.Lock() + defer s.mx.Unlock() + s.rc.removeConn(count) +} + +func (s *ResourceScope) Done() { + s.mx.Lock() + defer s.mx.Unlock() + + if s.done { + return + } + + for _, cst := range s.constraints { + cst.ReleaseMemoryForChild(s.rc.memory) + cst.RemoveStreamForChild(s.rc.nstreams) + cst.RemoveConnForChild(s.rc.nconns) + } + + s.rc.releaseBuffers() + + s.rc.memory = 0 + s.rc.nstreams = 0 + s.rc.nconns = 0 + + s.done = true +} + +func (s *ResourceScope) Stat() network.ScopeStat { + s.mx.Lock() + defer s.mx.Unlock() + + return s.rc.stat() }