diff --git a/metrics.go b/metrics.go new file mode 100644 index 0000000..4686016 --- /dev/null +++ b/metrics.go @@ -0,0 +1,93 @@ +package rcmgr + +import ( + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" +) + +// MetricsReporter is an interface for collecting metrics from resource manager actions +type MetricsReporter interface { + // BlockOpenConn is invoked when opening a connection is blocked + BlockOpenConn(dir network.Direction, usefd bool) + // BlockOpenStream is invoked when opening a stream is blocked + BlockOpenStream(p peer.ID, dir network.Direction) + // BlockSetPeer is invoked when attaching ac onnection to a peer is blocked + BlockSetPeer(p peer.ID) + // BlockSetProtocol is invoked when setting the protocol for a stream is blocked + BlockSetProtocol(proto protocol.ID) + // BlockedSetProtocolPeer is invoekd when setting the protocol for a stream is blocked at the per protocol peer scope + BlockSetProtocolPeer(proto protocol.ID, p peer.ID) + // BlockSetPService is invoked when setting the protocol for a stream is blocked + BlockSetService(svc string) + // BlockedSetServicePeer is invoekd when setting the service for a stream is blocked at the per service peer scope + BlockSetServicePeer(svc string, p peer.ID) +} + +type metrics struct { + reporter MetricsReporter +} + +// WithMetrics is a resource manager option to enable metrics collection +func WithMetrics(reporter MetricsReporter) Option { + return func(r *resourceManager) error { + r.metrics = &metrics{reporter: reporter} + return nil + } +} + +func (m *metrics) BlockOpenConn(dir network.Direction, usefd bool) { + if m == nil { + return + } + + m.reporter.BlockOpenConn(dir, usefd) +} + +func (m *metrics) BlockOpenStream(p peer.ID, dir network.Direction) { + if m == nil { + return + } + + m.reporter.BlockOpenStream(p, dir) +} + +func (m *metrics) BlockSetPeer(p peer.ID) { + if m == nil { + return + } + + m.reporter.BlockSetPeer(p) +} + +func (m *metrics) BlockSetProtocol(proto protocol.ID) { + if m == nil { + return + } + + m.reporter.BlockSetProtocol(proto) +} + +func (m *metrics) BlockSetProtocolPeer(proto protocol.ID, p peer.ID) { + if m == nil { + return + } + + m.reporter.BlockSetProtocolPeer(proto, p) +} + +func (m *metrics) BlockSetService(svc string) { + if m == nil { + return + } + + m.reporter.BlockSetService(svc) +} + +func (m *metrics) BlockSetServicePeer(svc string, p peer.ID) { + if m == nil { + return + } + + m.reporter.BlockSetServicePeer(svc, p) +} diff --git a/rcmgr.go b/rcmgr.go index 08d78a4..f387dd1 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -18,7 +18,8 @@ var log = logging.Logger("rcmgr") type resourceManager struct { limits Limiter - trace *trace + trace *trace + metrics *metrics system *systemScope transient *transientScope @@ -259,6 +260,7 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net if err := conn.AddConn(dir, usefd); err != nil { conn.Done() + r.metrics.BlockOpenConn(dir, usefd) return nil, err } @@ -273,6 +275,7 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network. err := stream.AddStream(dir) if err != nil { stream.Done() + r.metrics.BlockOpenStream(p, dir) return nil, err } @@ -497,6 +500,7 @@ func (s *connectionScope) SetPeer(p peer.ID) error { if err := s.peer.ReserveForChild(stat); err != nil { s.peer.DecRef() s.peer = nil + s.rcmgr.metrics.BlockSetPeer(p) return err } @@ -540,6 +544,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error { if err := s.proto.ReserveForChild(stat); err != nil { s.proto.DecRef() s.proto = nil + s.rcmgr.metrics.BlockSetProtocol(proto) return err } @@ -550,6 +555,7 @@ func (s *streamScope) SetProtocol(proto protocol.ID) error { s.proto = nil s.peerProtoScope.DecRef() s.peerProtoScope = nil + s.rcmgr.metrics.BlockSetProtocolPeer(proto, s.peer.peer) return err } @@ -598,6 +604,7 @@ func (s *streamScope) SetService(svc string) error { if err := s.svc.ReserveForChild(stat); err != nil { s.svc.DecRef() s.svc = nil + s.rcmgr.metrics.BlockSetService(svc) return err } @@ -609,6 +616,7 @@ func (s *streamScope) SetService(svc string) error { s.svc = nil s.peerSvcScope.DecRef() s.peerSvcScope = nil + s.rcmgr.metrics.BlockSetServicePeer(svc, s.peer.peer) return err }