diff --git a/metrics.go b/metrics.go index 71632d9..2342d0b 100644 --- a/metrics.go +++ b/metrics.go @@ -22,6 +22,8 @@ type MetricsReporter interface { BlockService(svc string) // BlockedServicePeer is invoekd when setting the service for a stream is blocked at the per service peer scope BlockServicePeer(svc string, p peer.ID) + // BlockMemory is invoked when a memory reservation fails + BlockMemory(size int) } type metrics struct { @@ -91,3 +93,11 @@ func (m *metrics) BlockServicePeer(svc string, p peer.ID) { m.reporter.BlockServicePeer(svc, p) } + +func (m *metrics) BlockMemory(size int) { + if m == nil { + return + } + + m.reporter.BlockMemory(size) +} diff --git a/rcmgr.go b/rcmgr.go index 2762475..3a07227 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -363,56 +363,68 @@ func (r *resourceManager) gc() { func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope { return &systemScope{ - resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace), + resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace, rcmgr.metrics), } } func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope { return &transientScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace), - system: rcmgr.system, + resourceScope: newResourceScope(limit, + []*resourceScope{rcmgr.system.resourceScope}, + "transient", rcmgr.trace, rcmgr.metrics), + system: rcmgr.system, } } func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope { return &serviceScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace), - name: name, - rcmgr: rcmgr, + resourceScope: newResourceScope(limit, + []*resourceScope{rcmgr.system.resourceScope}, + fmt.Sprintf("service:%s", name), rcmgr.trace, rcmgr.metrics), + name: name, + rcmgr: rcmgr, } } func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope { return &protocolScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace), - proto: proto, - rcmgr: rcmgr, + resourceScope: newResourceScope(limit, + []*resourceScope{rcmgr.system.resourceScope}, + fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics), + proto: proto, + rcmgr: rcmgr, } } func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope { return &peerScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace), - peer: p, - rcmgr: rcmgr, + resourceScope: newResourceScope(limit, + []*resourceScope{rcmgr.system.resourceScope}, + fmt.Sprintf("peer:%s", p), rcmgr.trace, rcmgr.metrics), + peer: p, + rcmgr: rcmgr, } } func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope { return &connectionScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace), - dir: dir, - usefd: usefd, - rcmgr: rcmgr, + resourceScope: newResourceScope(limit, + []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, + fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics), + dir: dir, + usefd: usefd, + rcmgr: rcmgr, } } func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope { return &streamScope{ - resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace), - dir: dir, - rcmgr: peer.rcmgr, - peer: peer, + resourceScope: newResourceScope(limit, + []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, + fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics), + dir: dir, + rcmgr: peer.rcmgr, + peer: peer, } } @@ -436,7 +448,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope { s.peers = make(map[peer.ID]*resourceScope) } - ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace) + ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics) s.peers[p] = ps ps.IncRef() @@ -463,7 +475,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope { s.peers = make(map[peer.ID]*resourceScope) } - ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace) + ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics) s.peers[p] = ps ps.IncRef() diff --git a/scope.go b/scope.go index beb6160..9d42207 100644 --- a/scope.go +++ b/scope.go @@ -36,22 +36,24 @@ type resourceScope struct { owner *resourceScope // set in span scopes, which define trees edges []*resourceScope // set in DAG scopes, it's the linearized parent set - name string // for debugging purposes - trace *trace // debug tracing + name string // for debugging purposes + trace *trace // debug tracing + metrics *metrics // metrics collection } var _ network.ResourceScope = (*resourceScope)(nil) var _ network.ResourceScopeSpan = (*resourceScope)(nil) -func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope { +func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace, metrics *metrics) *resourceScope { for _, e := range edges { e.IncRef() } r := &resourceScope{ - rc: resources{limit: limit}, - edges: edges, - name: name, - trace: trace, + rc: resources{limit: limit}, + edges: edges, + name: name, + trace: trace, + metrics: metrics, } r.trace.CreateScope(name, limit) return r @@ -59,10 +61,11 @@ func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *t func newResourceScopeSpan(owner *resourceScope) *resourceScope { r := &resourceScope{ - rc: resources{limit: owner.rc.limit}, - owner: owner, - name: fmt.Sprintf("%s.span", owner.name), - trace: owner.trace, + rc: resources{limit: owner.rc.limit}, + owner: owner, + name: fmt.Sprintf("%s.span", owner.name), + trace: owner.trace, + metrics: owner.metrics, } r.trace.CreateScope(r.name, r.rc.limit) return r @@ -238,6 +241,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error { if err := s.rc.reserveMemory(int64(size), prio); err != nil { log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err) s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory) + s.metrics.BlockMemory(size) return s.wrapError(err) } @@ -245,6 +249,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error { log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "size", size, "priority", prio, "error", err) s.rc.releaseMemory(int64(size)) + s.metrics.BlockMemory(size) return s.wrapError(err) }