// go-libp2p-resource-manager/scope.go
// 2022-01-17 12:33:01 +02:00
// 622 lines, 13 KiB, Go

package rcmgr
import (
"fmt"
"sync"
"github.com/libp2p/go-libp2p-core/network"
)
// resources tracks the current state of resource consumption
type resources struct {
	limit Limit // limits enforced on this resource set

	nconnsIn, nconnsOut     int   // connection counters, split by direction
	nstreamsIn, nstreamsOut int   // stream counters, split by direction
	nfd                     int   // file descriptors in use
	memory                  int64 // reserved memory, in bytes
}
// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (ie cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
// A resourceScope can be a txn scope, where it has a specific owner; txn scopes create a tree rooted
// at the owner (which can be a DAG scope) and can outlive their parents -- this is important because
// txn scopes are the main *user* interface for memory management, and the user may call
// Done in a txn scope after the system has closed the root of the txn tree in some background
// goroutine.
// If we didn't make this distinction we would have a double release problem in that case.
type resourceScope struct {
	sync.Mutex
	done   bool // set once the scope is released; all subsequent operations are no-ops
	refCnt int  // number of live dependents (children/transactions) holding a reference

	rc resources // resource accounting state, guarded by the embedded mutex

	owner       *resourceScope   // set in transaction scopes, which define trees
	constraints []*resourceScope // set in DAG scopes, it's the linearized parent set
}
// compile-time interface conformance checks
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.TransactionalScope = (*resourceScope)(nil)
// newResourceScope creates a DAG scope constrained by the given linearized
// parent set; each constraint's refcount is bumped so it outlives this scope.
func newResourceScope(limit Limit, constraints []*resourceScope) *resourceScope {
	s := &resourceScope{
		rc:          resources{limit: limit},
		constraints: constraints,
	}
	for _, parent := range s.constraints {
		parent.IncRef()
	}
	return s
}
// newTxnResourceScope creates a transaction scope rooted at owner, inheriting
// the owner's limits; the resulting tree may outlive the owner's parents.
func newTxnResourceScope(owner *resourceScope) *resourceScope {
	txn := new(resourceScope)
	txn.rc = resources{limit: owner.rc.limit}
	txn.owner = owner
	return txn
}
// Resources implementation
// checkMemory verifies that reserving rsvp additional bytes would stay within
// the memory limit and reports the utilization status the reservation would
// produce (Caution above 50%, Critical above 80% of the limit).
func (rc *resources) checkMemory(rsvp int64) (network.MemoryStatus, error) {
	newmem := rc.memory + rsvp
	// overflow check; this also has the side effect that we cannot reserve negative memory.
	if newmem < rc.memory {
		return network.MemoryStatusOK, fmt.Errorf("memory reservation overflow: %w", network.ErrResourceLimitExceeded)
	}

	limit := rc.limit.GetMemoryLimit()
	if newmem > limit {
		return network.MemoryStatusOK, fmt.Errorf("cannot reserve memory: %w", network.ErrResourceLimitExceeded)
	}
	if newmem > int64(0.8*float64(limit)) {
		return network.MemoryStatusCritical, nil
	}
	if newmem > int64(0.5*float64(limit)) {
		return network.MemoryStatusCaution, nil
	}
	return network.MemoryStatusOK, nil
}
// reserveMemory accounts size bytes of memory, leaving state untouched when
// the reservation would exceed the limit.
func (rc *resources) reserveMemory(size int64) (network.MemoryStatus, error) {
	status, err := rc.checkMemory(size)
	if err == nil {
		rc.memory += size
	}
	return status, err
}
// releaseMemory returns size bytes to the scope.
func (rc *resources) releaseMemory(size int64) {
	rc.memory -= size
	// sanity check for bugs upstream: releasing more than was reserved
	// indicates broken accounting somewhere above us.
	if rc.memory < 0 {
		panic("BUG: too much memory released")
	}
}
// addStream accounts a single stream in the given direction.
func (rc *resources) addStream(dir network.Direction) error {
	switch dir {
	case network.DirInbound:
		return rc.addStreams(1, 0)
	default:
		return rc.addStreams(0, 1)
	}
}
// addStreams accounts incount inbound and outcount outbound streams,
// failing without side effects if either directional limit would be exceeded.
func (rc *resources) addStreams(incount, outcount int) error {
	switch {
	case incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound):
		return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
	case outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound):
		return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
	}

	rc.nstreamsIn += incount
	rc.nstreamsOut += outcount
	return nil
}
// removeStream releases a single stream in the given direction.
func (rc *resources) removeStream(dir network.Direction) {
	in, out := 0, 1
	if dir == network.DirInbound {
		in, out = 1, 0
	}
	rc.removeStreams(in, out)
}
// removeStreams releases previously accounted streams.
func (rc *resources) removeStreams(nin, nout int) {
	rc.nstreamsIn -= nin
	rc.nstreamsOut -= nout
	// sanity check for bugs upstream: a negative count means a double release.
	if rc.nstreamsIn < 0 || rc.nstreamsOut < 0 {
		panic("BUG: too many streams released")
	}
}
// addConn accounts a single connection in the given direction, optionally
// consuming a file descriptor as well.
func (rc *resources) addConn(dir network.Direction, usefd bool) error {
	var in, out, fd int
	if dir == network.DirInbound {
		in = 1
	} else {
		out = 1
	}
	if usefd {
		fd = 1
	}
	return rc.addConns(in, out, fd)
}
// addConns accounts connections (by direction) and file descriptors, failing
// without side effects if any of the three limits would be exceeded.
func (rc *resources) addConns(incount, outcount, fdcount int) error {
	switch {
	case incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound):
		return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
	case outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound):
		return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
	case fdcount > 0 && rc.nfd+fdcount > rc.limit.GetFDLimit():
		return fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded)
	}

	rc.nconnsIn += incount
	rc.nconnsOut += outcount
	rc.nfd += fdcount
	return nil
}
// removeConn releases a single connection in the given direction, optionally
// returning a file descriptor as well.
func (rc *resources) removeConn(dir network.Direction, usefd bool) {
	var in, out, fd int
	if dir == network.DirInbound {
		in = 1
	} else {
		out = 1
	}
	if usefd {
		fd = 1
	}
	rc.removeConns(in, out, fd)
}
// removeConns releases previously accounted connections and file descriptors.
func (rc *resources) removeConns(nin, nout, nfd int) {
	rc.nconnsIn -= nin
	rc.nconnsOut -= nout
	rc.nfd -= nfd
	// sanity checks for bugs upstream: negative counts mean a double release.
	if rc.nconnsIn < 0 || rc.nconnsOut < 0 {
		panic("BUG: too many connections released")
	}
	if rc.nfd < 0 {
		panic("BUG: too many file descriptors released")
	}
}
// stat returns a snapshot of the current resource accounting.
func (rc *resources) stat() network.ScopeStat {
	var st network.ScopeStat
	st.Memory = rc.memory
	st.NumStreamsInbound = rc.nstreamsIn
	st.NumStreamsOutbound = rc.nstreamsOut
	st.NumConnsInbound = rc.nconnsIn
	st.NumConnsOutbound = rc.nconnsOut
	st.NumFD = rc.nfd
	return st
}
// resourceScope implementation
// ReserveMemory reserves size bytes in this scope and in all of its upstream
// scopes (owner tree or constraint set), rolling back the local reservation
// if any upstream reservation fails. The returned status is the worst
// (highest) status observed across all scopes involved.
func (s *resourceScope) ReserveMemory(size int) (network.MemoryStatus, error) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return network.MemoryStatusOK, network.ErrResourceScopeClosed
	}

	status, err := s.rc.reserveMemory(int64(size))
	if err != nil {
		return status, err
	}

	statusCst, err := s.reserveMemoryForConstraints(size)
	if err != nil {
		// an upstream scope rejected the reservation; undo ours
		s.rc.releaseMemory(int64(size))
		return status, err
	}
	if statusCst > status {
		status = statusCst
	}
	return status, nil
}
// reserveMemoryForConstraints propagates a memory reservation upstream:
// txn scopes delegate to their owner, DAG scopes reserve in every constraint
// of the linearized parent set, undoing partial reservations on failure.
func (s *resourceScope) reserveMemoryForConstraints(size int) (network.MemoryStatus, error) {
	if s.owner != nil {
		return s.owner.ReserveMemory(size)
	}

	status := network.MemoryStatusOK
	for i, cst := range s.constraints {
		statusCst, err := cst.ReserveMemoryForChild(int64(size))
		if err != nil {
			// we failed because of a constraint; undo memory reservations
			for _, prev := range s.constraints[:i] {
				prev.ReleaseMemoryForChild(int64(size))
			}
			return status, err
		}
		if statusCst > status {
			status = statusCst
		}
	}
	return status, nil
}
// releaseMemoryForConstraints propagates a memory release upstream, mirroring
// reserveMemoryForConstraints.
func (s *resourceScope) releaseMemoryForConstraints(size int) {
	if owner := s.owner; owner != nil {
		owner.ReleaseMemory(size)
		return
	}
	amount := int64(size)
	for _, cst := range s.constraints {
		cst.ReleaseMemoryForChild(amount)
	}
}
// ReserveMemoryForChild reserves size bytes on behalf of a downstream scope,
// without propagating further upstream.
func (s *resourceScope) ReserveMemoryForChild(size int64) (network.MemoryStatus, error) {
	s.Lock()
	if s.done {
		s.Unlock()
		return network.MemoryStatusOK, network.ErrResourceScopeClosed
	}
	status, err := s.rc.reserveMemory(size)
	s.Unlock()
	return status, err
}
// ReleaseMemory returns size bytes to this scope and to all upstream scopes.
// It is a no-op once the scope is done.
func (s *resourceScope) ReleaseMemory(size int) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	s.rc.releaseMemory(int64(size))
	s.releaseMemoryForConstraints(size)
}
// ReleaseMemoryForChild returns size bytes reserved on behalf of a downstream
// scope; a no-op once the scope is done.
func (s *resourceScope) ReleaseMemoryForChild(size int64) {
	s.Lock()
	if !s.done {
		s.rc.releaseMemory(size)
	}
	s.Unlock()
}
// AddStream accounts a stream in this scope and upstream, rolling back the
// local accounting if an upstream scope rejects it.
func (s *resourceScope) AddStream(dir network.Direction) error {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return network.ErrResourceScopeClosed
	}
	if err := s.rc.addStream(dir); err != nil {
		return err
	}

	err := s.addStreamForConstraints(dir)
	if err != nil {
		// upstream rejected; undo the local accounting
		s.rc.removeStream(dir)
	}
	return err
}
// addStreamForConstraints propagates stream accounting upstream: txn scopes
// delegate to their owner, DAG scopes account in every constraint, undoing
// partial accounting on failure.
func (s *resourceScope) addStreamForConstraints(dir network.Direction) error {
	if s.owner != nil {
		return s.owner.AddStream(dir)
	}

	for i, cst := range s.constraints {
		if err := cst.AddStreamForChild(dir); err != nil {
			// roll back the constraints we already charged
			for _, prev := range s.constraints[:i] {
				prev.RemoveStreamForChild(dir)
			}
			return err
		}
	}
	return nil
}
// AddStreamForChild accounts a stream on behalf of a downstream scope,
// without propagating further upstream.
func (s *resourceScope) AddStreamForChild(dir network.Direction) error {
	s.Lock()
	if s.done {
		s.Unlock()
		return network.ErrResourceScopeClosed
	}
	err := s.rc.addStream(dir)
	s.Unlock()
	return err
}
// RemoveStream releases a stream from this scope and upstream; a no-op once
// the scope is done.
func (s *resourceScope) RemoveStream(dir network.Direction) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	s.rc.removeStream(dir)
	s.removeStreamForConstraints(dir)
}
// removeStreamForConstraints propagates stream release upstream, mirroring
// addStreamForConstraints.
func (s *resourceScope) removeStreamForConstraints(dir network.Direction) {
	if owner := s.owner; owner != nil {
		owner.RemoveStream(dir)
		return
	}
	for _, cst := range s.constraints {
		cst.RemoveStreamForChild(dir)
	}
}
// RemoveStreamForChild releases a stream accounted on behalf of a downstream
// scope; a no-op once the scope is done.
func (s *resourceScope) RemoveStreamForChild(dir network.Direction) {
	s.Lock()
	if !s.done {
		s.rc.removeStream(dir)
	}
	s.Unlock()
}
// AddConn accounts a connection (and optionally a file descriptor) in this
// scope and upstream, rolling back the local accounting on upstream failure.
func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return network.ErrResourceScopeClosed
	}
	if err := s.rc.addConn(dir, usefd); err != nil {
		return err
	}

	err := s.addConnForConstraints(dir, usefd)
	if err != nil {
		// upstream rejected; undo the local accounting
		s.rc.removeConn(dir, usefd)
	}
	return err
}
// addConnForConstraints propagates connection accounting upstream: txn scopes
// delegate to their owner, DAG scopes account in every constraint, undoing
// partial accounting on failure.
func (s *resourceScope) addConnForConstraints(dir network.Direction, usefd bool) error {
	if s.owner != nil {
		return s.owner.AddConn(dir, usefd)
	}

	for i, cst := range s.constraints {
		if err := cst.AddConnForChild(dir, usefd); err != nil {
			// roll back the constraints we already charged
			for _, prev := range s.constraints[:i] {
				prev.RemoveConnForChild(dir, usefd)
			}
			return err
		}
	}
	return nil
}
// AddConnForChild accounts a connection on behalf of a downstream scope,
// without propagating further upstream.
func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) error {
	s.Lock()
	if s.done {
		s.Unlock()
		return network.ErrResourceScopeClosed
	}
	err := s.rc.addConn(dir, usefd)
	s.Unlock()
	return err
}
// RemoveConn releases a connection (and optionally a file descriptor) from
// this scope and upstream; a no-op once the scope is done.
func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	s.rc.removeConn(dir, usefd)
	s.removeConnForConstraints(dir, usefd)
}
// removeConnForConstraints propagates connection release upstream: txn scopes
// delegate to their owner, DAG scopes release in every constraint of the
// linearized parent set.
func (s *resourceScope) removeConnForConstraints(dir network.Direction, usefd bool) {
	if s.owner != nil {
		s.owner.RemoveConn(dir, usefd)
		// owner and constraints are mutually exclusive; return here so we
		// never fall through to the constraints loop (the siblings
		// removeStreamForConstraints/releaseMemoryForConstraints already do
		// this, and falling through would double-release).
		return
	}
	for _, cst := range s.constraints {
		cst.RemoveConnForChild(dir, usefd)
	}
}
// RemoveConnForChild releases a connection accounted on behalf of a
// downstream scope; a no-op once the scope is done.
func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) {
	s.Lock()
	if !s.done {
		s.rc.removeConn(dir, usefd)
	}
	s.Unlock()
}
// ReserveForChild atomically reserves a whole ScopeStat worth of resources
// (memory, streams, conns, FDs) on behalf of a downstream scope, rolling back
// the earlier reservations if any later one fails.
func (s *resourceScope) ReserveForChild(st network.ScopeStat) (network.MemoryStatus, error) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return network.MemoryStatusOK, network.ErrResourceScopeClosed
	}

	status, err := s.rc.reserveMemory(st.Memory)
	if err != nil {
		return status, err
	}
	if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil {
		s.rc.releaseMemory(st.Memory)
		return status, err
	}
	if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil {
		// unwind the two reservations that succeeded
		s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
		s.rc.releaseMemory(st.Memory)
		return status, err
	}
	return status, nil
}
// ReleaseForChild releases a whole ScopeStat worth of resources previously
// reserved on behalf of a downstream scope; a no-op once the scope is done.
func (s *resourceScope) ReleaseForChild(st network.ScopeStat) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	s.rc.releaseMemory(st.Memory)
	s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
	s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
}
// ReleaseResources releases a whole ScopeStat worth of resources from this
// scope and propagates the release upstream (owner tree or constraint set).
func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	s.rc.releaseMemory(st.Memory)
	s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
	s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)

	if owner := s.owner; owner != nil {
		owner.ReleaseResources(st)
		return
	}
	for _, cst := range s.constraints {
		cst.ReleaseForChild(st)
	}
}
// BeginTransaction creates a txn scope rooted at this scope, taking a
// reference so this scope stays alive while the transaction is open.
func (s *resourceScope) BeginTransaction() (network.TransactionalScope, error) {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return nil, network.ErrResourceScopeClosed
	}

	s.refCnt++
	return newTxnResourceScope(s), nil
}
// Done releases all resources held by this scope back upstream (owner tree or
// constraint set), drops the references taken on creation, and marks the
// scope closed. Subsequent calls are no-ops.
func (s *resourceScope) Done() {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return
	}

	stat := s.rc.stat()
	if owner := s.owner; owner != nil {
		owner.ReleaseResources(stat)
		owner.DecRef()
	} else {
		for _, cst := range s.constraints {
			cst.ReleaseForChild(stat)
			cst.DecRef()
		}
	}

	// zero all accounting, keeping only the limit
	s.rc = resources{limit: s.rc.limit}
	s.done = true
}
// Stat returns a snapshot of this scope's resource accounting.
func (s *resourceScope) Stat() network.ScopeStat {
	s.Lock()
	st := s.rc.stat()
	s.Unlock()
	return st
}
// IncRef takes a reference on the scope.
func (s *resourceScope) IncRef() {
	s.Lock()
	s.refCnt++
	s.Unlock()
}
// DecRef drops a reference on the scope.
func (s *resourceScope) DecRef() {
	s.Lock()
	s.refCnt--
	s.Unlock()
}
// IsUnused reports whether the scope holds no references and accounts no
// resources; a done scope is always considered unused.
func (s *resourceScope) IsUnused() bool {
	s.Lock()
	defer s.Unlock()

	if s.done {
		return true
	}
	if s.refCnt > 0 {
		return false
	}

	st := s.rc.stat()
	switch {
	case st.NumStreamsInbound != 0,
		st.NumStreamsOutbound != 0,
		st.NumConnsInbound != 0,
		st.NumConnsOutbound != 0,
		st.NumFD != 0:
		return false
	}
	return true
}