refactor scopes, introduce constrained scopes

This commit is contained in:
vyzo 2021-12-23 11:13:12 +02:00
parent 004a6948b2
commit 17a30366ae
2 changed files with 336 additions and 53 deletions

View File

@ -2,4 +2,6 @@ package rcmgr
type Limit interface {
GetMemoryLimit() int64
GetStreamLimit() int
GetConnLimit() int
}

387
scope.go
View File

@ -8,17 +8,31 @@ import (
"github.com/libp2p/go-libp2p-core/network"
)
type ResourceScope struct {
sync.Mutex
// Basic resource mamagement.
type BasicResourceScope struct {
limit Limit
nconns int
nstreams int
memory int64
buffers map[interface{}][]byte
}
limit Limit
memory int64
buffers map[interface{}][]byte
// DAG ResourceScopes. A
// BasicResourceScope accounts for the node usage, constraints signify
// the dependencies that constrains resource usage.
type ResourceScope struct {
mx sync.Mutex
rc *BasicResourceScope
done bool
constraints []*ResourceScope
}
var _ network.ResourceScope = (*ResourceScope)(nil)
var _ network.TransactionalScope = (*ResourceScope)(nil)
func (rc *ResourceScope) checkMemory(rsvp int) error {
// BasicResourceScope implementation
func (rc *BasicResourceScope) checkMemory(rsvp int) error {
// overflow check; this also has the side-effect that we cannot reserve negative memory.
newmem := rc.memory + int64(rsvp)
if newmem < rc.memory {
@ -33,21 +47,14 @@ func (rc *ResourceScope) checkMemory(rsvp int) error {
return nil
}
func (rc *ResourceScope) releaseBuffers() {
func (rc *BasicResourceScope) releaseBuffers() {
for key, buf := range rc.buffers {
pool.Put(buf)
delete(rc.buffers, key)
}
}
func (rc *ResourceScope) ReserveMemory(size int) error {
if rc == nil {
return nil
}
rc.Lock()
defer rc.Unlock()
func (rc *BasicResourceScope) reserveMemory(size int) error {
if err := rc.checkMemory(size); err != nil {
return err
}
@ -56,15 +63,8 @@ func (rc *ResourceScope) ReserveMemory(size int) error {
return nil
}
func (rc *ResourceScope) ReleaseMemory(size int) {
if rc == nil {
return
}
rc.Lock()
defer rc.Unlock()
rc.memory -= int64(size)
func (rc *BasicResourceScope) releaseMemory(size int64) {
rc.memory -= size
// sanity check for bugs upstream
if rc.memory < 0 {
@ -72,14 +72,7 @@ func (rc *ResourceScope) ReleaseMemory(size int) {
}
}
func (rc *ResourceScope) GetBuffer(size int) ([]byte, error) {
if rc == nil {
return make([]byte, size), nil
}
rc.Lock()
defer rc.Unlock()
func (rc *BasicResourceScope) getBuffer(size int) ([]byte, error) {
if err := rc.checkMemory(size); err != nil {
return nil, err
}
@ -92,16 +85,7 @@ func (rc *ResourceScope) GetBuffer(size int) ([]byte, error) {
return buf, nil
}
func (rc *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) {
if rc == nil {
newbuf := make([]byte, newsize)
copy(newbuf, oldbuf)
return newbuf, nil
}
rc.Lock()
defer rc.Unlock()
func (rc *BasicResourceScope) growBuffer(oldbuf []byte, newsize int) ([]byte, error) {
grow := newsize - len(oldbuf)
if err := rc.checkMemory(grow); err != nil {
return nil, err
@ -117,10 +101,7 @@ func (rc *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error)
return newbuf, nil
}
func (rc *ResourceScope) ReleaseBuffer(buf []byte) {
rc.Lock()
defer rc.Unlock()
func (rc *BasicResourceScope) releaseBuffer(buf []byte) {
rc.memory -= int64(len(buf))
// sanity check for bugs upstream
@ -132,13 +113,313 @@ func (rc *ResourceScope) ReleaseBuffer(buf []byte) {
pool.Put(buf)
}
func (rc *ResourceScope) Stat() network.ScopeStat {
if rc == nil {
return network.ScopeStat{}
func (rc *BasicResourceScope) addStream(count int) error {
if rc.nstreams+count > rc.limit.GetStreamLimit() {
return fmt.Errorf("cannot reserve stream: %w", ErrResourceLimitExceeded)
}
rc.Lock()
defer rc.Unlock()
return network.ScopeStat{Memory: rc.memory}
rc.nstreams += count
return nil
}
func (rc *BasicResourceScope) removeStream(count int) {
rc.nstreams -= count
if rc.nstreams < 0 {
panic("BUG: too many streams released")
}
}
func (rc *BasicResourceScope) addConn(count int) error {
if rc.nconns+count > rc.limit.GetConnLimit() {
return fmt.Errorf("cannot reserve connection: %w", ErrResourceLimitExceeded)
}
rc.nconns += count
return nil
}
func (rc *BasicResourceScope) removeConn(count int) {
rc.nconns -= count
if rc.nconns < 0 {
panic("BUG: too many connections released")
}
}
func (rc *BasicResourceScope) stat() network.ScopeStat {
return network.ScopeStat{
Memory: rc.memory,
NumConns: rc.nconns,
NumStreams: rc.nstreams,
}
}
// ResourceScope implementation
func (s *ResourceScope) ReserveMemory(size int) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return ErrResourceScopeClosed
}
if err := s.rc.reserveMemory(size); err != nil {
return err
}
if err := s.reserveMemoryForConstraints(size); err != nil {
s.rc.releaseMemory(int64(size))
return err
}
return nil
}
func (s *ResourceScope) reserveMemoryForConstraints(size int) error {
var reserved int
var err error
for _, cst := range s.constraints {
if err = cst.ReserveMemoryForChild(size); err != nil {
break
}
reserved++
}
if err != nil {
// we failed because of a constraint; undo memory reservations
for _, cst := range s.constraints[:reserved] {
cst.ReleaseMemoryForChild(int64(size))
}
}
return err
}
func (s *ResourceScope) ReserveMemoryForChild(size int) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return ErrResourceScopeClosed
}
return s.rc.reserveMemory(size)
}
func (s *ResourceScope) ReleaseMemory(size int) {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return
}
s.rc.releaseMemory(int64(size))
for _, cst := range s.constraints {
cst.ReleaseMemoryForChild(int64(size))
}
}
func (s *ResourceScope) ReleaseMemoryForChild(size int64) {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return
}
s.rc.releaseMemory(size)
}
func (s *ResourceScope) GetBuffer(size int) ([]byte, error) {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return nil, ErrResourceScopeClosed
}
buf, err := s.rc.getBuffer(size)
if err != nil {
return nil, err
}
if err := s.reserveMemoryForConstraints(size); err != nil {
s.rc.releaseBuffer(buf)
return nil, err
}
return buf, err
}
func (s *ResourceScope) GrowBuffer(oldbuf []byte, newsize int) ([]byte, error) {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return nil, ErrResourceScopeClosed
}
buf, err := s.rc.growBuffer(oldbuf, newsize)
if err != nil {
return nil, err
}
if err := s.reserveMemoryForConstraints(newsize - len(oldbuf)); err != nil {
s.rc.releaseBuffer(buf)
return nil, err
}
return buf, err
}
func (s *ResourceScope) ReleaseBuffer(buf []byte) {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return
}
s.rc.releaseBuffer(buf)
for _, cst := range s.constraints {
cst.ReleaseMemoryForChild(int64(len(buf)))
}
}
func (s *ResourceScope) AddStream(count int) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return ErrResourceScopeClosed
}
if err := s.rc.addStream(count); err != nil {
return err
}
var err error
var reserved int
for _, cst := range s.constraints {
if err = cst.AddStreamForChild(count); err != nil {
break
}
reserved++
}
if err != nil {
for _, cst := range s.constraints[:reserved] {
cst.RemoveStreamForChild(count)
}
}
return err
}
func (s *ResourceScope) AddStreamForChild(count int) error {
s.mx.Lock()
defer s.mx.Unlock()
return s.rc.addStream(count)
}
func (s *ResourceScope) RemoveStream(count int) {
s.mx.Lock()
defer s.mx.Unlock()
s.rc.removeStream(count)
for _, cst := range s.constraints {
cst.RemoveStreamForChild(count)
}
}
func (s *ResourceScope) RemoveStreamForChild(count int) {
s.mx.Lock()
defer s.mx.Unlock()
s.rc.removeStream(count)
}
func (s *ResourceScope) AddConn(count int) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return ErrResourceScopeClosed
}
if err := s.rc.addConn(count); err != nil {
return err
}
var err error
var reserved int
for _, cst := range s.constraints {
if err = cst.AddConnForChild(count); err != nil {
break
}
reserved++
}
if err != nil {
for _, cst := range s.constraints[:reserved] {
cst.RemoveConnForChild(count)
}
}
return err
}
func (s *ResourceScope) AddConnForChild(count int) error {
s.mx.Lock()
defer s.mx.Unlock()
return s.rc.addConn(count)
}
func (s *ResourceScope) RemoveConn(count int) {
s.mx.Lock()
defer s.mx.Unlock()
s.rc.removeConn(count)
for _, cst := range s.constraints {
cst.RemoveConnForChild(count)
}
}
func (s *ResourceScope) RemoveConnForChild(count int) {
s.mx.Lock()
defer s.mx.Unlock()
s.rc.removeConn(count)
}
func (s *ResourceScope) Done() {
s.mx.Lock()
defer s.mx.Unlock()
if s.done {
return
}
for _, cst := range s.constraints {
cst.ReleaseMemoryForChild(s.rc.memory)
cst.RemoveStreamForChild(s.rc.nstreams)
cst.RemoveConnForChild(s.rc.nconns)
}
s.rc.releaseBuffers()
s.rc.memory = 0
s.rc.nstreams = 0
s.rc.nconns = 0
s.done = true
}
func (s *ResourceScope) Stat() network.ScopeStat {
s.mx.Lock()
defer s.mx.Unlock()
return s.rc.stat()
}