mirror of
https://github.com/libp2p/go-libp2p-resource-manager.git
synced 2025-01-27 12:50:07 +08:00
implement tracer
This commit is contained in:
parent
98870b0c0c
commit
1dc0961ae6
392
trace.go
392
trace.go
@ -1,17 +1,171 @@
|
||||
package rcmgr
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
)
|
||||
|
||||
type trace struct{}
|
||||
type trace struct {
|
||||
path string
|
||||
|
||||
ctx context.Context
|
||||
cancel func()
|
||||
closed chan struct{}
|
||||
|
||||
mx sync.Mutex
|
||||
done bool
|
||||
pend []interface{}
|
||||
}
|
||||
|
||||
func WithTrace(path string) Option {
|
||||
return func(r *resourceManager) error {
|
||||
r.trace = &trace{path: path}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
const (
|
||||
traceStartEvt = iota
|
||||
traceCreateScopeEvt
|
||||
traceDestroyScopeEvt
|
||||
traceReserveMemoryEvt
|
||||
traceBlockReserveMemoryEvt
|
||||
traceReleaseMemoryEvt
|
||||
traceAddStreamEvt
|
||||
traceBlockAddStreamEvt
|
||||
traceRemoveStreamEvt
|
||||
traceAddConnEvt
|
||||
traceBlockAddConnEvt
|
||||
traceRemoveConnEvt
|
||||
)
|
||||
|
||||
type traceEvt struct {
|
||||
Type int
|
||||
|
||||
Scope string `json:"omitempty"`
|
||||
|
||||
Limit interface{} `json:"omitempty"`
|
||||
|
||||
Priority uint8 `json:"omitempty"`
|
||||
|
||||
Delta int64 `json:"omitempty"`
|
||||
DeltaIn int `json:"omitempty"`
|
||||
DeltaOut int `json:"omitempty"`
|
||||
|
||||
Memory int64 `json:"omitempty"`
|
||||
|
||||
StreamsIn int `json:"omitempty"`
|
||||
StreamsOut int `json:"omitempty"`
|
||||
|
||||
ConnsIn int `json:"omitempty"`
|
||||
ConnsOut int `json:"omitempty"`
|
||||
|
||||
FD int `json:"omitempty"`
|
||||
}
|
||||
|
||||
func (t *trace) push(evt interface{}) {
|
||||
t.mx.Lock()
|
||||
defer t.mx.Unlock()
|
||||
|
||||
if t.done {
|
||||
return
|
||||
}
|
||||
|
||||
t.pend = append(t.pend, evt)
|
||||
}
|
||||
|
||||
func (t *trace) background(out io.WriteCloser) {
|
||||
defer close(t.closed)
|
||||
defer out.Close()
|
||||
|
||||
gzOut := gzip.NewWriter(out)
|
||||
defer gzOut.Close()
|
||||
|
||||
jsonOut := json.NewEncoder(gzOut)
|
||||
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
var pend []interface{}
|
||||
|
||||
getEvents := func() {
|
||||
t.mx.Lock()
|
||||
tmp := t.pend
|
||||
t.pend = pend[:0]
|
||||
pend = tmp
|
||||
t.mx.Unlock()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
getEvents()
|
||||
|
||||
if err := t.writeEvents(pend, jsonOut); err != nil {
|
||||
log.Warnf("error writing rcmgr trace: %s", err)
|
||||
t.mx.Lock()
|
||||
t.done = true
|
||||
t.mx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if err := gzOut.Flush(); err != nil {
|
||||
log.Warnf("error flushing rcmgr trace: %s", err)
|
||||
t.mx.Lock()
|
||||
t.done = true
|
||||
t.mx.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
case <-t.ctx.Done():
|
||||
getEvents()
|
||||
|
||||
if err := t.writeEvents(pend, jsonOut); err != nil {
|
||||
log.Warnf("error writing rcmgr trace: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := gzOut.Flush(); err != nil {
|
||||
log.Warnf("error flushing rcmgr trace: %s", err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
|
||||
for _, e := range pend {
|
||||
if err := jout.Encode(e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *trace) Start(limits Limiter) error {
|
||||
if t == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.ctx, t.cancel = context.WithCancel(context.Background())
|
||||
t.closed = make(chan struct{})
|
||||
|
||||
out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
go t.background(out)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -20,24 +174,42 @@ func (t *trace) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.mx.Lock()
|
||||
|
||||
if t.done {
|
||||
t.mx.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
t.cancel()
|
||||
t.done = true
|
||||
t.mx.Unlock()
|
||||
|
||||
<-t.closed
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *trace) CreateScope(name string, limit Limit) {
|
||||
func (t *trace) CreateScope(scope string, limit Limit) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceCreateScopeEvt,
|
||||
Scope: scope,
|
||||
Limit: limit,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) DestroyScope(name string) {
|
||||
func (t *trace) DestroyScope(scope string) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceDestroyScopeEvt,
|
||||
Scope: scope,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
|
||||
@ -45,7 +217,13 @@ func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceReserveMemoryEvt,
|
||||
Scope: scope,
|
||||
Priority: prio,
|
||||
Delta: size,
|
||||
Memory: mem,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
|
||||
@ -53,7 +231,13 @@ func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceBlockReserveMemoryEvt,
|
||||
Scope: scope,
|
||||
Priority: prio,
|
||||
Delta: size,
|
||||
Memory: mem,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
|
||||
@ -61,7 +245,12 @@ func (t *trace) ReleaseMemory(scope string, size, mem int64) {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceReleaseMemoryEvt,
|
||||
Scope: scope,
|
||||
Delta: -size,
|
||||
Memory: mem,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
||||
@ -69,7 +258,21 @@ func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstre
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = 1
|
||||
} else {
|
||||
deltaOut = 1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceAddStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
||||
@ -77,7 +280,21 @@ func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn,
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = 1
|
||||
} else {
|
||||
deltaOut = 1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceBlockAddStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
||||
@ -85,31 +302,66 @@ func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, ns
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = -1
|
||||
} else {
|
||||
deltaOut = -1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceRemoveStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) AddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
|
||||
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceAddStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) BlockAddStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
|
||||
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceBlockAddStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) RemoveStreams(scope string, rsvpIn, rsvpOut, nstreamsIn, nstreamsOut int) {
|
||||
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceRemoveStreamEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: -deltaIn,
|
||||
DeltaOut: -deltaOut,
|
||||
StreamsIn: nstreamsIn,
|
||||
StreamsOut: nstreamsOut,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
||||
@ -117,7 +369,26 @@ func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsI
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut, deltafd int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = 1
|
||||
} else {
|
||||
deltaOut = 1
|
||||
}
|
||||
if usefd {
|
||||
deltafd = 1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceAddConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
Delta: int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
||||
@ -125,7 +396,26 @@ func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nc
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut, deltafd int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = 1
|
||||
} else {
|
||||
deltaOut = 1
|
||||
}
|
||||
if usefd {
|
||||
deltafd = 1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceBlockAddConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
Delta: int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
||||
@ -133,29 +423,75 @@ func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, ncon
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
var deltaIn, deltaOut, deltafd int
|
||||
if dir == network.DirInbound {
|
||||
deltaIn = -1
|
||||
} else {
|
||||
deltaOut = -1
|
||||
}
|
||||
if usefd {
|
||||
deltafd = -1
|
||||
}
|
||||
|
||||
t.push(traceEvt{
|
||||
Type: traceRemoveConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
Delta: int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) AddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
|
||||
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceAddConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
Delta: int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) BlockAddConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
|
||||
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceBlockAddConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: deltaIn,
|
||||
DeltaOut: deltaOut,
|
||||
Delta: int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
||||
func (t *trace) RemoveConns(scope string, rsvpIn, rsvpOut, rsvpFD, nconnsIn, nconnsOut, nfd int) {
|
||||
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
||||
if t == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO
|
||||
t.push(traceEvt{
|
||||
Type: traceRemoveConnEvt,
|
||||
Scope: scope,
|
||||
DeltaIn: -deltaIn,
|
||||
DeltaOut: -deltaOut,
|
||||
Delta: -int64(deltafd),
|
||||
ConnsIn: nconnsIn,
|
||||
ConnsOut: nconnsOut,
|
||||
FD: nfd,
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user