1
0
mirror of https://github.com/libp2p/go-libp2p-core.git synced 2025-04-11 15:10:21 +08:00

keep go-flow-metrics untouched.

This commit is contained in:
Raúl Kripalani 2019-04-25 15:57:20 +01:00
parent c30d4b4050
commit 06118baed6
11 changed files with 17 additions and 535 deletions

View File

@ -9,8 +9,6 @@ import (
"github.com/multiformats/go-multiaddr"
)
// ENTITIES.
// Multiaddr aliases the Multiaddr type from github.com/multiformats/go-multiaddr.
//
// Refer to the docs on that type for more info.
@ -31,8 +29,6 @@ type ProtocolID = protocol.ID
// Refer to the docs on that type for more info.
type PeerAddrInfo = peer.AddrInfo
// CONSTRUCTIONS.
// Host aliases host.Host.
//
// Refer to the docs on that type for more info.

1
go.mod
View File

@ -7,6 +7,7 @@ require (
github.com/ipfs/go-cid v0.0.1
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8
github.com/kr/pretty v0.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.0.1
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16
github.com/mr-tron/base58 v1.1.1
github.com/multiformats/go-multiaddr v0.0.2

2
go.sum
View File

@ -35,6 +35,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/libp2p/go-flow-metrics v0.0.1 h1:0gxuFd2GuK7IIP5pKljLwps6TvcuYgvG7Atqi3INF5s=
github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=

View File

@ -1,26 +1,20 @@
package metrics
import (
"github.com/libp2p/go-flow-metrics"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
)
type Stats struct {
TotalIn int64
TotalOut int64
RateIn float64
RateOut float64
}
type BandwidthCounter struct {
totalIn Meter
totalOut Meter
totalIn flow.Meter
totalOut flow.Meter
protocolIn MeterRegistry
protocolOut MeterRegistry
protocolIn flow.MeterRegistry
protocolOut flow.MeterRegistry
peerIn MeterRegistry
peerOut MeterRegistry
peerIn flow.MeterRegistry
peerOut flow.MeterRegistry
}
func NewBandwidthCounter() *BandwidthCounter {

View File

@ -1,175 +0,0 @@
package metrics
import (
"math"
"sync"
"testing"
"time"
)
func TestBasic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100)
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
m := new(Meter)
for i := 0; i < 300; i++ {
m.Mark(1000)
<-ticker.C
}
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 500) {
t.Errorf("expected rate 25000 (±500), got %f", actual.Rate)
}
for i := 0; i < 200; i++ {
m.Mark(200)
<-ticker.C
}
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 200) {
t.Errorf("expected rate 5000 (±200), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestShared(t *testing.T) {
var wg sync.WaitGroup
wg.Add(20 * 21)
for i := 0; i < 20; i++ {
m := new(Meter)
for j := 0; j < 20; j++ {
go func() {
defer wg.Done()
ticker := time.NewTicker(40 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 300; i++ {
m.Mark(50)
<-ticker.C
}
for i := 0; i < 200; i++ {
m.Mark(10)
<-ticker.C
}
}()
}
go func() {
defer wg.Done()
time.Sleep(40 * 300 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 25000, 250) {
t.Errorf("expected rate 25000 (±250), got %f", actual.Rate)
}
time.Sleep(40 * 200 * time.Millisecond)
// Adjusts
actual = m.Snapshot()
if !approxEq(actual.Rate, 5000, 50) {
t.Errorf("expected rate 5000 (±50), got %f", actual.Rate)
}
// Let it settle.
time.Sleep(2 * time.Second)
// get the right total
actual = m.Snapshot()
if actual.Total != 340000 {
t.Errorf("expected total %d, got %d", 340000, actual.Total)
}
}()
}
wg.Wait()
}
func TestUnregister(t *testing.T) {
var wg sync.WaitGroup
wg.Add(100 * 2)
pause := make(chan struct{})
for i := 0; i < 100; i++ {
m := new(Meter)
go func() {
defer wg.Done()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for i := 0; i < 40; i++ {
m.Mark(1)
<-ticker.C
}
<-pause
time.Sleep(2 * time.Second)
for i := 0; i < 40; i++ {
m.Mark(2)
<-ticker.C
}
}()
go func() {
defer wg.Done()
time.Sleep(40 * 100 * time.Millisecond)
actual := m.Snapshot()
if !approxEq(actual.Rate, 10, 1) {
t.Errorf("expected rate 10 (±1), got %f", actual.Rate)
}
<-pause
actual = m.Snapshot()
if actual.Total != 40 {
t.Errorf("expected total 4000, got %d", actual.Total)
}
time.Sleep(2*time.Second + 40*100*time.Millisecond)
actual = m.Snapshot()
if !approxEq(actual.Rate, 20, 4) {
t.Errorf("expected rate 20 (±4), got %f", actual.Rate)
}
time.Sleep(2 * time.Second)
actual = m.Snapshot()
if actual.Total != 120 {
t.Errorf("expected total 120, got %d", actual.Total)
}
}()
}
time.Sleep(60 * time.Second)
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 0 {
t.Errorf("expected all sweepers to be unregistered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
close(pause)
wg.Wait()
globalSweeper.mutex.Lock()
if len(globalSweeper.meters) != 100 {
t.Errorf("expected all sweepers to be registered: %d", len(globalSweeper.meters))
}
globalSweeper.mutex.Unlock()
}
func approxEq(a, b, err float64) bool {
return math.Abs(a-b) < err
}

View File

@ -1,44 +0,0 @@
package metrics
import (
"fmt"
"sync/atomic"
)
// Snapshot is a rate/total snapshot.
type Snapshot struct {
Rate float64
Total uint64
}
func (s Snapshot) String() string {
return fmt.Sprintf("%d (%f/s)", s.Total, s.Rate)
}
// Meter is a meter for monitoring a flow.
type Meter struct {
accumulator uint64
// Take lock.
snapshot Snapshot
}
// Mark updates the total.
func (m *Meter) Mark(count uint64) {
if count > 0 && atomic.AddUint64(&m.accumulator, count) == count {
// I'm the first one to bump this above 0.
// Register it.
globalSweeper.Register(m)
}
}
// Snapshot gets a consistent snapshot of the total and rate.
func (m *Meter) Snapshot() Snapshot {
globalSweeper.mutex.RLock()
defer globalSweeper.mutex.RUnlock()
return m.snapshot
}
func (m *Meter) String() string {
return m.Snapshot().String()
}

View File

@ -1,34 +0,0 @@
package metrics
import (
"fmt"
"math"
"time"
)
func ExampleMeter() {
meter := new(Meter)
t := time.NewTicker(100 * time.Millisecond)
for i := 0; i < 100; i++ {
<-t.C
meter.Mark(30)
}
// Get the current rate. This will be accurate *now* but not after we
// sleep (because we calculate it using EWMA).
rate := meter.Snapshot().Rate
// Sleep 2 seconds to allow the total to catch up. We snapshot every
// second so the total may not yet be accurate.
time.Sleep(2 * time.Second)
// Get the current total.
total := meter.Snapshot().Total
fmt.Printf("%d (%d/s)\n", total, roundTens(rate))
// Output: 3000 (300/s)
}
func roundTens(x float64) int64 {
return int64(math.Floor(x/10+0.5)) * 10
}

View File

@ -1,35 +0,0 @@
package metrics
import (
"sync"
)
// MeterRegistry is a registry for named meters.
type MeterRegistry struct {
meters sync.Map
}
// Get gets (or creates) a meter by name.
func (r *MeterRegistry) Get(name string) *Meter {
if m, ok := r.meters.Load(name); ok {
return m.(*Meter)
}
m, _ := r.meters.LoadOrStore(name, new(Meter))
return m.(*Meter)
}
// Remove removes the named meter from the registry.
//
// Note: The only reason to do this is to save a bit of memory. Unused meters
// don't consume any CPU (after they go idle).
func (r *MeterRegistry) Remove(name string) {
r.meters.Delete(name)
}
// ForEach calls the passed function for each registered meter.
func (r *MeterRegistry) ForEach(iterFunc func(string, *Meter)) {
r.meters.Range(func(k, v interface{}) bool {
iterFunc(k.(string), v.(*Meter))
return true
})
}

View File

@ -1,77 +0,0 @@
package metrics
import (
"testing"
"time"
)
func TestRegistry(t *testing.T) {
r := new(MeterRegistry)
m1 := r.Get("first")
m2 := r.Get("second")
m1.Mark(10)
m2.Mark(30)
time.Sleep(2 * time.Second)
if total := r.Get("first").Snapshot().Total; total != 10 {
t.Errorf("expected first total to be 10, got %d", total)
}
if total := r.Get("second").Snapshot().Total; total != 30 {
t.Errorf("expected second total to be 30, got %d", total)
}
expectedMeters := map[string]*Meter{
"first": m1,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
r.Remove("first")
found := false
r.ForEach(func(n string, m *Meter) {
if n != "second" {
t.Errorf("found unexpected meter: %s", n)
return
}
if found {
t.Error("found meter twice")
}
found = true
})
if !found {
t.Errorf("didn't find second meter")
}
m3 := r.Get("first")
if m3 == m1 {
t.Error("should have gotten a new meter")
}
if total := m3.Snapshot().Total; total != 0 {
t.Errorf("expected first total to now be 0, got %d", total)
}
expectedMeters = map[string]*Meter{
"first": m3,
"second": m2,
}
r.ForEach(func(n string, m *Meter) {
if expectedMeters[n] != m {
t.Errorf("wrong meter '%s'", n)
}
delete(expectedMeters, n)
})
if len(expectedMeters) != 0 {
t.Errorf("missing meters: '%v'", expectedMeters)
}
}

View File

@ -5,6 +5,13 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
)
type Stats struct {
TotalIn int64
TotalOut int64
RateIn float64
RateOut float64
}
type Reporter interface {
LogSentMessage(int64)
LogRecvMessage(int64)

View File

@ -1,153 +0,0 @@
package metrics
import (
"math"
"sync"
"sync/atomic"
"time"
)
// IdleRate the rate at which we declare a meter idle (and stop tracking it
// until it's re-registered).
//
// The default ensures that 1 event every ~30s will keep the meter from going
// idle.
var IdleRate = 1e-13
// Alpha for EWMA of 1s
var alpha = 1 - math.Exp(-1.0)
// The global sweeper.
var globalSweeper sweeper
type sweeper struct {
sweepOnce sync.Once
meters []*Meter
mutex sync.RWMutex
lastUpdateTime time.Time
registerChannel chan *Meter
}
func (sw *sweeper) start() {
sw.registerChannel = make(chan *Meter, 16)
go sw.run()
}
func (sw *sweeper) run() {
for m := range sw.registerChannel {
sw.register(m)
sw.runActive()
}
}
func (sw *sweeper) register(m *Meter) {
// Add back the snapshot total. If we unregistered this
// one, we set it to zero.
atomic.AddUint64(&m.accumulator, m.snapshot.Total)
sw.meters = append(sw.meters, m)
}
func (sw *sweeper) runActive() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
sw.lastUpdateTime = time.Now()
for len(sw.meters) > 0 {
// Scale back allocation.
if len(sw.meters)*2 < cap(sw.meters) {
newMeters := make([]*Meter, len(sw.meters))
copy(newMeters, sw.meters)
sw.meters = newMeters
}
select {
case <-ticker.C:
sw.update()
case m := <-sw.registerChannel:
sw.register(m)
}
}
sw.meters = nil
// Till next time.
}
func (sw *sweeper) update() {
sw.mutex.Lock()
defer sw.mutex.Unlock()
now := time.Now()
tdiff := now.Sub(sw.lastUpdateTime)
if tdiff <= 0 {
return
}
sw.lastUpdateTime = now
timeMultiplier := float64(time.Second) / float64(tdiff)
newLen := len(sw.meters)
for i, m := range sw.meters {
total := atomic.LoadUint64(&m.accumulator)
instant := timeMultiplier * float64(total-m.snapshot.Total)
if m.snapshot.Rate == 0 {
m.snapshot.Rate = instant
} else {
m.snapshot.Rate += alpha * (instant - m.snapshot.Rate)
}
m.snapshot.Total = total
// This is equivalent to one zeros, then one, then 30 zeros.
// We'll consider that to be "idle".
if m.snapshot.Rate > IdleRate {
continue
}
// Ok, so we are idle...
// Mark this as idle by zeroing the accumulator.
swappedTotal := atomic.SwapUint64(&m.accumulator, 0)
// So..., are we really idle?
if swappedTotal > total {
// Not so idle...
// Now we need to make sure this gets re-registered.
// First, add back what we removed. If we can do this
// fast enough, we can put it back before anyone
// notices.
currentTotal := atomic.AddUint64(&m.accumulator, swappedTotal)
// Did we make it?
if currentTotal == swappedTotal {
// Yes! Nobody noticed, move along.
continue
}
// No. Someone noticed and will (or has) put back into
// the registration channel.
//
// Remove the snapshot total, it'll get added back on
// registration.
//
// `^uint64(total - 1)` is the two's complement of
// `total`. It's the "correct" way to subtract
// atomically in go.
atomic.AddUint64(&m.accumulator, ^uint64(m.snapshot.Total-1))
}
// Reset the rate, keep the total.
m.snapshot.Rate = 0
newLen--
sw.meters[i] = sw.meters[newLen]
}
// trim the meter list
for i := newLen; i < len(sw.meters); i++ {
sw.meters[i] = nil
}
sw.meters = sw.meters[:newLen]
}
func (sw *sweeper) Register(m *Meter) {
sw.sweepOnce.Do(sw.start)
sw.registerChannel <- m
}