include the current limits in stream allocation errors

This commit is contained in:
Marten Seemann 2022-06-06 12:56:21 +02:00
parent 5224eb6ad6
commit 2855d5b5e2

View File

@ -1,6 +1,7 @@
package rcmgr
import (
"errors"
"fmt"
"sync"
@ -18,6 +19,21 @@ type resources struct {
memory int64
}
type errStreamLimitExceeded struct {
current, attempted, limit int
err error
}
func (e *errStreamLimitExceeded) Error() string { return e.err.Error() }
func (e *errStreamLimitExceeded) Unwrap() error { return e.err }
func (e *errStreamLimitExceeded) AppendLogValues(v []interface{}) []interface{} {
return append(v,
"current", e.current,
"attempted", e.attempted,
"limit", e.limit,
)
}
// A resourceScope can be a DAG, where a downstream node is not allowed to outlive an upstream node
// (ie cannot call Done in the upstream node before the downstream node) and account for resources
// using a linearized parent set.
@ -112,14 +128,36 @@ func (rc *resources) addStream(dir network.Direction) error {
}
func (rc *resources) addStreams(incount, outcount int) error {
if incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
if incount > 0 {
limit := rc.limit.GetStreamLimit(network.DirInbound)
if rc.nstreamsIn+incount > limit {
return &errStreamLimitExceeded{
current: rc.nstreamsIn,
attempted: incount,
limit: limit,
err: fmt.Errorf("cannot reserve inbound stream: %w", network.ErrResourceLimitExceeded),
}
}
}
if outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
if outcount > 0 {
limit := rc.limit.GetStreamLimit(network.DirOutbound)
if rc.nstreamsOut+outcount > limit {
return &errStreamLimitExceeded{
current: rc.nstreamsOut,
attempted: outcount,
limit: limit,
err: fmt.Errorf("cannot reserve outbound stream: %w", network.ErrResourceLimitExceeded),
}
}
}
if rc.nstreamsIn+incount+rc.nstreamsOut+outcount > rc.limit.GetStreamTotalLimit() {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
if limit := rc.limit.GetStreamTotalLimit(); rc.nstreamsIn+incount+rc.nstreamsOut+outcount > limit {
return &errStreamLimitExceeded{
current: rc.nstreamsIn + rc.nstreamsOut,
attempted: incount + outcount,
limit: limit,
err: fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded),
}
}
rc.nstreamsIn += incount
@ -344,7 +382,14 @@ func (s *resourceScope) AddStream(dir network.Direction) error {
}
if err := s.rc.addStream(dir); err != nil {
log.Debugw("blocked stream", "scope", s.name, "direction", dir, "stat", s.rc.stat(), "error", err)
logValues := make([]interface{}, 0, 6)
logValues = append(logValues, "scope", s.name, "direction", dir)
var limitErr *errStreamLimitExceeded
if errors.As(err, &limitErr) {
logValues = limitErr.AppendLogValues(logValues)
}
logValues = append(logValues, "error", err)
log.Debugw("blocked stream", logValues...)
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
@ -367,7 +412,14 @@ func (s *resourceScope) addStreamForEdges(dir network.Direction) error {
var reserved int
for _, e := range s.edges {
if err = e.AddStreamForChild(dir); err != nil {
log.Debugw("blocked stream from constraining edge", "scope", s.name, "edge", e.name, "direction", dir, "stat", e.Stat(), "error", err)
logValues := make([]interface{}, 0, 7)
logValues = append(logValues, "scope", s.name, "edge", e.name, "direction", dir)
var limitErr *errStreamLimitExceeded
if errors.As(err, &limitErr) {
logValues = limitErr.AppendLogValues(logValues)
}
logValues = append(logValues, "error", err)
log.Debugw("blocked stream from constraining edge", logValues...)
break
}
reserved++