mirror of
https://github.com/libp2p/go-libp2p-resource-manager.git
synced 2025-01-27 12:50:07 +08:00
f4b09ff457
Arguably, it's not a library's responsibility to create a trace file, and to decide on the compression algorithm. The design is cleaner if we just dump the trace to an io.WriteCloser, and let the application decide how to handle the concrete implementation.
525 lines
8.9 KiB
Go
525 lines
8.9 KiB
Go
package rcmgr
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
)
|
|
|
|
type trace struct {
|
|
out io.WriteCloser
|
|
|
|
ctx context.Context
|
|
cancel func()
|
|
closed chan struct{}
|
|
|
|
mx sync.Mutex
|
|
done bool
|
|
pend []interface{}
|
|
}
|
|
|
|
func WithTrace(out io.WriteCloser) Option {
|
|
return func(r *resourceManager) error {
|
|
r.trace = &trace{out: out}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
const (
|
|
traceStartEvt = "start"
|
|
traceCreateScopeEvt = "create_scope"
|
|
traceDestroyScopeEvt = "destroy_scope"
|
|
traceReserveMemoryEvt = "reserve_memory"
|
|
traceBlockReserveMemoryEvt = "block_reserve_memory"
|
|
traceReleaseMemoryEvt = "release_memory"
|
|
traceAddStreamEvt = "add_stream"
|
|
traceBlockAddStreamEvt = "block_add_stream"
|
|
traceRemoveStreamEvt = "remove_stream"
|
|
traceAddConnEvt = "add_conn"
|
|
traceBlockAddConnEvt = "block_add_conn"
|
|
traceRemoveConnEvt = "remove_conn"
|
|
)
|
|
|
|
type TraceEvt struct {
|
|
Time string
|
|
Type string
|
|
|
|
Scope string `json:",omitempty"`
|
|
|
|
Limit interface{} `json:",omitempty"`
|
|
|
|
Priority uint8 `json:",omitempty"`
|
|
|
|
Delta int64 `json:",omitempty"`
|
|
DeltaIn int `json:",omitempty"`
|
|
DeltaOut int `json:",omitempty"`
|
|
|
|
Memory int64 `json:",omitempty"`
|
|
|
|
StreamsIn int `json:",omitempty"`
|
|
StreamsOut int `json:",omitempty"`
|
|
|
|
ConnsIn int `json:",omitempty"`
|
|
ConnsOut int `json:",omitempty"`
|
|
|
|
FD int `json:",omitempty"`
|
|
}
|
|
|
|
func (t *trace) push(evt TraceEvt) {
|
|
t.mx.Lock()
|
|
defer t.mx.Unlock()
|
|
|
|
if t.done {
|
|
return
|
|
}
|
|
evt.Time = time.Now().Format(time.RFC3339Nano)
|
|
|
|
t.pend = append(t.pend, evt)
|
|
}
|
|
|
|
func (t *trace) background() {
|
|
defer close(t.closed)
|
|
defer t.out.Close()
|
|
|
|
jsonOut := json.NewEncoder(t.out)
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
var pend []interface{}
|
|
|
|
getEvents := func() {
|
|
t.mx.Lock()
|
|
tmp := t.pend
|
|
t.pend = pend[:0]
|
|
pend = tmp
|
|
t.mx.Unlock()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
getEvents()
|
|
|
|
if len(pend) == 0 {
|
|
continue
|
|
}
|
|
|
|
if err := t.writeEvents(pend, jsonOut); err != nil {
|
|
log.Warnf("error writing rcmgr trace: %s", err)
|
|
t.mx.Lock()
|
|
t.done = true
|
|
t.mx.Unlock()
|
|
return
|
|
}
|
|
case <-t.ctx.Done():
|
|
getEvents()
|
|
|
|
if len(pend) == 0 {
|
|
return
|
|
}
|
|
|
|
if err := t.writeEvents(pend, jsonOut); err != nil {
|
|
log.Warnf("error writing rcmgr trace: %s", err)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
|
|
for _, e := range pend {
|
|
if err := jout.Encode(e); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) Start(limits Limiter) error {
|
|
if t == nil {
|
|
return nil
|
|
}
|
|
|
|
t.ctx, t.cancel = context.WithCancel(context.Background())
|
|
t.closed = make(chan struct{})
|
|
|
|
go t.background()
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceStartEvt,
|
|
Limit: limits,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) Close() error {
|
|
if t == nil {
|
|
return nil
|
|
}
|
|
|
|
t.mx.Lock()
|
|
|
|
if t.done {
|
|
t.mx.Unlock()
|
|
return nil
|
|
}
|
|
|
|
t.cancel()
|
|
t.done = true
|
|
t.mx.Unlock()
|
|
|
|
<-t.closed
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) CreateScope(scope string, limit Limit) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceCreateScopeEvt,
|
|
Scope: scope,
|
|
Limit: limit,
|
|
})
|
|
}
|
|
|
|
func (t *trace) DestroyScope(scope string) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceDestroyScopeEvt,
|
|
Scope: scope,
|
|
})
|
|
}
|
|
|
|
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceReserveMemoryEvt,
|
|
Scope: scope,
|
|
Priority: prio,
|
|
Delta: size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockReserveMemoryEvt,
|
|
Scope: scope,
|
|
Priority: prio,
|
|
Delta: size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceReleaseMemoryEvt,
|
|
Scope: scope,
|
|
Delta: size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = -1
|
|
} else {
|
|
deltaOut = -1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveStreamEvt,
|
|
Scope: scope,
|
|
DeltaIn: -deltaIn,
|
|
DeltaOut: -deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
if usefd {
|
|
deltafd = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
if usefd {
|
|
deltafd = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = -1
|
|
} else {
|
|
deltaOut = -1
|
|
}
|
|
if usefd {
|
|
deltafd = -1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveConnEvt,
|
|
Scope: scope,
|
|
DeltaIn: -deltaIn,
|
|
DeltaOut: -deltaOut,
|
|
Delta: -int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|