Merge pull request #11 from libp2p/feat/metrics

metrics collection support
This commit is contained in:
vyzo 2022-02-14 19:14:23 +02:00 committed by GitHub
commit 0bf1f7df46
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 250 additions and 51 deletions

168
metrics.go Normal file
View File

@ -0,0 +1,168 @@
package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
// MetricsReporter is an interface for collecting metrics from resource manager actions.
// Each method is a callback invoked by the resource manager after it has decided
// to allow or block the corresponding action.
type MetricsReporter interface {
// AllowConn is invoked when opening a connection is allowed.
// dir and usefd are the arguments that were given to OpenConnection.
AllowConn(dir network.Direction, usefd bool)
// BlockConn is invoked when opening a connection is blocked.
// dir and usefd are the arguments that were given to OpenConnection.
BlockConn(dir network.Direction, usefd bool)
// AllowStream is invoked when opening a stream is allowed.
// p and dir are the arguments that were given to OpenStream.
AllowStream(p peer.ID, dir network.Direction)
// BlockStream is invoked when opening a stream is blocked.
// p and dir are the arguments that were given to OpenStream.
BlockStream(p peer.ID, dir network.Direction)
// AllowPeer is invoked when attaching a connection to a peer is allowed
AllowPeer(p peer.ID)
// BlockPeer is invoked when attaching a connection to a peer is blocked
BlockPeer(p peer.ID)
// AllowProtocol is invoked when setting the protocol for a stream is allowed
AllowProtocol(proto protocol.ID)
// BlockProtocol is invoked when setting the protocol for a stream is blocked
BlockProtocol(proto protocol.ID)
// BlockProtocolPeer is invoked when setting the protocol for a stream is blocked at the per protocol peer scope
BlockProtocolPeer(proto protocol.ID, p peer.ID)
// AllowService is invoked when setting the service for a stream is allowed
AllowService(svc string)
// BlockService is invoked when setting the service for a stream is blocked
BlockService(svc string)
// BlockServicePeer is invoked when setting the service for a stream is blocked at the per service peer scope
BlockServicePeer(svc string, p peer.ID)
// AllowMemory is invoked when a memory reservation is allowed.
// size is the size that was requested from ReserveMemory.
AllowMemory(size int)
// BlockMemory is invoked when a memory reservation is blocked.
// size is the size that was requested from ReserveMemory.
BlockMemory(size int)
}
// metrics is the resource manager's internal handle on a MetricsReporter.
// Its methods check for a nil receiver and return immediately in that case,
// so call sites can invoke them unconditionally whether or not metrics
// collection was enabled.
type metrics struct {
// reporter is the user-supplied sink that every metrics method forwards to.
reporter MetricsReporter
}
// WithMetrics is a resource manager option to enable metrics collection.
//
// The returned Option installs reporter as the destination for the callbacks
// declared by MetricsReporter. A nil reporter disables metrics collection:
// the manager's metrics handle is left nil, which every metrics method treats
// as a no-op. Without that guard a nil reporter would be wrapped in a non-nil
// handle and the first callback would panic on the nil interface.
//
// Note that only a nil interface value is detected; an interface holding a
// typed nil pointer is passed through unchanged.
func WithMetrics(reporter MetricsReporter) Option {
	return func(r *resourceManager) error {
		if reporter == nil {
			// Keep the handle nil so the nil-receiver guards in the
			// metrics methods short-circuit every callback.
			r.metrics = nil
			return nil
		}
		r.metrics = &metrics{reporter: reporter}
		return nil
	}
}
// AllowConn relays an allowed connection open to the reporter.
// Calling it on a nil *metrics does nothing.
func (m *metrics) AllowConn(dir network.Direction, usefd bool) {
	if m != nil {
		m.reporter.AllowConn(dir, usefd)
	}
}
// BlockConn relays a blocked connection open to the reporter.
// Calling it on a nil *metrics does nothing.
func (m *metrics) BlockConn(dir network.Direction, usefd bool) {
	if m != nil {
		m.reporter.BlockConn(dir, usefd)
	}
}
// AllowStream relays an allowed stream open to the reporter.
// Calling it on a nil *metrics does nothing.
func (m *metrics) AllowStream(p peer.ID, dir network.Direction) {
	if m != nil {
		m.reporter.AllowStream(p, dir)
	}
}
// BlockStream relays a blocked stream open to the reporter.
// Calling it on a nil *metrics does nothing.
func (m *metrics) BlockStream(p peer.ID, dir network.Direction) {
	if m != nil {
		m.reporter.BlockStream(p, dir)
	}
}
// AllowPeer relays to the reporter that attaching a connection to peer p
// was allowed. Calling it on a nil *metrics does nothing.
func (m *metrics) AllowPeer(p peer.ID) {
	if m != nil {
		m.reporter.AllowPeer(p)
	}
}
// BlockPeer relays to the reporter that attaching a connection to peer p
// was blocked. Calling it on a nil *metrics does nothing.
func (m *metrics) BlockPeer(p peer.ID) {
	if m != nil {
		m.reporter.BlockPeer(p)
	}
}
// AllowProtocol relays to the reporter that setting a stream's protocol
// was allowed. Calling it on a nil *metrics does nothing.
func (m *metrics) AllowProtocol(proto protocol.ID) {
	if m != nil {
		m.reporter.AllowProtocol(proto)
	}
}
// BlockProtocol relays to the reporter that setting a stream's protocol
// was blocked. Calling it on a nil *metrics does nothing.
func (m *metrics) BlockProtocol(proto protocol.ID) {
	if m != nil {
		m.reporter.BlockProtocol(proto)
	}
}
// BlockProtocolPeer relays to the reporter that setting a stream's protocol
// was blocked at the per-protocol peer scope. Calling it on a nil *metrics
// does nothing.
func (m *metrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
	if m != nil {
		m.reporter.BlockProtocolPeer(proto, p)
	}
}
// AllowService relays to the reporter that setting a stream's service
// was allowed. Calling it on a nil *metrics does nothing.
func (m *metrics) AllowService(svc string) {
	if m != nil {
		m.reporter.AllowService(svc)
	}
}
// BlockService relays to the reporter that setting a stream's service
// was blocked. Calling it on a nil *metrics does nothing.
func (m *metrics) BlockService(svc string) {
	if m != nil {
		m.reporter.BlockService(svc)
	}
}
// BlockServicePeer relays to the reporter that setting a stream's service
// was blocked at the per-service peer scope. Calling it on a nil *metrics
// does nothing.
func (m *metrics) BlockServicePeer(svc string, p peer.ID) {
	if m != nil {
		m.reporter.BlockServicePeer(svc, p)
	}
}
// AllowMemory relays an allowed memory reservation of the given size to the
// reporter. Calling it on a nil *metrics does nothing.
func (m *metrics) AllowMemory(size int) {
	if m != nil {
		m.reporter.AllowMemory(size)
	}
}
// BlockMemory relays a blocked memory reservation of the given size to the
// reporter. Calling it on a nil *metrics does nothing.
func (m *metrics) BlockMemory(size int) {
	if m != nil {
		m.reporter.BlockMemory(size)
	}
}

View File

@ -18,7 +18,8 @@ var log = logging.Logger("rcmgr")
type resourceManager struct {
limits Limiter
trace *trace
trace *trace
metrics *metrics
system *systemScope
transient *transientScope
@ -259,9 +260,11 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net
if err := conn.AddConn(dir, usefd); err != nil {
conn.Done()
r.metrics.BlockConn(dir, usefd)
return nil, err
}
r.metrics.AllowConn(dir, usefd)
return conn, nil
}
@ -273,9 +276,11 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.
err := stream.AddStream(dir)
if err != nil {
stream.Done()
r.metrics.BlockStream(p, dir)
return nil, err
}
r.metrics.AllowStream(p, dir)
return stream, nil
}
@ -360,56 +365,68 @@ func (r *resourceManager) gc() {
func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace),
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace, rcmgr.metrics),
}
}
func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace),
system: rcmgr.system,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
"transient", rcmgr.trace, rcmgr.metrics),
system: rcmgr.system,
}
}
func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace),
name: name,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("service:%s", name), rcmgr.trace, rcmgr.metrics),
name: name,
rcmgr: rcmgr,
}
}
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace),
proto: proto,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("protocol:%s", proto), rcmgr.trace, rcmgr.metrics),
proto: proto,
rcmgr: rcmgr,
}
}
func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace),
peer: p,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.system.resourceScope},
fmt.Sprintf("peer:%s", p), rcmgr.trace, rcmgr.metrics),
peer: p,
rcmgr: rcmgr,
}
}
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
resourceScope: newResourceScope(limit,
[]*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
}
}
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
resourceScope: newResourceScope(limit,
[]*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope},
fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace, rcmgr.metrics),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
}
}
@ -433,7 +450,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()
@ -460,7 +477,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace, s.rcmgr.metrics)
s.peers[p] = ps
ps.IncRef()
@ -497,6 +514,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error {
if err := s.peer.ReserveForChild(stat); err != nil {
s.peer.DecRef()
s.peer = nil
s.rcmgr.metrics.BlockPeer(p)
return err
}
@ -510,6 +528,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error {
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowPeer(p)
return nil
}
@ -540,6 +559,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
if err := s.proto.ReserveForChild(stat); err != nil {
s.proto.DecRef()
s.proto = nil
s.rcmgr.metrics.BlockProtocol(proto)
return err
}
@ -550,6 +570,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
s.proto = nil
s.peerProtoScope.DecRef()
s.peerProtoScope = nil
s.rcmgr.metrics.BlockProtocolPeer(proto, s.peer.peer)
return err
}
@ -565,6 +586,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowProtocol(proto)
return nil
}
@ -598,6 +620,7 @@ func (s *streamScope) SetService(svc string) error {
if err := s.svc.ReserveForChild(stat); err != nil {
s.svc.DecRef()
s.svc = nil
s.rcmgr.metrics.BlockService(svc)
return err
}
@ -609,6 +632,7 @@ func (s *streamScope) SetService(svc string) error {
s.svc = nil
s.peerSvcScope.DecRef()
s.peerSvcScope = nil
s.rcmgr.metrics.BlockServicePeer(svc, s.peer.peer)
return err
}
@ -623,6 +647,7 @@ func (s *streamScope) SetService(svc string) error {
}
s.resourceScope.edges = edges
s.rcmgr.metrics.AllowService(svc)
return nil
}

View File

@ -36,22 +36,24 @@ type resourceScope struct {
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name string // for debugging purposes
trace *trace // debug tracing
name string // for debugging purposes
trace *trace // debug tracing
metrics *metrics // metrics collection
}
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope {
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace, metrics *metrics) *resourceScope {
for _, e := range edges {
e.IncRef()
}
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
metrics: metrics,
}
r.trace.CreateScope(name, limit)
return r
@ -59,10 +61,11 @@ func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *t
func newResourceScopeSpan(owner *resourceScope) *resourceScope {
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
metrics: owner.metrics,
}
r.trace.CreateScope(r.name, r.rc.limit)
return r
@ -238,6 +241,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.BlockMemory(size)
return s.wrapError(err)
}
@ -245,10 +249,12 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
log.Debugw("blocked memory reservation from constraining edge", "scope", s.name, "size", size, "priority", prio, "error", err)
s.rc.releaseMemory(int64(size))
s.metrics.BlockMemory(size)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory)
s.metrics.AllowMemory(size)
return nil
}

View File

@ -257,7 +257,7 @@ func TestResourceScopeSimple(t *testing.T) {
FD: 1,
},
},
nil, "test", nil,
nil, "test", nil, nil,
)
s.IncRef()
@ -391,7 +391,7 @@ func TestResourceScopeTxnBasic(t *testing.T) {
FD: 1,
},
},
nil, "test", nil,
nil, "test", nil, nil,
)
txn, err := s.BeginSpan()
@ -428,7 +428,7 @@ func TestResourceScopeTxnZombie(t *testing.T) {
FD: 1,
},
},
nil, "test", nil,
nil, "test", nil, nil,
)
txn1, err := s.BeginSpan()
@ -472,7 +472,7 @@ func TestResourceScopeTxnTree(t *testing.T) {
FD: 1,
},
},
nil, "test", nil,
nil, "test", nil, nil,
)
txn1, err := s.BeginSpan()
@ -583,7 +583,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 4,
},
},
nil, "test", nil,
nil, "test", nil, nil,
)
s2 := newResourceScope(
&StaticLimit{
@ -598,7 +598,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s1}, "test", nil,
[]*resourceScope{s1}, "test", nil, nil,
)
s3 := newResourceScope(
&StaticLimit{
@ -613,7 +613,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s1}, "test", nil,
[]*resourceScope{s1}, "test", nil, nil,
)
s4 := newResourceScope(
&StaticLimit{
@ -628,7 +628,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s2, s3, s1}, "test", nil,
[]*resourceScope{s2, s3, s1}, "test", nil, nil,
)
s5 := newResourceScope(
&StaticLimit{
@ -643,7 +643,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s2, s1}, "test", nil,
[]*resourceScope{s2, s1}, "test", nil, nil,
)
s6 := newResourceScope(
&StaticLimit{
@ -658,7 +658,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s3, s1}, "test", nil,
[]*resourceScope{s3, s1}, "test", nil, nil,
)
if err := s4.ReserveMemory(1024, network.ReservationPriorityAlways); err != nil {
@ -1098,37 +1098,37 @@ func TestResourceScopeDAGTxn(t *testing.T) {
&StaticLimit{
Memory: 8192,
},
nil, "test", nil,
nil, "test", nil, nil,
)
s2 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test", nil,
[]*resourceScope{s1}, "test", nil, nil,
)
s3 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test", nil,
[]*resourceScope{s1}, "test", nil, nil,
)
s4 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s3, s1}, "test", nil,
[]*resourceScope{s2, s3, s1}, "test", nil, nil,
)
s5 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s1}, "test", nil,
[]*resourceScope{s2, s1}, "test", nil, nil,
)
s6 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s3, s1}, "test", nil,
[]*resourceScope{s3, s1}, "test", nil, nil,
)
txn4, err := s4.BeginSpan()

View File

@ -1,3 +1,3 @@
{
"version": "v0.1.3"
"version": "v0.1.4"
}