mirror of
synced 2025-02-05 01:00:19 +08:00
This allows for easy filtering using standard JSON tooling (e.g. jq) to filter for a specific scope.
797 lines
19 KiB
797 lines
19 KiB
package rcmgr
import (
// resources tracks the current state of resource consumption.
// The counters are checked against limit before they grow and are clamped
// at zero (with a warning) when more is released than was accounted.
type resources struct {
// limit supplies the ceilings the counters below are compared against
limit Limit
// number of inbound / outbound connections currently accounted
nconnsIn, nconnsOut int
// number of inbound / outbound streams currently accounted
nstreamsIn, nstreamsOut int
// number of file descriptors currently accounted
nfd int
// amount of memory currently reserved
memory int64
// scopeName identifies a scope. Its String and MarshalJSON methods derive a
// class/name from whichever of the fields below are set.
type scopeName struct {
// IsSystem / IsTransient select the "system" / "transient" class
IsSystem, IsTransient bool
// identifiers are only reported when they are greater than zero
ConnID, StreamID, SpanID int64
// Service, when non-empty, selects the "service" class
Service string
// Protocol, when non-empty, selects the "protocol" class
Protocol protocol.ID
// Peer, when non-empty, adds a "peer" component to the class
Peer peer.ID
// addToClass appends str to the class name cl, joining the two with "-".
// When cl is empty, str is returned unchanged so that the result never
// starts with a separator.
func addToClass(cl, str string) string {
	if cl == "" {
		return str
	}
	return cl + "-" + str
}
func (s scopeName) MarshalJSON() ([]byte, error) {
var class string
if s.IsSystem {
class = "system"
if s.IsTransient {
class = "transient"
if s.Service != "" {
class = "service"
if s.Protocol != "" {
class = "protocol"
if s.Peer != "" {
class = addToClass(class, "peer")
if s.ConnID > 0 {
class = "conn"
if s.StreamID > 0 {
class = "stream"
if s.SpanID > 0 {
class = addToClass(class, "span")
return json.Marshal(struct {
Class string `json:"class,omitempty"`
Service string `json:"service,omitempty"`
Protocol string `json:"protocol,omitempty"`
Peer string `json:"peer,omitempty"`
Conn int64 `json:"conn,omitempty"`
Stream int64 `json:"stream,omitempty"`
Span int64 `json:"span,omitempty"`
Class: class,
Service: s.Service,
Protocol: string(s.Protocol),
Peer: s.Peer.String(),
Conn: s.ConnID,
Stream: s.StreamID,
Span: s.SpanID,
func (s scopeName) String() string {
var name string
if s.IsSystem {
name = "system"
if s.IsTransient {
name = "transient"
if s.Service != "" {
name = "service"
if s.Protocol != "" {
name = "protocol"
if s.Peer != "" {
name = addToClass(name, "peer:"+s.Peer.String())
if s.ConnID > 0 {
name = fmt.Sprintf("conn-%d", s.ConnID)
if s.StreamID > 0 {
name = fmt.Sprintf("stream-%d", s.StreamID)
if s.SpanID > 0 {
name = addToClass(name, fmt.Sprintf("span:%d", s.SpanID))
return name
// A resourceScope can be a DAG scope, where a downstream node is not allowed to outlive an
// upstream node (i.e. Done cannot be called on the upstream node before the downstream node),
// and which accounts for resources using a linearized parent set.
//
// A resourceScope can also be a span scope, where it has a specific owner; span scopes create
// a tree rooted at the owner (which can be a DAG scope) and can outlive their parents -- this
// is important because span scopes are the main *user* interface for memory management, and
// the user may call Done in a span scope after the system has closed the root of the span
// tree in some background goroutine.
// If we didn't make this distinction we would have a double release problem in that case.
//
// NOTE(review): the methods of this type call s.Unlock(), but no lock field is visible in
// this view of the struct -- confirm where the mutex is declared.
type resourceScope struct {
// done is set once Done has run; operations on a done scope are rejected or ignored
done bool
// refCnt is consulted by IsUnused: a scope with a positive count is never unused
refCnt int
// spanID is the counter that nextSpanID hands out to new spans
spanID int64
// rc holds this scope's own limit and usage counters
rc resources
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name scopeName // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
// Compile-time checks that *resourceScope satisfies both scope interfaces.
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
// newResourceScope creates a DAG scope with the given limit, linearized parent
// set (edges), name, tracer and metrics collector, and reports the new scope
// to the tracer before returning it.
func newResourceScope(limit Limit, edges []*resourceScope, name scopeName, trace *trace, metrics *metrics) *resourceScope {
// NOTE(review): the body of this loop is not visible in this view. Presumably each
// edge is reference-counted here (see IncRef) so that it is not considered unused
// while this scope exists -- confirm against the full file.
for _, e := range edges {
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
metrics: metrics,
r.trace.CreateScope(name, limit)
return r
func newResourceScopeSpan(owner *resourceScope, id int64) *resourceScope {
sn := owner.name
sn.SpanID = id
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: sn,
trace: owner.trace,
metrics: owner.metrics,
r.trace.CreateScope(r.name, r.rc.limit)
return r
// Resources implementation
func (rc *resources) checkMemory(rsvp int64, prio uint8) error {
// overflow check; this also has the side effect that we cannot reserve negative memory.
newmem := rc.memory + rsvp
limit := rc.limit.GetMemoryLimit()
threshold := (1 + int64(prio)) * limit / 256
if newmem > threshold {
return network.ErrResourceLimitExceeded
return nil
func (rc *resources) reserveMemory(size int64, prio uint8) error {
if err := rc.checkMemory(size, prio); err != nil {
return err
rc.memory += size
return nil
func (rc *resources) releaseMemory(size int64) {
rc.memory -= size
// sanity check for bugs upstream
if rc.memory < 0 {
log.Warn("BUG: too much memory released")
rc.memory = 0
func (rc *resources) addStream(dir network.Direction) error {
if dir == network.DirInbound {
return rc.addStreams(1, 0)
return rc.addStreams(0, 1)
func (rc *resources) addStreams(incount, outcount int) error {
if incount > 0 && rc.nstreamsIn+incount > rc.limit.GetStreamLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
if outcount > 0 && rc.nstreamsOut+outcount > rc.limit.GetStreamLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
if rc.nstreamsIn+incount+rc.nstreamsOut+outcount > rc.limit.GetStreamTotalLimit() {
return fmt.Errorf("cannot reserve stream: %w", network.ErrResourceLimitExceeded)
rc.nstreamsIn += incount
rc.nstreamsOut += outcount
return nil
func (rc *resources) removeStream(dir network.Direction) {
if dir == network.DirInbound {
rc.removeStreams(1, 0)
} else {
rc.removeStreams(0, 1)
func (rc *resources) removeStreams(incount, outcount int) {
rc.nstreamsIn -= incount
rc.nstreamsOut -= outcount
if rc.nstreamsIn < 0 {
log.Warn("BUG: too many inbound streams released")
rc.nstreamsIn = 0
if rc.nstreamsOut < 0 {
log.Warn("BUG: too many outbound streams released")
rc.nstreamsOut = 0
func (rc *resources) addConn(dir network.Direction, usefd bool) error {
var fd int
if usefd {
fd = 1
if dir == network.DirInbound {
return rc.addConns(1, 0, fd)
return rc.addConns(0, 1, fd)
func (rc *resources) addConns(incount, outcount, fdcount int) error {
if incount > 0 && rc.nconnsIn+incount > rc.limit.GetConnLimit(network.DirInbound) {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
if outcount > 0 && rc.nconnsOut+outcount > rc.limit.GetConnLimit(network.DirOutbound) {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
if rc.nconnsIn+incount+rc.nconnsOut+outcount > rc.limit.GetConnTotalLimit() {
return fmt.Errorf("cannot reserve connection: %w", network.ErrResourceLimitExceeded)
if fdcount > 0 && rc.nfd+fdcount > rc.limit.GetFDLimit() {
return fmt.Errorf("cannot reserve file descriptor: %w", network.ErrResourceLimitExceeded)
rc.nconnsIn += incount
rc.nconnsOut += outcount
rc.nfd += fdcount
return nil
func (rc *resources) removeConn(dir network.Direction, usefd bool) {
var fd int
if usefd {
fd = 1
if dir == network.DirInbound {
rc.removeConns(1, 0, fd)
} else {
rc.removeConns(0, 1, fd)
func (rc *resources) removeConns(incount, outcount, fdcount int) {
rc.nconnsIn -= incount
rc.nconnsOut -= outcount
rc.nfd -= fdcount
if rc.nconnsIn < 0 {
log.Warn("BUG: too many inbound connections released")
rc.nconnsIn = 0
if rc.nconnsOut < 0 {
log.Warn("BUG: too many outbound connections released")
rc.nconnsOut = 0
if rc.nfd < 0 {
log.Warn("BUG: too many file descriptors released")
rc.nfd = 0
func (rc *resources) stat() network.ScopeStat {
return network.ScopeStat{
Memory: rc.memory,
NumStreamsInbound: rc.nstreamsIn,
NumStreamsOutbound: rc.nstreamsOut,
NumConnsInbound: rc.nconnsIn,
NumConnsOutbound: rc.nconnsOut,
NumFD: rc.nfd,
// resourceScope implementation
func (s *resourceScope) wrapError(err error) error {
return fmt.Errorf("%s: %w", s.name.String(), err)
func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "stat", s.rc.stat(), "error", err)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
return s.wrapError(err)
if err := s.reserveMemoryForEdges(size, prio); err != nil {
return s.wrapError(err)
s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory)
return nil
func (s *resourceScope) reserveMemoryForEdges(size int, prio uint8) error {
if s.owner != nil {
return s.owner.ReserveMemory(size, prio)
var reserved int
var err error
for _, e := range s.edges {
if err = e.ReserveMemoryForChild(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "edge", e.name, "size", size, "priority", prio, "stat", e.Stat(), "error", err)
if err != nil {
// we failed because of a constraint; undo memory reservations
for _, e := range s.edges[:reserved] {
return err
func (s *resourceScope) releaseMemoryForEdges(size int) {
if s.owner != nil {
for _, e := range s.edges {
func (s *resourceScope) ReserveMemoryForChild(size int64, prio uint8) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.reserveMemory(size, prio); err != nil {
s.trace.BlockReserveMemory(s.name, prio, size, s.rc.memory)
return s.wrapError(err)
s.trace.ReserveMemory(s.name, prio, size, s.rc.memory)
return nil
func (s *resourceScope) ReleaseMemory(size int) {
defer s.Unlock()
if s.done {
s.trace.ReleaseMemory(s.name, int64(size), s.rc.memory)
func (s *resourceScope) ReleaseMemoryForChild(size int64) {
defer s.Unlock()
if s.done {
s.trace.ReleaseMemory(s.name, size, s.rc.memory)
func (s *resourceScope) AddStream(dir network.Direction) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.addStream(dir); err != nil {
log.Debugw("blocked stream", "scope", s.name, "direction", dir, "stat", s.rc.stat(), "error", err)
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
if err := s.addStreamForEdges(dir); err != nil {
return s.wrapError(err)
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
func (s *resourceScope) addStreamForEdges(dir network.Direction) error {
if s.owner != nil {
return s.owner.AddStream(dir)
var err error
var reserved int
for _, e := range s.edges {
if err = e.AddStreamForChild(dir); err != nil {
log.Debugw("blocked stream from constraining edge", "scope", s.name, "edge", e.name, "direction", dir, "stat", e.Stat(), "error", err)
if err != nil {
for _, e := range s.edges[:reserved] {
return err
func (s *resourceScope) AddStreamForChild(dir network.Direction) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.addStream(dir); err != nil {
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
func (s *resourceScope) RemoveStream(dir network.Direction) {
defer s.Unlock()
if s.done {
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
func (s *resourceScope) removeStreamForEdges(dir network.Direction) {
if s.owner != nil {
for _, e := range s.edges {
func (s *resourceScope) RemoveStreamForChild(dir network.Direction) {
defer s.Unlock()
if s.done {
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.addConn(dir, usefd); err != nil {
log.Debugw("blocked connection", "scope", s.name, "direction", dir, "usefd", usefd, "stat", s.rc.stat(), "error", err)
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
if err := s.addConnForEdges(dir, usefd); err != nil {
s.rc.removeConn(dir, usefd)
return s.wrapError(err)
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
func (s *resourceScope) addConnForEdges(dir network.Direction, usefd bool) error {
if s.owner != nil {
return s.owner.AddConn(dir, usefd)
var err error
var reserved int
for _, e := range s.edges {
if err = e.AddConnForChild(dir, usefd); err != nil {
log.Debugw("blocked connection from constraining edge", "scope", s.name, "edge", e.name, "direction", dir, "usefd", usefd, "stat", e.Stat(), "error", err)
if err != nil {
for _, e := range s.edges[:reserved] {
e.RemoveConnForChild(dir, usefd)
return err
func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.addConn(dir, usefd); err != nil {
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) {
defer s.Unlock()
if s.done {
s.rc.removeConn(dir, usefd)
s.removeConnForEdges(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
func (s *resourceScope) removeConnForEdges(dir network.Direction, usefd bool) {
if s.owner != nil {
s.owner.RemoveConn(dir, usefd)
for _, e := range s.edges {
e.RemoveConnForChild(dir, usefd)
func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) {
defer s.Unlock()
if s.done {
s.rc.removeConn(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
func (s *resourceScope) ReserveForChild(st network.ScopeStat) error {
defer s.Unlock()
if s.done {
return s.wrapError(network.ErrResourceScopeClosed)
if err := s.rc.reserveMemory(st.Memory, network.ReservationPriorityAlways); err != nil {
s.trace.BlockReserveMemory(s.name, 255, st.Memory, s.rc.memory)
return s.wrapError(err)
if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil {
s.trace.BlockAddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil {
s.trace.BlockAddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
return s.wrapError(err)
s.trace.ReserveMemory(s.name, 255, st.Memory, s.rc.memory)
s.trace.AddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.AddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
func (s *resourceScope) ReleaseForChild(st network.ScopeStat) {
defer s.Unlock()
if s.done {
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
defer s.Unlock()
if s.done {
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
if s.owner != nil {
} else {
for _, e := range s.edges {
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
func (s *resourceScope) nextSpanID() int64 {
return s.spanID
func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
defer s.Unlock()
if s.done {
return nil, s.wrapError(network.ErrResourceScopeClosed)
return newResourceScopeSpan(s, s.nextSpanID()), nil
func (s *resourceScope) Done() {
defer s.Unlock()
if s.done {
stat := s.rc.stat()
if s.owner != nil {
} else {
for _, e := range s.edges {
s.rc.nstreamsIn = 0
s.rc.nstreamsOut = 0
s.rc.nconnsIn = 0
s.rc.nconnsOut = 0
s.rc.nfd = 0
s.rc.memory = 0
s.done = true
func (s *resourceScope) Stat() network.ScopeStat {
defer s.Unlock()
return s.rc.stat()
func (s *resourceScope) IncRef() {
defer s.Unlock()
func (s *resourceScope) DecRef() {
defer s.Unlock()
func (s *resourceScope) IsUnused() bool {
defer s.Unlock()
if s.done {
return true
if s.refCnt > 0 {
return false
st := s.rc.stat()
return st.NumStreamsInbound == 0 &&
st.NumStreamsOutbound == 0 &&
st.NumConnsInbound == 0 &&
st.NumConnsOutbound == 0 &&
st.NumFD == 0