go-libp2p-resource-manager/trace.go
Marten Seemann f4b09ff457 don't create a trace file here, write trace to io.Writer
Arguably, it's not a library's responsibility to create a trace file,
and to decide on the compression algorithm. The design is cleaner if we
just dump the trace to an io.WriteCloser, and let the application decide
how to handle the concrete implementation.
2022-06-07 11:44:34 +02:00

525 lines
8.9 KiB
Go

package rcmgr
import (
"context"
"encoding/json"
"io"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
)
// trace buffers resource manager events in memory and has them written out
// as JSON by a background goroutine (see Start, push and background).
type trace struct {
// out receives the JSON-encoded events; background closes it when it exits.
out io.WriteCloser
// ctx is cancelled by Close to tell background to flush and stop.
ctx context.Context
// cancel cancels ctx; it is assigned by Start.
cancel func()
// closed is closed by background once it has finished.
closed chan struct{}
// mx guards done and pend.
mx sync.Mutex
// done is set once the trace no longer accepts events: either Close was
// called or background failed to write.
done bool
// pend holds the events pushed since background last collected them.
pend []interface{}
}
// WithTrace returns an Option that installs a tracer on the resource manager.
// The tracer writes its events to out; out is closed by the tracer's
// background goroutine when tracing ends.
func WithTrace(out io.WriteCloser) Option {
	return func(rm *resourceManager) error {
		// A fresh tracer is created every time the option is applied.
		tracer := new(trace)
		tracer.out = out
		rm.trace = tracer
		return nil
	}
}
// Values stored in TraceEvt.Type. Each constant is emitted by the trace
// method(s) named in its comment.
const (
// Emitted by Start.
traceStartEvt = "start"
// Emitted by CreateScope.
traceCreateScopeEvt = "create_scope"
// Emitted by DestroyScope.
traceDestroyScopeEvt = "destroy_scope"
// Emitted by ReserveMemory.
traceReserveMemoryEvt = "reserve_memory"
// Emitted by BlockReserveMemory.
traceBlockReserveMemoryEvt = "block_reserve_memory"
// Emitted by ReleaseMemory.
traceReleaseMemoryEvt = "release_memory"
// Emitted by AddStream and AddStreams.
traceAddStreamEvt = "add_stream"
// Emitted by BlockAddStream and BlockAddStreams.
traceBlockAddStreamEvt = "block_add_stream"
// Emitted by RemoveStream and RemoveStreams.
traceRemoveStreamEvt = "remove_stream"
// Emitted by AddConn and AddConns.
traceAddConnEvt = "add_conn"
// Emitted by BlockAddConn and BlockAddConns.
traceBlockAddConnEvt = "block_add_conn"
// Emitted by RemoveConn and RemoveConns.
traceRemoveConnEvt = "remove_conn"
)
// TraceEvt is a single trace record. Records are JSON-encoded one after the
// other by the trace's background goroutine. Fields tagged omitempty are left
// out of the JSON when they hold their zero value.
type TraceEvt struct {
// Time is filled in by push with the current time in RFC 3339 format
// with nanoseconds.
Time string
// Type is one of the trace*Evt constants.
Type string
// Scope is the scope name passed to the trace method.
Scope string `json:",omitempty"`
// Limit is set by Start (the limiter) and by CreateScope (the scope's limit).
Limit interface{} `json:",omitempty"`
// Priority is set by ReserveMemory and BlockReserveMemory.
Priority uint8 `json:",omitempty"`
// Delta carries the size for memory events and the file descriptor delta
// for connection events.
Delta int64 `json:",omitempty"`
// DeltaIn and DeltaOut carry the inbound/outbound change for stream and
// connection events; the remove events store negative values.
DeltaIn int `json:",omitempty"`
DeltaOut int `json:",omitempty"`
// Memory is the mem argument of the memory events.
// NOTE(review): presumably the scope's memory after the operation — confirm with callers.
Memory int64 `json:",omitempty"`
// StreamsIn and StreamsOut are the stream counts passed to the stream events.
StreamsIn int `json:",omitempty"`
StreamsOut int `json:",omitempty"`
// ConnsIn and ConnsOut are the connection counts passed to the connection events.
ConnsIn int `json:",omitempty"`
ConnsOut int `json:",omitempty"`
// FD is the nfd argument passed to the connection events.
FD int `json:",omitempty"`
}
// push timestamps evt and queues it for the background writer. Events pushed
// after the trace has been marked done are silently dropped.
func (t *trace) push(evt TraceEvt) {
	t.mx.Lock()
	defer t.mx.Unlock()

	if !t.done {
		// The timestamp is taken under the lock, at the moment of queueing.
		evt.Time = time.Now().Format(time.RFC3339Nano)
		t.pend = append(t.pend, evt)
	}
}
// background is the writer goroutine launched by Start. Once a second it
// collects the events queued by push and JSON-encodes them to t.out. When
// t.ctx is cancelled it performs one final flush and exits.
//
// If a write fails, the trace is marked done so that push stops queueing
// events, the events still buffered are dropped, the context is cancelled and
// the goroutine exits.
//
// On exit the output is closed (a Close error is logged rather than silently
// discarded) and then t.closed is closed.
func (t *trace) background() {
	// Deferred calls run in reverse order: the ticker is stopped, the output
	// is closed, and only then is t.closed closed, so a waiter on t.closed
	// observes an output that has already been closed.
	defer close(t.closed)
	defer func() {
		// For buffering or compressing writers, Close is where a failed final
		// flush surfaces; don't lose that information.
		if err := t.out.Close(); err != nil {
			log.Warnf("error closing rcmgr trace: %s", err)
		}
	}()

	jsonOut := json.NewEncoder(t.out)

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	// pend and t.pend form a double buffer: on every collection the slices
	// are swapped so that their backing arrays are reused.
	var pend []interface{}

	getEvents := func() {
		t.mx.Lock()
		tmp := t.pend
		t.pend = pend[:0]
		pend = tmp
		t.mx.Unlock()
	}

	for {
		select {
		case <-ticker.C:
			getEvents()

			if len(pend) == 0 {
				continue
			}

			if err := t.writeEvents(pend, jsonOut); err != nil {
				log.Warnf("error writing rcmgr trace: %s", err)

				t.mx.Lock()
				t.done = true
				// Nothing will ever be written again; release the buffer.
				t.pend = nil
				t.mx.Unlock()

				// Release the context; calling cancel more than once is harmless.
				t.cancel()
				return
			}

		case <-t.ctx.Done():
			// Final flush of whatever was queued before the trace was closed.
			getEvents()

			if len(pend) == 0 {
				return
			}

			if err := t.writeEvents(pend, jsonOut); err != nil {
				log.Warnf("error writing rcmgr trace: %s", err)
			}
			return
		}
	}
}
// writeEvents encodes every element of pend, in order, with jout. It stops
// at the first encoding error and returns it; nil means all events were
// written.
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
	for i := 0; i < len(pend); i++ {
		err := jout.Encode(pend[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// Start launches the background writer goroutine and queues a start event
// carrying limits. It is a no-op on a nil trace and always returns nil.
func (t *trace) Start(limits Limiter) error {
	if t == nil {
		return nil
	}

	// The lifecycle fields must be in place before the goroutine is launched,
	// since background reads them.
	ctx, cancel := context.WithCancel(context.Background())
	t.ctx = ctx
	t.cancel = cancel
	t.closed = make(chan struct{})

	go t.background()

	startEvt := TraceEvt{Type: traceStartEvt, Limit: limits}
	t.push(startEvt)

	return nil
}
// Close stops the trace: no further events are accepted, the background
// goroutine is told to flush what is queued, and Close waits until that
// goroutine has finished and closed the output.
//
// Close is a no-op on a nil trace and on a trace that was never started.
// It may be called more than once; every call cancels the context and waits
// for the background goroutine, including when that goroutine already stopped
// because of a write error. It always returns nil.
func (t *trace) Close() error {
	if t == nil {
		return nil
	}

	t.mx.Lock()
	cancel, closed := t.cancel, t.closed
	if cancel == nil || closed == nil {
		// Start was never called: there is no goroutine to stop or wait for.
		t.mx.Unlock()
		return nil
	}
	// Set under the lock so that push drops anything arriving after this
	// point, before the background goroutine performs its final drain.
	t.done = true
	t.mx.Unlock()

	// Always cancel so the context is released even if the background
	// goroutine already bailed out; repeated cancellation is harmless.
	cancel()

	// Wait until the background goroutine has closed the output.
	<-closed
	return nil
}
// CreateScope queues a create_scope event for the named scope along with the
// limit passed in. It is a no-op on a nil trace.
func (t *trace) CreateScope(scope string, limit Limit) {
	if t != nil {
		evt := TraceEvt{Type: traceCreateScopeEvt, Scope: scope, Limit: limit}
		t.push(evt)
	}
}
// DestroyScope queues a destroy_scope event for the named scope. It is a
// no-op on a nil trace.
func (t *trace) DestroyScope(scope string) {
	if t != nil {
		evt := TraceEvt{Type: traceDestroyScopeEvt, Scope: scope}
		t.push(evt)
	}
}
// ReserveMemory queues a reserve_memory event for scope. prio is stored as
// the event's Priority, size as its Delta and mem as its Memory. Nothing is
// recorded on a nil trace or when size is zero.
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
	if t == nil || size == 0 {
		return
	}

	evt := TraceEvt{Type: traceReserveMemoryEvt, Scope: scope}
	evt.Priority = prio
	evt.Delta = size
	evt.Memory = mem
	t.push(evt)
}
// BlockReserveMemory queues a block_reserve_memory event for scope. prio is
// stored as the event's Priority, size as its Delta and mem as its Memory.
// Nothing is recorded on a nil trace or when size is zero.
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
	if t == nil || size == 0 {
		return
	}

	evt := TraceEvt{Type: traceBlockReserveMemoryEvt, Scope: scope}
	evt.Priority = prio
	evt.Delta = size
	evt.Memory = mem
	t.push(evt)
}
// ReleaseMemory queues a release_memory event for scope. size is stored as
// the event's Delta and mem as its Memory. Nothing is recorded on a nil
// trace or when size is zero.
//
// NOTE(review): Delta is stored with the sign it was passed in, whereas the
// remove_stream and remove_conn events store negated deltas — confirm whether
// trace consumers expect a negative value here.
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
	if t == nil || size == 0 {
		return
	}

	evt := TraceEvt{Type: traceReleaseMemoryEvt, Scope: scope}
	evt.Delta = size
	evt.Memory = mem
	t.push(evt)
}
// AddStream queues an add_stream event for a single stream on scope. The
// event has DeltaIn 1 when dir is network.DirInbound and DeltaOut 1 for any
// other direction. nstreamsIn and nstreamsOut are stored as StreamsIn and
// StreamsOut. It is a no-op on a nil trace.
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:       traceAddStreamEvt,
		Scope:      scope,
		StreamsIn:  nstreamsIn,
		StreamsOut: nstreamsOut,
	}
	switch dir {
	case network.DirInbound:
		evt.DeltaIn = 1
	default:
		evt.DeltaOut = 1
	}
	t.push(evt)
}
// BlockAddStream queues a block_add_stream event for a single stream on
// scope. The event has DeltaIn 1 when dir is network.DirInbound and DeltaOut
// 1 for any other direction. nstreamsIn and nstreamsOut are stored as
// StreamsIn and StreamsOut. It is a no-op on a nil trace.
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:       traceBlockAddStreamEvt,
		Scope:      scope,
		StreamsIn:  nstreamsIn,
		StreamsOut: nstreamsOut,
	}
	switch dir {
	case network.DirInbound:
		evt.DeltaIn = 1
	default:
		evt.DeltaOut = 1
	}
	t.push(evt)
}
// RemoveStream queues a remove_stream event for a single stream on scope.
// The event has DeltaIn -1 when dir is network.DirInbound and DeltaOut -1 for
// any other direction. nstreamsIn and nstreamsOut are stored as StreamsIn and
// StreamsOut. It is a no-op on a nil trace.
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:       traceRemoveStreamEvt,
		Scope:      scope,
		StreamsIn:  nstreamsIn,
		StreamsOut: nstreamsOut,
	}
	switch dir {
	case network.DirInbound:
		evt.DeltaIn = -1
	default:
		evt.DeltaOut = -1
	}
	t.push(evt)
}
// AddStreams queues an add_stream event for scope with the given inbound and
// outbound deltas; nstreamsIn and nstreamsOut are stored as StreamsIn and
// StreamsOut. Nothing is recorded on a nil trace or when both deltas are
// zero.
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 {
		evt := TraceEvt{Type: traceAddStreamEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = deltaIn, deltaOut
		evt.StreamsIn, evt.StreamsOut = nstreamsIn, nstreamsOut
		t.push(evt)
	}
}
// BlockAddStreams queues a block_add_stream event for scope with the given
// inbound and outbound deltas; nstreamsIn and nstreamsOut are stored as
// StreamsIn and StreamsOut. Nothing is recorded on a nil trace or when both
// deltas are zero.
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 {
		evt := TraceEvt{Type: traceBlockAddStreamEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = deltaIn, deltaOut
		evt.StreamsIn, evt.StreamsOut = nstreamsIn, nstreamsOut
		t.push(evt)
	}
}
// RemoveStreams queues a remove_stream event for scope. deltaIn and deltaOut
// are negated before being stored, so callers pass the number of streams
// removed; nstreamsIn and nstreamsOut are stored as StreamsIn and StreamsOut.
// Nothing is recorded on a nil trace or when both deltas are zero.
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 {
		evt := TraceEvt{Type: traceRemoveStreamEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = -deltaIn, -deltaOut
		evt.StreamsIn, evt.StreamsOut = nstreamsIn, nstreamsOut
		t.push(evt)
	}
}
// AddConn queues an add_conn event for a single connection on scope. The
// event has DeltaIn 1 when dir is network.DirInbound and DeltaOut 1 for any
// other direction; Delta is 1 when usefd is set. nconnsIn, nconnsOut and nfd
// are stored as ConnsIn, ConnsOut and FD. It is a no-op on a nil trace.
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:     traceAddConnEvt,
		Scope:    scope,
		ConnsIn:  nconnsIn,
		ConnsOut: nconnsOut,
		FD:       nfd,
	}
	if dir == network.DirInbound {
		evt.DeltaIn = 1
	} else {
		evt.DeltaOut = 1
	}
	if usefd {
		evt.Delta = 1
	}
	t.push(evt)
}
// BlockAddConn queues a block_add_conn event for a single connection on
// scope. The event has DeltaIn 1 when dir is network.DirInbound and DeltaOut
// 1 for any other direction; Delta is 1 when usefd is set. nconnsIn,
// nconnsOut and nfd are stored as ConnsIn, ConnsOut and FD. It is a no-op on
// a nil trace.
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:     traceBlockAddConnEvt,
		Scope:    scope,
		ConnsIn:  nconnsIn,
		ConnsOut: nconnsOut,
		FD:       nfd,
	}
	if dir == network.DirInbound {
		evt.DeltaIn = 1
	} else {
		evt.DeltaOut = 1
	}
	if usefd {
		evt.Delta = 1
	}
	t.push(evt)
}
// RemoveConn queues a remove_conn event for a single connection on scope.
// The event has DeltaIn -1 when dir is network.DirInbound and DeltaOut -1 for
// any other direction; Delta is -1 when usefd is set. nconnsIn, nconnsOut and
// nfd are stored as ConnsIn, ConnsOut and FD. It is a no-op on a nil trace.
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	evt := TraceEvt{
		Type:     traceRemoveConnEvt,
		Scope:    scope,
		ConnsIn:  nconnsIn,
		ConnsOut: nconnsOut,
		FD:       nfd,
	}
	if dir == network.DirInbound {
		evt.DeltaIn = -1
	} else {
		evt.DeltaOut = -1
	}
	if usefd {
		evt.Delta = -1
	}
	t.push(evt)
}
// AddConns queues an add_conn event for scope with the given inbound,
// outbound and file descriptor deltas; nconnsIn, nconnsOut and nfd are stored
// as ConnsIn, ConnsOut and FD. Nothing is recorded on a nil trace or when all
// three deltas are zero.
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 || deltafd != 0 {
		evt := TraceEvt{Type: traceAddConnEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = deltaIn, deltaOut
		evt.Delta = int64(deltafd)
		evt.ConnsIn, evt.ConnsOut = nconnsIn, nconnsOut
		evt.FD = nfd
		t.push(evt)
	}
}
// BlockAddConns queues a block_add_conn event for scope with the given
// inbound, outbound and file descriptor deltas; nconnsIn, nconnsOut and nfd
// are stored as ConnsIn, ConnsOut and FD. Nothing is recorded on a nil trace
// or when all three deltas are zero.
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 || deltafd != 0 {
		evt := TraceEvt{Type: traceBlockAddConnEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = deltaIn, deltaOut
		evt.Delta = int64(deltafd)
		evt.ConnsIn, evt.ConnsOut = nconnsIn, nconnsOut
		evt.FD = nfd
		t.push(evt)
	}
}
// RemoveConns queues a remove_conn event for scope. deltaIn, deltaOut and
// deltafd are negated before being stored, so callers pass the amounts
// removed; nconnsIn, nconnsOut and nfd are stored as ConnsIn, ConnsOut and
// FD. Nothing is recorded on a nil trace or when all three deltas are zero.
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
	if t == nil {
		return
	}

	if deltaIn != 0 || deltaOut != 0 || deltafd != 0 {
		evt := TraceEvt{Type: traceRemoveConnEvt, Scope: scope}
		evt.DeltaIn, evt.DeltaOut = -deltaIn, -deltaOut
		// Widen before negating, exactly as the stored value is defined.
		evt.Delta = -int64(deltafd)
		evt.ConnsIn, evt.ConnsOut = nconnsIn, nconnsOut
		evt.FD = nfd
		t.push(evt)
	}
}