2022-06-21 07:56:01 +08:00
package obs
import (
"context"
"strings"
rcmgr "github.com/libp2p/go-libp2p-resource-manager"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
var (
2022-08-12 07:18:57 +08:00
metricNamespace = "rcmgr/"
conns = stats . Int64 ( metricNamespace + "connections" , "Number of Connections" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
peerConns = stats . Int64 ( metricNamespace + "peer/connections" , "Number of connections this peer has" , stats . UnitDimensionless )
peerConnsNegative = stats . Int64 ( metricNamespace + "peer/connections_negative" , "Number of connections this peer had. This is used to get the current connection number per peer histogram by subtracting this from the peer/connections histogram" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
streams = stats . Int64 ( metricNamespace + "streams" , "Number of Streams" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
peerStreams = stats . Int64 ( metricNamespace + "peer/streams" , "Number of streams this peer has" , stats . UnitDimensionless )
peerStreamsNegative = stats . Int64 ( metricNamespace + "peer/streams_negative" , "Number of streams this peer had. This is used to get the current streams number per peer histogram by subtracting this from the peer/streams histogram" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
memory = stats . Int64 ( metricNamespace + "memory" , "Amount of memory reserved as reported to the Resource Manager" , stats . UnitDimensionless )
peerMemory = stats . Int64 ( metricNamespace + "peer/memory" , "Amount of memory currently reseved for peer" , stats . UnitDimensionless )
peerMemoryNegative = stats . Int64 ( metricNamespace + "peer/memory_negative" , "Amount of memory previously reseved for peer. This is used to get the current memory per peer histogram by subtracting this from the peer/memory histogram" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
connMemory = stats . Int64 ( metricNamespace + "conn/memory" , "Amount of memory currently reseved for the connection" , stats . UnitDimensionless )
connMemoryNegative = stats . Int64 ( metricNamespace + "conn/memory_negative" , "Amount of memory previously reseved for the connection. This is used to get the current memory per connection histogram by subtracting this from the conn/memory histogram" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
fds = stats . Int64 ( metricNamespace + "fds" , "Number of fds as reported to the Resource Manager" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
2022-08-12 07:18:57 +08:00
blockedResources = stats . Int64 ( metricNamespace + "blocked_resources" , "Number of resource requests blocked" , stats . UnitDimensionless )
2022-06-21 07:56:01 +08:00
)
var (
2022-06-28 06:58:42 +08:00
directionTag , _ = tag . NewKey ( "dir" )
scopeTag , _ = tag . NewKey ( "scope" )
serviceTag , _ = tag . NewKey ( "service" )
protocolTag , _ = tag . NewKey ( "protocol" )
resourceTag , _ = tag . NewKey ( "resource" )
2022-06-21 07:56:01 +08:00
)
var (
2022-06-28 06:58:42 +08:00
ConnView = & view . View { Measure : conns , Aggregation : view . Sum ( ) , TagKeys : [ ] tag . Key { directionTag , scopeTag } }
2022-06-21 07:56:01 +08:00
2022-06-24 06:30:13 +08:00
oneTenThenExpDistribution = [ ] float64 {
1.1 , 2.1 , 3.1 , 4.1 , 5.1 , 6.1 , 7.1 , 8.1 , 9.1 , 10.1 , 16.1 , 32.1 , 64.1 , 128.1 , 256.1 ,
2022-06-21 07:56:01 +08:00
}
PeerConnsView = & view . View {
Measure : peerConns ,
2022-06-24 06:30:13 +08:00
Aggregation : view . Distribution ( oneTenThenExpDistribution ... ) ,
2022-06-28 06:58:42 +08:00
TagKeys : [ ] tag . Key { directionTag } ,
2022-06-21 07:56:01 +08:00
}
PeerConnsNegativeView = & view . View {
Measure : peerConnsNegative ,
2022-06-24 06:30:13 +08:00
Aggregation : view . Distribution ( oneTenThenExpDistribution ... ) ,
2022-06-28 06:58:42 +08:00
TagKeys : [ ] tag . Key { directionTag } ,
2022-06-21 07:56:01 +08:00
}
2022-06-28 06:58:42 +08:00
StreamView = & view . View { Measure : streams , Aggregation : view . Sum ( ) , TagKeys : [ ] tag . Key { directionTag , scopeTag , serviceTag , protocolTag } }
PeerStreamsView = & view . View { Measure : peerStreams , Aggregation : view . Distribution ( oneTenThenExpDistribution ... ) , TagKeys : [ ] tag . Key { directionTag } }
PeerStreamNegativeView = & view . View { Measure : peerStreamsNegative , Aggregation : view . Distribution ( oneTenThenExpDistribution ... ) , TagKeys : [ ] tag . Key { directionTag } }
2022-06-21 07:56:01 +08:00
2022-06-28 06:58:42 +08:00
MemoryView = & view . View { Measure : memory , Aggregation : view . Sum ( ) , TagKeys : [ ] tag . Key { scopeTag , serviceTag , protocolTag } }
2022-06-21 07:56:01 +08:00
memDistribution = [ ] float64 {
2022-06-30 12:06:43 +08:00
1 << 10 , // 1KB
4 << 10 , // 4KB
32 << 10 , // 32KB
1 << 20 , // 1MB
32 << 20 , // 32MB
256 << 20 , // 256MB
512 << 20 , // 512MB
1 << 30 , // 1GB
2 << 30 , // 2GB
4 << 30 , // 4GB
2022-06-21 07:56:01 +08:00
}
PeerMemoryView = & view . View {
Measure : peerMemory ,
Aggregation : view . Distribution ( memDistribution ... ) ,
}
PeerMemoryNegativeView = & view . View {
Measure : peerMemoryNegative ,
Aggregation : view . Distribution ( memDistribution ... ) ,
}
// Not setup yet. Memory isn't attached to a given connection.
ConnMemoryView = & view . View {
Measure : connMemory ,
Aggregation : view . Distribution ( memDistribution ... ) ,
}
ConnMemoryNegativeView = & view . View {
Measure : connMemoryNegative ,
Aggregation : view . Distribution ( memDistribution ... ) ,
}
2022-06-28 06:58:42 +08:00
FDsView = & view . View { Measure : fds , Aggregation : view . Sum ( ) , TagKeys : [ ] tag . Key { scopeTag } }
2022-06-21 07:56:01 +08:00
BlockedResourcesView = & view . View {
Measure : blockedResources ,
Aggregation : view . Sum ( ) ,
2022-06-28 06:58:42 +08:00
TagKeys : [ ] tag . Key { scopeTag , resourceTag } ,
2022-06-21 07:56:01 +08:00
}
)
// DefaultViews lists the views to register in order to export the metrics
// recorded by StatsTraceReporter. ConnMemoryView and ConnMemoryNegativeView
// are deliberately left out because per-connection memory is not set up yet.
var DefaultViews = []*view.View{
	ConnView,
	PeerConnsView,
	PeerConnsNegativeView,
	FDsView,

	StreamView,
	PeerStreamsView,
	PeerStreamNegativeView,

	MemoryView,
	PeerMemoryView,
	PeerMemoryNegativeView,

	BlockedResourcesView,
}
// StatsTraceReporter turns resource manager trace events into OpenCensus
// measurements (see ConsumeEvent). It holds no state, so the zero value is
// ready to use.
type StatsTraceReporter struct{}

// NewStatsTraceReporter constructs a StatsTraceReporter. The returned error is
// currently always nil; it is part of the signature for future setup work.
func NewStatsTraceReporter() (StatsTraceReporter, error) {
	// TODO tell prometheus the system limits
	var reporter StatsTraceReporter
	return reporter, nil
}
func ( r StatsTraceReporter ) ConsumeEvent ( evt rcmgr . TraceEvt ) {
ctx := context . Background ( )
switch evt . Type {
case rcmgr . TraceAddStreamEvt , rcmgr . TraceRemoveStreamEvt :
2022-06-21 09:31:09 +08:00
if p := rcmgr . ParsePeerScopeName ( evt . Name ) ; p . Validate ( ) == nil {
2022-06-21 07:56:01 +08:00
// Aggregated peer stats. Counts how many peers have N number of streams open.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
// streams did the peer use to have. When looking at the data you
// take the difference from the two.
oldStreamsOut := int64 ( evt . StreamsOut - evt . DeltaOut )
peerStreamsOut := int64 ( evt . StreamsOut )
if oldStreamsOut != peerStreamsOut {
if oldStreamsOut != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , peerStreamsNegative . M ( oldStreamsOut ) )
2022-06-21 07:56:01 +08:00
}
if peerStreamsOut != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , peerStreams . M ( peerStreamsOut ) )
2022-06-21 07:56:01 +08:00
}
}
oldStreamsIn := int64 ( evt . StreamsIn - evt . DeltaIn )
peerStreamsIn := int64 ( evt . StreamsIn )
if oldStreamsIn != peerStreamsIn {
if oldStreamsIn != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , peerStreamsNegative . M ( oldStreamsIn ) )
2022-06-21 07:56:01 +08:00
}
if peerStreamsIn != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , peerStreams . M ( peerStreamsIn ) )
2022-06-21 07:56:01 +08:00
}
}
} else {
var tags [ ] tag . Mutator
2022-06-21 09:31:09 +08:00
if rcmgr . IsSystemScope ( evt . Name ) || rcmgr . IsTransientScope ( evt . Name ) {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , evt . Name ) )
2022-06-21 09:31:09 +08:00
} else if svc := rcmgr . ParseServiceScopeName ( evt . Name ) ; svc != "" {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , "service" ) , tag . Upsert ( serviceTag , svc ) )
2022-06-21 09:31:09 +08:00
} else if proto := rcmgr . ParseProtocolScopeName ( evt . Name ) ; proto != "" {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , "protocol" ) , tag . Upsert ( protocolTag , proto ) )
2022-06-21 07:56:01 +08:00
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt . DeltaOut != 0 {
stats . RecordWithTags (
ctx ,
2022-06-28 06:58:42 +08:00
append ( [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , tags ... ) ,
2022-06-21 07:56:01 +08:00
streams . M ( int64 ( evt . DeltaOut ) ) ,
)
}
if evt . DeltaIn != 0 {
stats . RecordWithTags (
ctx ,
2022-06-28 06:58:42 +08:00
append ( [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , tags ... ) ,
2022-06-21 07:56:01 +08:00
streams . M ( int64 ( evt . DeltaIn ) ) ,
)
}
}
case rcmgr . TraceAddConnEvt , rcmgr . TraceRemoveConnEvt :
2022-06-21 09:31:09 +08:00
if p := rcmgr . ParsePeerScopeName ( evt . Name ) ; p . Validate ( ) == nil {
2022-06-21 07:56:01 +08:00
// Aggregated peer stats. Counts how many peers have N number of connections.
// Uses two buckets aggregations. One to count how many streams the
// peer has now. The other to count the negative value, or how many
// conns did the peer use to have. When looking at the data you
// take the difference from the two.
oldConnsOut := int64 ( evt . ConnsOut - evt . DeltaOut )
connsOut := int64 ( evt . ConnsOut )
if oldConnsOut != connsOut {
if oldConnsOut != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , peerConnsNegative . M ( oldConnsOut ) )
2022-06-21 07:56:01 +08:00
}
if connsOut != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , peerConns . M ( connsOut ) )
2022-06-21 07:56:01 +08:00
}
}
oldConnsIn := int64 ( evt . ConnsIn - evt . DeltaIn )
connsIn := int64 ( evt . ConnsIn )
if oldConnsIn != connsIn {
if oldConnsIn != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , peerConnsNegative . M ( oldConnsIn ) )
2022-06-21 07:56:01 +08:00
}
if connsIn != 0 {
2022-06-28 06:58:42 +08:00
stats . RecordWithTags ( ctx , [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , peerConns . M ( connsIn ) )
2022-06-21 07:56:01 +08:00
}
}
} else {
var tags [ ] tag . Mutator
2022-06-21 09:31:09 +08:00
if rcmgr . IsSystemScope ( evt . Name ) || rcmgr . IsTransientScope ( evt . Name ) {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , evt . Name ) )
2022-06-21 09:31:09 +08:00
} else if rcmgr . IsConnScope ( evt . Name ) {
2022-06-21 07:56:01 +08:00
// Not measuring this. I don't think it's useful.
break
} else {
2022-06-30 12:03:22 +08:00
// This could be a span
break
2022-06-21 07:56:01 +08:00
}
if evt . DeltaOut != 0 {
stats . RecordWithTags (
ctx ,
2022-06-28 06:58:42 +08:00
append ( [ ] tag . Mutator { tag . Upsert ( directionTag , "outbound" ) } , tags ... ) ,
2022-06-21 07:56:01 +08:00
conns . M ( int64 ( evt . DeltaOut ) ) ,
)
}
if evt . DeltaIn != 0 {
stats . RecordWithTags (
ctx ,
2022-06-28 06:58:42 +08:00
append ( [ ] tag . Mutator { tag . Upsert ( directionTag , "inbound" ) } , tags ... ) ,
2022-06-21 07:56:01 +08:00
conns . M ( int64 ( evt . DeltaIn ) ) ,
)
}
// Represents the delta in fds
if evt . Delta != 0 {
stats . RecordWithTags (
ctx ,
tags ,
fds . M ( int64 ( evt . Delta ) ) ,
)
}
}
case rcmgr . TraceReserveMemoryEvt , rcmgr . TraceReleaseMemoryEvt :
2022-06-21 09:31:09 +08:00
if p := rcmgr . ParsePeerScopeName ( evt . Name ) ; p . Validate ( ) == nil {
2022-06-21 07:56:01 +08:00
oldMem := evt . Memory - evt . Delta
if oldMem != evt . Memory {
if oldMem != 0 {
stats . Record ( ctx , peerMemoryNegative . M ( oldMem ) )
}
if evt . Memory != 0 {
stats . Record ( ctx , peerMemory . M ( evt . Memory ) )
}
}
2022-06-21 09:31:09 +08:00
} else if rcmgr . IsConnScope ( evt . Name ) {
2022-06-21 07:56:01 +08:00
oldMem := evt . Memory - evt . Delta
if oldMem != evt . Memory {
if oldMem != 0 {
stats . Record ( ctx , connMemoryNegative . M ( oldMem ) )
}
if evt . Memory != 0 {
stats . Record ( ctx , connMemory . M ( evt . Memory ) )
}
}
} else {
var tags [ ] tag . Mutator
2022-06-21 09:31:09 +08:00
if rcmgr . IsSystemScope ( evt . Name ) || rcmgr . IsTransientScope ( evt . Name ) {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , evt . Name ) )
2022-06-21 09:31:09 +08:00
} else if svc := rcmgr . ParseServiceScopeName ( evt . Name ) ; svc != "" {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , "service" ) , tag . Upsert ( serviceTag , svc ) )
2022-06-21 09:31:09 +08:00
} else if proto := rcmgr . ParseProtocolScopeName ( evt . Name ) ; proto != "" {
2022-06-28 06:58:42 +08:00
tags = append ( tags , tag . Upsert ( scopeTag , "protocol" ) , tag . Upsert ( protocolTag , proto ) )
2022-06-21 07:56:01 +08:00
} else {
// Not measuring connscope, servicepeer and protocolpeer. Lots of data, and
// you can use aggregated peer stats + service stats to infer
// this.
break
}
if evt . Delta != 0 {
stats . RecordWithTags ( ctx , tags , memory . M ( int64 ( evt . Delta ) ) )
}
}
case rcmgr . TraceBlockAddConnEvt , rcmgr . TraceBlockAddStreamEvt , rcmgr . TraceBlockReserveMemoryEvt :
var resource string
if evt . Type == rcmgr . TraceBlockAddConnEvt {
resource = "connection"
} else if evt . Type == rcmgr . TraceBlockAddStreamEvt {
resource = "stream"
} else {
resource = "memory"
}
2022-06-28 06:58:42 +08:00
// Only the top scopeName. We don't want to get the peerid here.
scopeName := strings . SplitN ( evt . Name , ":" , 2 ) [ 0 ]
2022-06-21 09:09:58 +08:00
// Drop the connection or stream id
2022-06-28 06:58:42 +08:00
scopeName = strings . SplitN ( scopeName , "-" , 2 ) [ 0 ]
2022-06-21 07:56:01 +08:00
2022-07-27 19:28:44 +08:00
// If something else gets added here, make sure to update the size hint
// below when we make `tagsWithDir`.
2022-06-28 06:58:42 +08:00
tags := [ ] tag . Mutator { tag . Upsert ( scopeTag , scopeName ) , tag . Upsert ( resourceTag , resource ) }
2022-06-21 07:56:01 +08:00
if evt . DeltaIn != 0 {
2022-08-12 01:52:49 +08:00
tagsWithDir := make ( [ ] tag . Mutator , 0 , 3 )
2022-07-27 19:28:44 +08:00
tagsWithDir = append ( tagsWithDir , tag . Insert ( directionTag , "inbound" ) )
tagsWithDir = append ( tagsWithDir , tags ... )
stats . RecordWithTags ( ctx , tagsWithDir [ 0 : ] , blockedResources . M ( int64 ( 1 ) ) )
2022-06-21 07:56:01 +08:00
}
if evt . DeltaOut != 0 {
2022-08-12 01:52:49 +08:00
tagsWithDir := make ( [ ] tag . Mutator , 0 , 3 )
2022-07-27 19:28:44 +08:00
tagsWithDir = append ( tagsWithDir , tag . Insert ( directionTag , "outbound" ) )
tagsWithDir = append ( tagsWithDir , tags ... )
2022-07-09 04:43:46 +08:00
stats . RecordWithTags ( ctx , tagsWithDir , blockedResources . M ( int64 ( 1 ) ) )
2022-06-21 07:56:01 +08:00
}
if evt . Delta != 0 {
2022-06-24 06:30:13 +08:00
stats . RecordWithTags ( ctx , tags , blockedResources . M ( 1 ) )
2022-06-21 07:56:01 +08:00
}
}
}