tracing instrumentation

This commit is contained in:
vyzo 2022-01-15 13:36:28 +02:00
parent d9e855dd69
commit 98870b0c0c
4 changed files with 266 additions and 37 deletions

View File

@ -18,6 +18,8 @@ var log = logging.Logger("rcmgr")
type resourceManager struct {
limits Limiter
trace *trace
system *systemScope
transient *transientScope
@ -29,6 +31,8 @@ type resourceManager struct {
svc map[string]*serviceScope
proto map[protocol.ID]*protocolScope
peer map[peer.ID]*peerScope
connId, streamId int64
}
var _ network.ResourceManager = (*resourceManager)(nil)
@ -122,9 +126,13 @@ func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager
}
}
r.system = newSystemScope(limits.GetSystemLimits())
if err := r.trace.Start(limits); err != nil {
return nil, err
}
r.system = newSystemScope(limits.GetSystemLimits(), r)
r.system.IncRef()
r.transient = newTransientScope(limits.GetTransientLimits(), r.system)
r.transient = newTransientScope(limits.GetTransientLimits(), r)
r.transient.IncRef()
r.cancelCtx, r.cancel = context.WithCancel(context.Background())
@ -206,6 +214,22 @@ func (r *resourceManager) getPeerScope(p peer.ID) *peerScope {
return s
}
func (r *resourceManager) nextConnId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.connId++
return r.connId
}
func (r *resourceManager) nextStreamId() int64 {
r.mx.Lock()
defer r.mx.Unlock()
r.streamId++
return r.streamId
}
func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) {
conn := newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r)
@ -219,7 +243,7 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net
func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
peer := r.getPeerScope(p)
stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer)
stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer, r)
peer.DecRef() // we have the reference in edges
err := stream.AddStream(dir)
@ -234,6 +258,7 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.
func (r *resourceManager) Close() error {
r.cancel()
r.wg.Wait()
r.trace.Close()
return nil
}
@ -300,22 +325,22 @@ func (r *resourceManager) gc() {
}
}
func newSystemScope(limit Limit) *systemScope {
func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope {
return &systemScope{
resourceScope: newResourceScope(limit, nil, "system"),
resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace),
}
}
func newTransientScope(limit Limit, system *systemScope) *transientScope {
func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope {
return &transientScope{
resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, "transient"),
system: system,
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace),
system: rcmgr.system,
}
}
func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope {
return &serviceScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service.%s", name)),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace),
name: name,
rcmgr: rcmgr,
}
@ -323,7 +348,7 @@ func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceS
func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope {
return &protocolScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol.%s", proto)),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace),
proto: proto,
rcmgr: rcmgr,
}
@ -331,7 +356,7 @@ func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *p
func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
return &peerScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer.%s", p)),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace),
peer: p,
rcmgr: rcmgr,
}
@ -339,16 +364,16 @@ func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope {
func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope {
return &connectionScope{
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, "connection"),
resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace),
dir: dir,
usefd: usefd,
rcmgr: rcmgr,
}
}
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope) *streamScope {
func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope {
return &streamScope{
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, peer.rcmgr.transient.resourceScope, peer.rcmgr.system.resourceScope}, "stream"),
resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace),
dir: dir,
rcmgr: peer.rcmgr,
peer: peer,
@ -375,7 +400,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name))
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
s.peers[p] = ps
ps.IncRef()
@ -402,7 +427,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope {
s.peers = make(map[peer.ID]*resourceScope)
}
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name))
ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace)
s.peers[p] = ps
ps.IncRef()

View File

@ -36,29 +36,36 @@ type resourceScope struct {
owner *resourceScope // set in span scopes, which define trees
edges []*resourceScope // set in DAG scopes, it's the linearized parent set
name string // for debugging purposes
name string // for debugging purposes
trace *trace // debug tracing
}
var _ network.ResourceScope = (*resourceScope)(nil)
var _ network.ResourceScopeSpan = (*resourceScope)(nil)
func newResourceScope(limit Limit, edges []*resourceScope, name string) *resourceScope {
func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope {
for _, e := range edges {
e.IncRef()
}
return &resourceScope{
r := &resourceScope{
rc: resources{limit: limit},
edges: edges,
name: name,
trace: trace,
}
r.trace.CreateScope(name, limit)
return r
}
func newResourceScopeSpan(owner *resourceScope) *resourceScope {
return &resourceScope{
r := &resourceScope{
rc: resources{limit: owner.rc.limit},
owner: owner,
name: fmt.Sprintf("%s.txn", owner.name),
name: fmt.Sprintf("%s.span", owner.name),
trace: owner.trace,
}
r.trace.CreateScope(r.name, r.rc.limit)
return r
}
// Resources implementation
@ -230,6 +237,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
if err := s.rc.reserveMemory(int64(size), prio); err != nil {
log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err)
s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory)
return s.wrapError(err)
}
@ -240,6 +248,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error {
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory)
return nil
}
@ -288,9 +297,11 @@ func (s *resourceScope) ReserveMemoryForChild(size int64, prio uint8) error {
}
if err := s.rc.reserveMemory(size, prio); err != nil {
s.trace.BlockReserveMemory(s.name, prio, size, s.rc.memory)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, prio, size, s.rc.memory)
return nil
}
@ -304,6 +315,7 @@ func (s *resourceScope) ReleaseMemory(size int) {
s.rc.releaseMemory(int64(size))
s.releaseMemoryForEdges(size)
s.trace.ReleaseMemory(s.name, int64(size), s.rc.memory)
}
func (s *resourceScope) ReleaseMemoryForChild(size int64) {
@ -315,6 +327,7 @@ func (s *resourceScope) ReleaseMemoryForChild(size int64) {
}
s.rc.releaseMemory(size)
s.trace.ReleaseMemory(s.name, size, s.rc.memory)
}
func (s *resourceScope) AddStream(dir network.Direction) error {
@ -327,6 +340,7 @@ func (s *resourceScope) AddStream(dir network.Direction) error {
if err := s.rc.addStream(dir); err != nil {
log.Debugw("blocked stream", "scope", s.name, "direction", dir, "error", err)
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
@ -336,6 +350,7 @@ func (s *resourceScope) AddStream(dir network.Direction) error {
return s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
}
@ -371,9 +386,11 @@ func (s *resourceScope) AddStreamForChild(dir network.Direction) error {
}
if err := s.rc.addStream(dir); err != nil {
s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return s.wrapError(err)
}
s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
return nil
}
@ -387,6 +404,7 @@ func (s *resourceScope) RemoveStream(dir network.Direction) {
s.rc.removeStream(dir)
s.removeStreamForEdges(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) removeStreamForEdges(dir network.Direction) {
@ -409,6 +427,7 @@ func (s *resourceScope) RemoveStreamForChild(dir network.Direction) {
}
s.rc.removeStream(dir)
s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut)
}
func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
@ -421,6 +440,7 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
if err := s.rc.addConn(dir, usefd); err != nil {
log.Debugw("blocked connection", "scope", s.name, "direction", dir, "usefd", usefd, "error", err)
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
}
@ -430,6 +450,7 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error {
return s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
@ -465,9 +486,11 @@ func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) error
}
if err := s.rc.addConn(dir, usefd); err != nil {
s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return s.wrapError(err)
}
s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
@ -481,6 +504,7 @@ func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) {
s.rc.removeConn(dir, usefd)
s.removeConnForEdges(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) removeConnForEdges(dir network.Direction, usefd bool) {
@ -502,6 +526,7 @@ func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) {
}
s.rc.removeConn(dir, usefd)
s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) ReserveForChild(st network.ScopeStat) error {
@ -513,20 +538,28 @@ func (s *resourceScope) ReserveForChild(st network.ScopeStat) error {
}
if err := s.rc.reserveMemory(st.Memory, network.ReservationPriorityAlways); err != nil {
s.trace.BlockReserveMemory(s.name, 255, st.Memory, s.rc.memory)
return s.wrapError(err)
}
if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil {
s.trace.BlockAddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.rc.releaseMemory(st.Memory)
return s.wrapError(err)
}
if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil {
s.trace.BlockAddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
return s.wrapError(err)
}
s.trace.ReserveMemory(s.name, 255, st.Memory, s.rc.memory)
s.trace.AddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.AddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
return nil
}
@ -541,6 +574,10 @@ func (s *resourceScope) ReleaseForChild(st network.ScopeStat) {
s.rc.releaseMemory(st.Memory)
s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound)
s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD)
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
@ -562,6 +599,10 @@ func (s *resourceScope) ReleaseResources(st network.ScopeStat) {
e.ReleaseForChild(st)
}
}
s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory)
s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut)
s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd)
}
func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) {
@ -603,6 +644,8 @@ func (s *resourceScope) Done() {
s.rc.memory = 0
s.done = true
s.trace.DestroyScope(s.name)
}
func (s *resourceScope) Stat() network.ScopeStat {

View File

@ -257,7 +257,7 @@ func TestResourceScopeSimple(t *testing.T) {
FD: 1,
},
},
nil, "test",
nil, "test", nil,
)
s.IncRef()
@ -391,7 +391,7 @@ func TestResourceScopeTxnBasic(t *testing.T) {
FD: 1,
},
},
nil, "test",
nil, "test", nil,
)
txn, err := s.BeginSpan()
@ -428,7 +428,7 @@ func TestResourceScopeTxnZombie(t *testing.T) {
FD: 1,
},
},
nil, "test",
nil, "test", nil,
)
txn1, err := s.BeginSpan()
@ -472,7 +472,7 @@ func TestResourceScopeTxnTree(t *testing.T) {
FD: 1,
},
},
nil, "test",
nil, "test", nil,
)
txn1, err := s.BeginSpan()
@ -583,7 +583,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 4,
},
},
nil, "test",
nil, "test", nil,
)
s2 := newResourceScope(
&StaticLimit{
@ -598,7 +598,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s1}, "test",
[]*resourceScope{s1}, "test", nil,
)
s3 := newResourceScope(
&StaticLimit{
@ -613,7 +613,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s1}, "test",
[]*resourceScope{s1}, "test", nil,
)
s4 := newResourceScope(
&StaticLimit{
@ -628,7 +628,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s2, s3, s1}, "test",
[]*resourceScope{s2, s3, s1}, "test", nil,
)
s5 := newResourceScope(
&StaticLimit{
@ -643,7 +643,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s2, s1}, "test",
[]*resourceScope{s2, s1}, "test", nil,
)
s6 := newResourceScope(
&StaticLimit{
@ -658,7 +658,7 @@ func TestResourceScopeDAG(t *testing.T) {
FD: 2,
},
},
[]*resourceScope{s3, s1}, "test",
[]*resourceScope{s3, s1}, "test", nil,
)
if err := s4.ReserveMemory(1024, network.ReservationPriorityAlways); err != nil {
@ -1098,37 +1098,37 @@ func TestResourceScopeDAGTxn(t *testing.T) {
&StaticLimit{
Memory: 8192,
},
nil, "test",
nil, "test", nil,
)
s2 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test",
[]*resourceScope{s1}, "test", nil,
)
s3 := newResourceScope(
&StaticLimit{
Memory: 4096 + 2048,
},
[]*resourceScope{s1}, "test",
[]*resourceScope{s1}, "test", nil,
)
s4 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s3, s1}, "test",
[]*resourceScope{s2, s3, s1}, "test", nil,
)
s5 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s2, s1}, "test",
[]*resourceScope{s2, s1}, "test", nil,
)
s6 := newResourceScope(
&StaticLimit{
Memory: 4096 + 1024,
},
[]*resourceScope{s3, s1}, "test",
[]*resourceScope{s3, s1}, "test", nil,
)
txn4, err := s4.BeginSpan()

161
trace.go Normal file
View File

@ -0,0 +1,161 @@
package rcmgr
import (
"github.com/libp2p/go-libp2p-core/network"
)
type trace struct{}
func (t *trace) Start(limits Limiter) error {
if t == nil {
return nil
}
// TODO
return nil
}
func (t *trace) Close() error {
if t == nil {
return nil
}
// TODO
return nil
}
func (t *trace) CreateScope(name string, limit Limit) {
if t == nil {
return
}
// TODO
}
func (t *trace) DestroyScope(name string) {
if t == nil {
return
}
// TODO
}
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
// TODO
}
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
if t == nil {
return
}
// TODO
}
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
if t == nil {
return
}
// TODO
}
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) AddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) BlockAddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) RemoveStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
if t == nil {
return
}
// TODO
}
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}
func (t *trace) AddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}
func (t *trace) BlockAddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}
func (t *trace) RemoveConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
if t == nil {
return
}
// TODO
}