mirror of
https://github.com/libp2p/go-libp2p-resource-manager.git
synced 2025-01-27 12:50:07 +08:00
fd5de7a9d8
Flip sign bit of release memory trace event to be inline with the other uses of delta in trace.go
668 lines
12 KiB
Go
668 lines
12 KiB
Go
package rcmgr
|
|
|
|
import (
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p-core/network"
|
|
)
|
|
|
|
type trace struct {
|
|
path string
|
|
|
|
ctx context.Context
|
|
cancel func()
|
|
closed chan struct{}
|
|
|
|
mx sync.Mutex
|
|
done bool
|
|
pend []interface{}
|
|
}
|
|
|
|
func WithTrace(path string) Option {
|
|
return func(r *resourceManager) error {
|
|
r.trace = &trace{path: path}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
const (
|
|
traceStartEvt = "start"
|
|
traceCreateScopeEvt = "create_scope"
|
|
traceDestroyScopeEvt = "destroy_scope"
|
|
traceReserveMemoryEvt = "reserve_memory"
|
|
traceBlockReserveMemoryEvt = "block_reserve_memory"
|
|
traceReleaseMemoryEvt = "release_memory"
|
|
traceAddStreamEvt = "add_stream"
|
|
traceBlockAddStreamEvt = "block_add_stream"
|
|
traceRemoveStreamEvt = "remove_stream"
|
|
traceAddConnEvt = "add_conn"
|
|
traceBlockAddConnEvt = "block_add_conn"
|
|
traceRemoveConnEvt = "remove_conn"
|
|
)
|
|
|
|
type scopeClass struct {
|
|
name string
|
|
}
|
|
|
|
func (s scopeClass) MarshalJSON() ([]byte, error) {
|
|
name := s.name
|
|
var span string
|
|
if idx := strings.Index(name, "span:"); idx > -1 {
|
|
name = name[:idx-1]
|
|
span = name[idx+5:]
|
|
}
|
|
// System and Transient scope
|
|
if name == "system" || name == "transient" {
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: name,
|
|
Span: span,
|
|
})
|
|
}
|
|
// Connection scope
|
|
if strings.HasPrefix(name, "conn-") {
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Conn string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "conn",
|
|
Conn: name[5:],
|
|
Span: span,
|
|
})
|
|
}
|
|
// Stream scope
|
|
if strings.HasPrefix(name, "stream-") {
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Stream string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "stream",
|
|
Stream: name[7:],
|
|
Span: span,
|
|
})
|
|
}
|
|
// Peer scope
|
|
if strings.HasPrefix(name, "peer:") {
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Peer string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "peer",
|
|
Peer: name[5:],
|
|
Span: span,
|
|
})
|
|
}
|
|
|
|
if strings.HasPrefix(name, "service:") {
|
|
if idx := strings.Index(name, "peer:"); idx > 0 { // Peer-Service scope
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Service string
|
|
Peer string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "service-peer",
|
|
Service: name[8 : idx-1],
|
|
Peer: name[idx+5:],
|
|
Span: span,
|
|
})
|
|
} else { // Service scope
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Service string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "service",
|
|
Service: name[8:],
|
|
Span: span,
|
|
})
|
|
}
|
|
}
|
|
|
|
if strings.HasPrefix(name, "protocol:") {
|
|
if idx := strings.Index(name, "peer:"); idx > -1 { // Peer-Protocol scope
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Protocol string
|
|
Peer string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "protocol-peer",
|
|
Protocol: name[9 : idx-1],
|
|
Peer: name[idx+5:],
|
|
Span: span,
|
|
})
|
|
} else { // Protocol scope
|
|
return json.Marshal(struct {
|
|
Class string
|
|
Protocol string
|
|
Span string `json:",omitempty"`
|
|
}{
|
|
Class: "protocol",
|
|
Protocol: name[9:],
|
|
Span: span,
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("unrecognized scope: %s", name)
|
|
}
|
|
|
|
type TraceEvt struct {
|
|
Time string
|
|
Type string
|
|
|
|
Scope *scopeClass `json:",omitempty"`
|
|
Name string `json:",omitempty"`
|
|
|
|
Limit interface{} `json:",omitempty"`
|
|
|
|
Priority uint8 `json:",omitempty"`
|
|
|
|
Delta int64 `json:",omitempty"`
|
|
DeltaIn int `json:",omitempty"`
|
|
DeltaOut int `json:",omitempty"`
|
|
|
|
Memory int64 `json:",omitempty"`
|
|
|
|
StreamsIn int `json:",omitempty"`
|
|
StreamsOut int `json:",omitempty"`
|
|
|
|
ConnsIn int `json:",omitempty"`
|
|
ConnsOut int `json:",omitempty"`
|
|
|
|
FD int `json:",omitempty"`
|
|
}
|
|
|
|
func (t *trace) push(evt TraceEvt) {
|
|
t.mx.Lock()
|
|
defer t.mx.Unlock()
|
|
|
|
if t.done {
|
|
return
|
|
}
|
|
evt.Time = time.Now().Format(time.RFC3339Nano)
|
|
if evt.Name != "" {
|
|
evt.Scope = &scopeClass{name: evt.Name}
|
|
}
|
|
|
|
t.pend = append(t.pend, evt)
|
|
}
|
|
|
|
func (t *trace) background(out io.WriteCloser) {
|
|
defer close(t.closed)
|
|
defer out.Close()
|
|
|
|
gzOut := gzip.NewWriter(out)
|
|
defer gzOut.Close()
|
|
|
|
jsonOut := json.NewEncoder(gzOut)
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
var pend []interface{}
|
|
|
|
getEvents := func() {
|
|
t.mx.Lock()
|
|
tmp := t.pend
|
|
t.pend = pend[:0]
|
|
pend = tmp
|
|
t.mx.Unlock()
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
getEvents()
|
|
|
|
if len(pend) == 0 {
|
|
continue
|
|
}
|
|
|
|
if err := t.writeEvents(pend, jsonOut); err != nil {
|
|
log.Warnf("error writing rcmgr trace: %s", err)
|
|
t.mx.Lock()
|
|
t.done = true
|
|
t.mx.Unlock()
|
|
return
|
|
}
|
|
|
|
if err := gzOut.Flush(); err != nil {
|
|
log.Warnf("error flushing rcmgr trace: %s", err)
|
|
t.mx.Lock()
|
|
t.done = true
|
|
t.mx.Unlock()
|
|
return
|
|
}
|
|
|
|
case <-t.ctx.Done():
|
|
getEvents()
|
|
|
|
if len(pend) == 0 {
|
|
return
|
|
}
|
|
|
|
if err := t.writeEvents(pend, jsonOut); err != nil {
|
|
log.Warnf("error writing rcmgr trace: %s", err)
|
|
return
|
|
}
|
|
|
|
if err := gzOut.Flush(); err != nil {
|
|
log.Warnf("error flushing rcmgr trace: %s", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *trace) writeEvents(pend []interface{}, jout *json.Encoder) error {
|
|
for _, e := range pend {
|
|
if err := jout.Encode(e); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) Start(limits Limiter) error {
|
|
if t == nil {
|
|
return nil
|
|
}
|
|
|
|
t.ctx, t.cancel = context.WithCancel(context.Background())
|
|
t.closed = make(chan struct{})
|
|
|
|
out, err := os.OpenFile(t.path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
|
|
go t.background(out)
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceStartEvt,
|
|
Limit: limits,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) Close() error {
|
|
if t == nil {
|
|
return nil
|
|
}
|
|
|
|
t.mx.Lock()
|
|
|
|
if t.done {
|
|
t.mx.Unlock()
|
|
return nil
|
|
}
|
|
|
|
t.cancel()
|
|
t.done = true
|
|
t.mx.Unlock()
|
|
|
|
<-t.closed
|
|
return nil
|
|
}
|
|
|
|
func (t *trace) CreateScope(scope string, limit Limit) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceCreateScopeEvt,
|
|
Name: scope,
|
|
Limit: limit,
|
|
})
|
|
}
|
|
|
|
func (t *trace) DestroyScope(scope string) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceDestroyScopeEvt,
|
|
Name: scope,
|
|
})
|
|
}
|
|
|
|
func (t *trace) ReserveMemory(scope string, prio uint8, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceReserveMemoryEvt,
|
|
Name: scope,
|
|
Priority: prio,
|
|
Delta: size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockReserveMemory(scope string, prio uint8, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockReserveMemoryEvt,
|
|
Name: scope,
|
|
Priority: prio,
|
|
Delta: size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) ReleaseMemory(scope string, size, mem int64) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if size == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceReleaseMemoryEvt,
|
|
Name: scope,
|
|
Delta: -size,
|
|
Memory: mem,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveStream(scope string, dir network.Direction, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut int
|
|
if dir == network.DirInbound {
|
|
deltaIn = -1
|
|
} else {
|
|
deltaOut = -1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveStreams(scope string, deltaIn, deltaOut, nstreamsIn, nstreamsOut int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveStreamEvt,
|
|
Name: scope,
|
|
DeltaIn: -deltaIn,
|
|
DeltaOut: -deltaOut,
|
|
StreamsIn: nstreamsIn,
|
|
StreamsOut: nstreamsOut,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
if usefd {
|
|
deltafd = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddConnEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = 1
|
|
} else {
|
|
deltaOut = 1
|
|
}
|
|
if usefd {
|
|
deltafd = 1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddConnEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveConn(scope string, dir network.Direction, usefd bool, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
var deltaIn, deltaOut, deltafd int
|
|
if dir == network.DirInbound {
|
|
deltaIn = -1
|
|
} else {
|
|
deltaOut = -1
|
|
}
|
|
if usefd {
|
|
deltafd = -1
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveConnEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) AddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceAddConnEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) BlockAddConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceBlockAddConnEvt,
|
|
Name: scope,
|
|
DeltaIn: deltaIn,
|
|
DeltaOut: deltaOut,
|
|
Delta: int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|
|
|
|
func (t *trace) RemoveConns(scope string, deltaIn, deltaOut, deltafd, nconnsIn, nconnsOut, nfd int) {
|
|
if t == nil {
|
|
return
|
|
}
|
|
|
|
if deltaIn == 0 && deltaOut == 0 && deltafd == 0 {
|
|
return
|
|
}
|
|
|
|
t.push(TraceEvt{
|
|
Type: traceRemoveConnEvt,
|
|
Name: scope,
|
|
DeltaIn: -deltaIn,
|
|
DeltaOut: -deltaOut,
|
|
Delta: -int64(deltafd),
|
|
ConnsIn: nconnsIn,
|
|
ConnsOut: nconnsOut,
|
|
FD: nfd,
|
|
})
|
|
}
|