mirror of
https://github.com/libp2p/go-libp2p-resource-manager.git
synced 2025-01-27 12:50:07 +08:00
metrics collection support
This commit is contained in:
parent
840c01c4ce
commit
6d91069894
93
metrics.go
Normal file
93
metrics.go
Normal file
@ -0,0 +1,93 @@
|
||||
package rcmgr
|
||||
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
)
|
||||
|
||||
// MetricsReporter is an interface for collecting metrics from resource manager actions
|
||||
type MetricsReporter interface {
|
||||
// BlockOpenConn is invoked when opening a connection is blocked
|
||||
BlockOpenConn(dir network.Direction, usefd bool)
|
||||
// BlockOpenStream is invoked when opening a stream is blocked
|
||||
BlockOpenStream(p peer.ID, dir network.Direction)
|
||||
// BlockSetPeer is invoked when attaching ac onnection to a peer is blocked
|
||||
BlockSetPeer(p peer.ID)
|
||||
// BlockSetProtocol is invoked when setting the protocol for a stream is blocked
|
||||
BlockSetProtocol(proto protocol.ID)
|
||||
// BlockedSetProtocolPeer is invoekd when setting the protocol for a stream is blocked at the per protocol peer scope
|
||||
BlockSetProtocolPeer(proto protocol.ID, p peer.ID)
|
||||
// BlockSetPService is invoked when setting the protocol for a stream is blocked
|
||||
BlockSetService(svc string)
|
||||
// BlockedSetServicePeer is invoekd when setting the service for a stream is blocked at the per service peer scope
|
||||
BlockSetServicePeer(svc string, p peer.ID)
|
||||
}
|
||||
|
||||
type metrics struct {
|
||||
reporter MetricsReporter
|
||||
}
|
||||
|
||||
// WithMetrics is a resource manager option to enable metrics collection
|
||||
func WithMetrics(reporter MetricsReporter) Option {
|
||||
return func(r *resourceManager) error {
|
||||
r.metrics = &metrics{reporter: reporter}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (m *metrics) BlockOpenConn(dir network.Direction, usefd bool) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockOpenConn(dir, usefd)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockOpenStream(p peer.ID, dir network.Direction) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockOpenStream(p, dir)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockSetPeer(p peer.ID) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockSetPeer(p)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockSetProtocol(proto protocol.ID) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockSetProtocol(proto)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockSetProtocolPeer(proto protocol.ID, p peer.ID) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockSetProtocolPeer(proto, p)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockSetService(svc string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockSetService(svc)
|
||||
}
|
||||
|
||||
func (m *metrics) BlockSetServicePeer(svc string, p peer.ID) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
|
||||
m.reporter.BlockSetServicePeer(svc, p)
|
||||
}
|
10
rcmgr.go
10
rcmgr.go
@ -18,7 +18,8 @@ var log = logging.Logger("rcmgr")
|
||||
type resourceManager struct {
|
||||
limits Limiter
|
||||
|
||||
trace *trace
|
||||
trace *trace
|
||||
metrics *metrics
|
||||
|
||||
system *systemScope
|
||||
transient *transientScope
|
||||
@ -259,6 +260,7 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net
|
||||
|
||||
if err := conn.AddConn(dir, usefd); err != nil {
|
||||
conn.Done()
|
||||
r.metrics.BlockOpenConn(dir, usefd)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -273,6 +275,7 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.
|
||||
err := stream.AddStream(dir)
|
||||
if err != nil {
|
||||
stream.Done()
|
||||
r.metrics.BlockOpenStream(p, dir)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -497,6 +500,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error {
|
||||
if err := s.peer.ReserveForChild(stat); err != nil {
|
||||
s.peer.DecRef()
|
||||
s.peer = nil
|
||||
s.rcmgr.metrics.BlockSetPeer(p)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -540,6 +544,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
|
||||
if err := s.proto.ReserveForChild(stat); err != nil {
|
||||
s.proto.DecRef()
|
||||
s.proto = nil
|
||||
s.rcmgr.metrics.BlockSetProtocol(proto)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -550,6 +555,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error {
|
||||
s.proto = nil
|
||||
s.peerProtoScope.DecRef()
|
||||
s.peerProtoScope = nil
|
||||
s.rcmgr.metrics.BlockSetProtocolPeer(proto, s.peer.peer)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -598,6 +604,7 @@ func (s *streamScope) SetService(svc string) error {
|
||||
if err := s.svc.ReserveForChild(stat); err != nil {
|
||||
s.svc.DecRef()
|
||||
s.svc = nil
|
||||
s.rcmgr.metrics.BlockSetService(svc)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -609,6 +616,7 @@ func (s *streamScope) SetService(svc string) error {
|
||||
s.svc = nil
|
||||
s.peerSvcScope.DecRef()
|
||||
s.peerSvcScope = nil
|
||||
s.rcmgr.metrics.BlockSetServicePeer(svc, s.peer.peer)
|
||||
return err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user