blocked memory reservation metrics

This commit is contained in:
vyzo 2022-02-12 13:59:46 +02:00
parent 019fe3da54
commit 8a85c1fbce
3 changed files with 60 additions and 33 deletions

View File

@ -22,6 +22,8 @@ type MetricsReporter interface {
BlockService(svc string)
// BlockedServicePeer is invoekd when setting the service for a stream is blocked at the per service peer scope
BlockServicePeer(svc string, p peer.ID)
// BlockMemory is invoked when a memory reservation fails
BlockMemory(size int)
}
type metrics struct {
@ -91,3 +93,11 @@ func (m *metrics) BlockServicePeer(svc string, p peer.ID) {
m.reporter.BlockServicePeer(svc, p)
}
func (m *metrics) BlockMemory(size int) {
if m == nil {
return
}
m.reporter.BlockMemory(size)
}

View File

@ -363,56 +363,68 @@ func (r *resourceManager) gc() {
func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace),
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace, rcmgr.metrics),
}
}
func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace),
system: rcmgr.system,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
"transient", rcmgr.trace, rcmgr.metrics),
system: rcmgr.system,
}
}
func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace),
name: name,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("service:%s", name), rcmgr.trace, rcmgr.metrics),
name: name,
rcmgr: rcmgr,
}
}
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace),
proto: proto,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics),
proto: proto,
rcmgr: rcmgr,
}
}
func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace),
peer: p,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("peer:%s", p), rcmgr.trace, rcmgr.metrics),
peer: p,
rcmgr: rcmgr,
}
}
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
}
}
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
resourceScope: newResourceScope(limit,
[]*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
}
}
@ -436,7 +448,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()
@ -463,7 +475,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()

View File

@ -36,22 +36,24 @@ type resourceScope struct {
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name string // for debugging purposes
trace *trace // debug tracing
name string // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
}
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope {
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace, metrics *metrics) *resourceScope {
for _, e := range edges {
e.IncRef()
}
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
metrics: metrics,
}
r.trace.CreateScope(name, limit)
return r
@ -59,10 +61,11 @@ func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *t
func newResourceScopeSpan(owner *resourceScope) *resourceScope {
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
metrics: owner.metrics,
}
r.trace.CreateScope(r.name, r.rc.limit)
return r
@ -238,6 +241,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.BlockMemory(size)
return s.wrapError(err)
}
@ -245,6 +249,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "size", size, "priority", prio, "error", err)
s.rc.releaseMemory(int64(size))
s.metrics.BlockMemory(size)
return s.wrapError(err)
}