diff --git a/rcmgr.go b/rcmgr.go index 199bb23..2d403bc 100644 --- a/rcmgr.go +++ b/rcmgr.go @@ -18,6 +18,8 @@ var log = logging.Logger("rcmgr") type resourceManager struct { limits Limiter + trace *trace + system *systemScope transient *transientScope @@ -29,6 +31,8 @@ type resourceManager struct { svc map[string]*serviceScope proto map[protocol.ID]*protocolScope peer map[peer.ID]*peerScope + + connId, streamId int64 } var _ network.ResourceManager = (*resourceManager)(nil) @@ -122,9 +126,13 @@ func NewResourceManager(limits Limiter, opts ...Option) (network.ResourceManager } } - r.system = newSystemScope(limits.GetSystemLimits()) + if err := r.trace.Start(limits); err != nil { + return nil, err + } + + r.system = newSystemScope(limits.GetSystemLimits(), r) r.system.IncRef() - r.transient = newTransientScope(limits.GetTransientLimits(), r.system) + r.transient = newTransientScope(limits.GetTransientLimits(), r) r.transient.IncRef() r.cancelCtx, r.cancel = context.WithCancel(context.Background()) @@ -206,6 +214,22 @@ func (r *resourceManager) getPeerScope(p peer.ID) *peerScope { return s } +func (r *resourceManager) nextConnId() int64 { + r.mx.Lock() + defer r.mx.Unlock() + + r.connId++ + return r.connId +} + +func (r *resourceManager) nextStreamId() int64 { + r.mx.Lock() + defer r.mx.Unlock() + + r.streamId++ + return r.streamId +} + func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (network.ConnManagementScope, error) { conn := newConnectionScope(dir, usefd, r.limits.GetConnLimits(), r) @@ -219,7 +243,7 @@ func (r *resourceManager) OpenConnection(dir network.Direction, usefd bool) (net func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network.StreamManagementScope, error) { peer := r.getPeerScope(p) - stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer) + stream := newStreamScope(dir, r.limits.GetStreamLimits(p), peer, r) peer.DecRef() // we have the reference in edges err := stream.AddStream(dir) @@ -234,6 +258,7 @@ func (r *resourceManager) OpenStream(p peer.ID, dir network.Direction) (network. func (r *resourceManager) Close() error { r.cancel() r.wg.Wait() + r.trace.Close() return nil } @@ -300,22 +325,22 @@ func (r *resourceManager) gc() { } } -func newSystemScope(limit Limit) *systemScope { +func newSystemScope(limit Limit, rcmgr *resourceManager) *systemScope { return &systemScope{ - resourceScope: newResourceScope(limit, nil, "system"), + resourceScope: newResourceScope(limit, nil, "system", rcmgr.trace), } } -func newTransientScope(limit Limit, system *systemScope) *transientScope { +func newTransientScope(limit Limit, rcmgr *resourceManager) *transientScope { return &transientScope{ - resourceScope: newResourceScope(limit, []*resourceScope{system.resourceScope}, "transient"), - system: system, + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, "transient", rcmgr.trace), + system: rcmgr.system, } } func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceScope { return &serviceScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service.%s", name)), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("service:%s", name), rcmgr.trace), name: name, rcmgr: rcmgr, } @@ -323,7 +348,7 @@ func newServiceScope(name string, limit Limit, rcmgr *resourceManager) *serviceS func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *protocolScope { return &protocolScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol.%s", proto)), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("protocol:%s", proto), rcmgr.trace), proto: proto, rcmgr: rcmgr, } @@ -331,7 +356,7 @@ func newProtocolScope(proto protocol.ID, limit Limit, rcmgr *resourceManager) *p func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope { return &peerScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer.%s", p)), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.system.resourceScope}, fmt.Sprintf("peer:%s", p), rcmgr.trace), peer: p, rcmgr: rcmgr, } @@ -339,16 +364,16 @@ func newPeerScope(p peer.ID, limit Limit, rcmgr *resourceManager) *peerScope { func newConnectionScope(dir network.Direction, usefd bool, limit Limit, rcmgr *resourceManager) *connectionScope { return &connectionScope{ - resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, "connection"), + resourceScope: newResourceScope(limit, []*resourceScope{rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("conn-%d", rcmgr.nextConnId()), rcmgr.trace), dir: dir, usefd: usefd, rcmgr: rcmgr, } } -func newStreamScope(dir network.Direction, limit Limit, peer *peerScope) *streamScope { +func newStreamScope(dir network.Direction, limit Limit, peer *peerScope, rcmgr *resourceManager) *streamScope { return &streamScope{ - resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, peer.rcmgr.transient.resourceScope, peer.rcmgr.system.resourceScope}, "stream"), + resourceScope: newResourceScope(limit, []*resourceScope{peer.resourceScope, rcmgr.transient.resourceScope, rcmgr.system.resourceScope}, fmt.Sprintf("stream-%d", rcmgr.nextStreamId()), rcmgr.trace), dir: dir, rcmgr: peer.rcmgr, peer: peer, @@ -375,7 +400,7 @@ func (s *serviceScope) getPeerScope(p peer.ID) *resourceScope { s.peers = make(map[peer.ID]*resourceScope) } - ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name)) + ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace) s.peers[p] = ps ps.IncRef() @@ -402,7 +427,7 @@ func (s *protocolScope) getPeerScope(p peer.ID) *resourceScope { s.peers = make(map[peer.ID]*resourceScope) } - ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer", s.name)) + ps = newResourceScope(l, nil, fmt.Sprintf("%s.peer:%s", s.name, p), s.rcmgr.trace) s.peers[p] = ps ps.IncRef() diff --git a/scope.go b/scope.go index cbfd6a8..2b82233 100644 --- a/scope.go +++ b/scope.go @@ -36,29 +36,36 @@ type resourceScope struct { owner *resourceScope // set in span scopes, which define trees edges []*resourceScope // set in DAG scopes, it's the linearized parent set - name string // for debugging purposes + name string // for debugging purposes + trace *trace // debug tracing } var _ network.ResourceScope = (*resourceScope)(nil) var _ network.ResourceScopeSpan = (*resourceScope)(nil) -func newResourceScope(limit Limit, edges []*resourceScope, name string) *resourceScope { +func newResourceScope(limit Limit, edges []*resourceScope, name string, trace *trace) *resourceScope { for _, e := range edges { e.IncRef() } - return &resourceScope{ + r := &resourceScope{ rc: resources{limit: limit}, edges: edges, name: name, + trace: trace, } + r.trace.CreateScope(name, limit) + return r } func newResourceScopeSpan(owner *resourceScope) *resourceScope { - return &resourceScope{ + r := &resourceScope{ rc: resources{limit: owner.rc.limit}, owner: owner, - name: fmt.Sprintf("%s.txn", owner.name), + name: fmt.Sprintf("%s.span", owner.name), + trace: owner.trace, } + r.trace.CreateScope(r.name, r.rc.limit) + return r } // Resources implementation @@ -230,6 +237,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error { if err := s.rc.reserveMemory(int64(size), prio); err != nil { log.Debugw("blocked memory reservation", "scope", s.name, "size", size, "priority", prio, "error", err) + s.trace.BlockReserveMemory(s.name, prio, int64(size), s.rc.memory) return s.wrapError(err) } @@ -240,6 +248,7 @@ func (s *resourceScope) ReserveMemory(size int, prio uint8) error { return s.wrapError(err) } + s.trace.ReserveMemory(s.name, prio, int64(size), s.rc.memory) return nil } @@ -288,9 +297,11 @@ func (s *resourceScope) ReserveMemoryForChild(size int64, prio uint8) error { } if err := s.rc.reserveMemory(size, prio); err != nil { + s.trace.BlockReserveMemory(s.name, prio, size, s.rc.memory) return s.wrapError(err) } + s.trace.ReserveMemory(s.name, prio, size, s.rc.memory) return nil } @@ -304,6 +315,7 @@ func (s *resourceScope) ReleaseMemory(size int) { s.rc.releaseMemory(int64(size)) s.releaseMemoryForEdges(size) + s.trace.ReleaseMemory(s.name, int64(size), s.rc.memory) } func (s *resourceScope) ReleaseMemoryForChild(size int64) { @@ -315,6 +327,7 @@ func (s *resourceScope) ReleaseMemoryForChild(size int64) { } s.rc.releaseMemory(size) + s.trace.ReleaseMemory(s.name, size, s.rc.memory) } func (s *resourceScope) AddStream(dir network.Direction) error { @@ -327,6 +340,7 @@ func (s *resourceScope) AddStream(dir network.Direction) error { if err := s.rc.addStream(dir); err != nil { log.Debugw("blocked stream", "scope", s.name, "direction", dir, "error", err) + s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) return s.wrapError(err) } @@ -336,6 +350,7 @@ func (s *resourceScope) AddStream(dir network.Direction) error { return s.wrapError(err) } + s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) return nil } @@ -371,9 +386,11 @@ func (s *resourceScope) AddStreamForChild(dir network.Direction) error { } if err := s.rc.addStream(dir); err != nil { + s.trace.BlockAddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) return s.wrapError(err) } + s.trace.AddStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) return nil } @@ -387,6 +404,7 @@ func (s *resourceScope) RemoveStream(dir network.Direction) { s.rc.removeStream(dir) s.removeStreamForEdges(dir) + s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) } func (s *resourceScope) removeStreamForEdges(dir network.Direction) { @@ -409,6 +427,7 @@ func (s *resourceScope) RemoveStreamForChild(dir network.Direction) { } s.rc.removeStream(dir) + s.trace.RemoveStream(s.name, dir, s.rc.nstreamsIn, s.rc.nstreamsOut) } func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error { @@ -421,6 +440,7 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error { if err := s.rc.addConn(dir, usefd); err != nil { log.Debugw("blocked connection", "scope", s.name, "direction", dir, "usefd", usefd, "error", err) + s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) return s.wrapError(err) } @@ -430,6 +450,7 @@ func (s *resourceScope) AddConn(dir network.Direction, usefd bool) error { return s.wrapError(err) } + s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) return nil } @@ -465,9 +486,11 @@ func (s *resourceScope) AddConnForChild(dir network.Direction, usefd bool) error } if err := s.rc.addConn(dir, usefd); err != nil { + s.trace.BlockAddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) return s.wrapError(err) } + s.trace.AddConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) return nil } @@ -481,6 +504,7 @@ func (s *resourceScope) RemoveConn(dir network.Direction, usefd bool) { s.rc.removeConn(dir, usefd) s.removeConnForEdges(dir, usefd) + s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) } func (s *resourceScope) removeConnForEdges(dir network.Direction, usefd bool) { @@ -502,6 +526,7 @@ func (s *resourceScope) RemoveConnForChild(dir network.Direction, usefd bool) { } s.rc.removeConn(dir, usefd) + s.trace.RemoveConn(s.name, dir, usefd, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) } func (s *resourceScope) ReserveForChild(st network.ScopeStat) error { @@ -513,20 +538,28 @@ func (s *resourceScope) ReserveForChild(st network.ScopeStat) error { } if err := s.rc.reserveMemory(st.Memory, network.ReservationPriorityAlways); err != nil { + s.trace.BlockReserveMemory(s.name, 255, st.Memory, s.rc.memory) return s.wrapError(err) } if err := s.rc.addStreams(st.NumStreamsInbound, st.NumStreamsOutbound); err != nil { + s.trace.BlockAddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut) s.rc.releaseMemory(st.Memory) return s.wrapError(err) } if err := s.rc.addConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD); err != nil { + s.trace.BlockAddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) + s.rc.releaseMemory(st.Memory) s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound) return s.wrapError(err) } + s.trace.ReserveMemory(s.name, 255, st.Memory, s.rc.memory) + s.trace.AddStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut) + s.trace.AddConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) + return nil } @@ -541,6 +574,10 @@ func (s *resourceScope) ReleaseForChild(st network.ScopeStat) { s.rc.releaseMemory(st.Memory) s.rc.removeStreams(st.NumStreamsInbound, st.NumStreamsOutbound) s.rc.removeConns(st.NumConnsInbound, st.NumConnsOutbound, st.NumFD) + + s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory) + s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut) + s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) } func (s *resourceScope) ReleaseResources(st network.ScopeStat) { @@ -562,6 +599,10 @@ func (s *resourceScope) ReleaseResources(st network.ScopeStat) { e.ReleaseForChild(st) } } + + s.trace.ReleaseMemory(s.name, st.Memory, s.rc.memory) + s.trace.RemoveStreams(s.name, st.NumStreamsInbound, st.NumStreamsOutbound, s.rc.nstreamsIn, s.rc.nstreamsOut) + s.trace.RemoveConns(s.name, st.NumConnsInbound, st.NumConnsOutbound, st.NumFD, s.rc.nconnsIn, s.rc.nconnsOut, s.rc.nfd) } func (s *resourceScope) BeginSpan() (network.ResourceScopeSpan, error) { @@ -603,6 +644,8 @@ func (s *resourceScope) Done() { s.rc.memory = 0 s.done = true + + s.trace.DestroyScope(s.name) } func (s *resourceScope) Stat() network.ScopeStat { diff --git a/scope_test.go b/scope_test.go index dae829d..cfd885b 100644 --- a/scope_test.go +++ b/scope_test.go @@ -257,7 +257,7 @@ func TestResourceScopeSimple(t *testing.T) { FD: 1, }, }, - nil, "test", + nil, "test", nil, ) s.IncRef() @@ -391,7 +391,7 @@ func TestResourceScopeTxnBasic(t *testing.T) { FD: 1, }, }, - nil, "test", + nil, "test", nil, ) txn, err := s.BeginSpan() @@ -428,7 +428,7 @@ func TestResourceScopeTxnZombie(t *testing.T) { FD: 1, }, }, - nil, "test", + nil, "test", nil, ) txn1, err := s.BeginSpan() @@ -472,7 +472,7 @@ func TestResourceScopeTxnTree(t *testing.T) { FD: 1, }, }, - nil, "test", + nil, "test", nil, ) txn1, err := s.BeginSpan() @@ -583,7 +583,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 4, }, }, - nil, "test", + nil, "test", nil, ) s2 := newResourceScope( &StaticLimit{ @@ -598,7 +598,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 2, }, }, - []*resourceScope{s1}, "test", + []*resourceScope{s1}, "test", nil, ) s3 := newResourceScope( &StaticLimit{ @@ -613,7 +613,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 2, }, }, - []*resourceScope{s1}, "test", + []*resourceScope{s1}, "test", nil, ) s4 := newResourceScope( &StaticLimit{ @@ -628,7 +628,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 2, }, }, - []*resourceScope{s2, s3, s1}, "test", + []*resourceScope{s2, s3, s1}, "test", nil, ) s5 := newResourceScope( &StaticLimit{ @@ -643,7 +643,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 2, }, }, - []*resourceScope{s2, s1}, "test", + []*resourceScope{s2, s1}, "test", nil, ) s6 := newResourceScope( &StaticLimit{ @@ -658,7 +658,7 @@ func TestResourceScopeDAG(t *testing.T) { FD: 2, }, }, - []*resourceScope{s3, s1}, "test", + []*resourceScope{s3, s1}, "test", nil, ) if err := s4.ReserveMemory(1024, network.ReservationPriorityAlways); err != nil { @@ -1098,37 +1098,37 @@ func TestResourceScopeDAGTxn(t *testing.T) { &StaticLimit{ Memory: 8192, }, - nil, "test", + nil, "test", nil, ) s2 := newResourceScope( &StaticLimit{ Memory: 4096 + 2048, }, - []*resourceScope{s1}, "test", + []*resourceScope{s1}, "test", nil, ) s3 := newResourceScope( &StaticLimit{ Memory: 4096 + 2048, }, - []*resourceScope{s1}, "test", + []*resourceScope{s1}, "test", nil, ) s4 := newResourceScope( &StaticLimit{ Memory: 4096 + 1024, }, - []*resourceScope{s2, s3, s1}, "test", + []*resourceScope{s2, s3, s1}, "test", nil, ) s5 := newResourceScope( &StaticLimit{ Memory: 4096 + 1024, }, - []*resourceScope{s2, s1}, "test", + []*resourceScope{s2, s1}, "test", nil, ) s6 := newResourceScope( &StaticLimit{ Memory: 4096 + 1024, }, - []*resourceScope{s3, s1}, "test", + []*resourceScope{s3, s1}, "test", nil, ) txn4, err := s4.BeginSpan() diff --git a/trace.go b/trace.go new file mode 100644 index 0000000..15d8323 --- /dev/null +++ b/trace.go @@ -0,0 +1,161 @@ +package rcmgr + +import ( + "github.com/libp2p/go-libp2p-core/network" +) + +type trace struct{} + +func (t *trace) Start(limits Limiter) error { + if t == nil { + return nil + } + + // TODO + return nil +} + +func (t *trace) Close() error { + if t == nil { + return nil + } + + // TODO + return nil +} + +func (t *trace) CreateScope(name string, limit Limit) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) DestroyScope(name string) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) ReleaseMemory(scope string, size, mem int64) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) AddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) BlockAddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) RemoveStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) AddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) BlockAddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +} + +func (t *trace) RemoveConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) { + if t == nil { + return + } + + // TODO +}