mirror of
https://github.com/libp2p/go-libp2p-resource-manager.git
synced 2025-03-24 08:10:55 +08:00
include the current limits in conn allocation errors
This commit is contained in:
parent
2855d5b5e2
commit
069f2c2f6e
82
scope.go
82
scope.go
@ -19,14 +19,14 @@ type resources struct {
|
||||
memory int64
|
||||
}
|
||||
|
||||
type errStreamLimitExceeded struct {
|
||||
type errStreamOrConnLimitExceeded struct {
|
||||
current, attempted, limit int
|
||||
err error
|
||||
}
|
||||
|
||||
func (e *errStreamLimitExceeded) Error() string { return e.err.Error() }
|
||||
func (e *errStreamLimitExceeded) Unwrap() error { return e.err }
|
||||
func (e *errStreamLimitExceeded) AppendLogValues(v []interface{}) []interface{} {
|
||||
func (e *errStreamOrConnLimitExceeded) Error() string { return e.err.Error() }
|
||||
func (e *errStreamOrConnLimitExceeded) Unwrap() error { return e.err }
|
||||
func (e *errStreamOrConnLimitExceeded) AppendLogValues(v []interface{}) []interface{} {
|
||||
return append(v,
|
||||
"current", e.current,
|
||||
"attempted", e.attempted,
|
||||
@ -131,7 +131,7 @@ func (rc *resources) addStreams(incount, outcount int) error {
|
||||
if incount > 0 {
|
||||
limit := rc.limit.GetStreamLimit(network.DirInbound)
|
||||
if rc.nstreamsIn+incount > limit {
|
||||
return &errStreamLimitExceeded{
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nstreamsIn,
|
||||
attempted: incount,
|
||||
limit: limit,
|
||||
@ -142,7 +142,7 @@ func (rc *resources) addStreams(incount, outcount int) error {
|
||||
if outcount > 0 {
|
||||
limit := rc.limit.GetStreamLimit(network.DirOutbound)
|
||||
if rc.nstreamsOut+outcount > limit {
|
||||
return &errStreamLimitExceeded{
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nstreamsOut,
|
||||
attempted: outcount,
|
||||
limit: limit,
|
||||
@ -152,7 +152,7 @@ func (rc *resources) addStreams(incount, outcount int) error {
|
||||
}
|
||||
|
||||
if limit := rc.limit.GetStreamTotalLimit(); rc.nstreamsIn+incount+rc.nstreamsOut+outcount > limit {
|
||||
return &errStreamLimitExceeded{
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nstreamsIn + rc.nstreamsOut,
|
||||
attempted: incount + outcount,
|
||||
limit: limit,
|
||||
@ -201,17 +201,47 @@ func (rc *resources) addConn(dir network.Direction, usefd bool) error {
|
||||
}
|
||||
|
||||
func (rc *resources) addConns(incount, outcount, fdcount int) error {
|
||||
if incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound) {
|
||||
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
|
||||
if incount > 0 {
|
||||
limit := rc.limit.GetConnLimit(network.DirInbound)
|
||||
if rc.nconnsIn+incount > limit {
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nconnsIn,
|
||||
attempted: incount,
|
||||
limit: limit,
|
||||
err: fmt.Errorf("cannot reserve inbound connection: %w", network.ErrResourceLimitExceeded),
|
||||
}
|
||||
}
|
||||
}
|
||||
if outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound) {
|
||||
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
|
||||
if outcount > 0 {
|
||||
limit := rc.limit.GetConnLimit(network.DirOutbound)
|
||||
if rc.nconnsOut+outcount > limit {
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nconnsOut,
|
||||
attempted: outcount,
|
||||
limit: limit,
|
||||
err: fmt.Errorf("cannot reserve outbound connection: %w", network.ErrResourceLimitExceeded),
|
||||
}
|
||||
}
|
||||
}
|
||||
if rc.nconnsIn+incount+rc.nconnsOut+outcount > rc.limit.GetConnTotalLimit() {
|
||||
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
|
||||
|
||||
if connLimit := rc.limit.GetConnTotalLimit(); rc.nconnsIn+incount+rc.nconnsOut+outcount > connLimit {
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nconnsIn + rc.nconnsOut,
|
||||
attempted: incount + outcount,
|
||||
limit: connLimit,
|
||||
err: fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded),
|
||||
}
|
||||
}
|
||||
if fdcount > 0 && rc.nfd+fdcount > rc.limit.GetFDLimit() {
|
||||
return fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded)
|
||||
if fdcount > 0 {
|
||||
limit := rc.limit.GetFDLimit()
|
||||
if rc.nfd+fdcount > limit {
|
||||
return &errStreamOrConnLimitExceeded{
|
||||
current: rc.nfd,
|
||||
attempted: fdcount,
|
||||
limit: limit,
|
||||
err: fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rc.nconnsIn += incount
|
||||
@ -384,7 +414,7 @@ func (s *resourceScope) AddStream(dir network.Direction) error {
|
||||
if err := s.rc.addStream(dir); err != nil {
|
||||
logValues := make([]interface{}, 0, 6)
|
||||
logValues = append(logValues, "scope", s.name, "direction", dir)
|
||||
var limitErr *errStreamLimitExceeded
|
||||
var limitErr *errStreamOrConnLimitExceeded
|
||||
if errors.As(err, &limitErr) {
|
||||
logValues = limitErr.AppendLogValues(logValues)
|
||||
}
|
||||
@ -414,7 +444,7 @@ func (s *resourceScope) addStreamForEdges(dir network.Direction) error {
|
||||
if err = e.AddStreamForChild(dir); err != nil {
|
||||
logValues := make([]interface{}, 0, 7)
|
||||
logValues = append(logValues, "scope", s.name, "edge", e.name, "direction", dir)
|
||||
var limitErr *errStreamLimitExceeded
|
||||
var limitErr *errStreamOrConnLimitExceeded
|
||||
if errors.As(err, &limitErr) {
|
||||
logValues = limitErr.AppendLogValues(logValues)
|
||||
}
|
||||
@ -496,7 +526,14 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
|
||||
}
|
||||
|
||||
if err := s.rc.addConn(dir, usefd); err != nil {
|
||||
log.Debugw("blocked connection", "scope", s.name, "direction", dir, "usefd", usefd, "stat", s.rc.stat(), "error", err)
|
||||
logValues := make([]interface{}, 0, 7)
|
||||
logValues = append(logValues, "scope", s.name, "direction", dir, "usefd", usefd)
|
||||
var limitErr *errStreamOrConnLimitExceeded
|
||||
if errors.As(err, &limitErr) {
|
||||
logValues = limitErr.AppendLogValues(logValues)
|
||||
}
|
||||
logValues = append(logValues, "error", err)
|
||||
log.Debugw("blocked connection", logValues...)
|
||||
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
|
||||
return s.wrapError(err)
|
||||
}
|
||||
@ -519,7 +556,14 @@ func (s *resourceScope) addConnForEdges(dir network.Direction, usefd bool) error
|
||||
var reserved int
|
||||
for _, e := range s.edges {
|
||||
if err = e.AddConnForChild(dir, usefd); err != nil {
|
||||
log.Debugw("blocked connection from constraining edge", "scope", s.name, "edge", e.name, "direction", dir, "usefd", usefd, "stat", e.Stat(), "error", err)
|
||||
logValues := make([]interface{}, 0, 7)
|
||||
logValues = append(logValues, "scope", s.name, "edge", e.name, "direction", dir, "usefd", usefd)
|
||||
var limitErr *errStreamOrConnLimitExceeded
|
||||
if errors.As(err, &limitErr) {
|
||||
logValues = limitErr.AppendLogValues(logValues)
|
||||
}
|
||||
logValues = append(logValues, "error", err)
|
||||
log.Debugw("blocked connection from constraining edge", logValues...)
|
||||
break
|
||||
}
|
||||
reserved++
|
||||
|
Loading…
Reference in New Issue
Block a user