mirror of
https://github.com/libp2p/go-eventbus.git
synced 2025-03-25 12:10:06 +08:00
Fix data races
This commit is contained in:
parent
287e2189af
commit
5b845983c2
4
basic.go
4
basic.go
@ -64,7 +64,7 @@ func (b *bus) tryDropNode(typ reflect.Type) {
|
||||
}
|
||||
|
||||
n.lk.Lock()
|
||||
if n.nEmitters > 0 || len(n.sinks) > 0 {
|
||||
if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
|
||||
n.lk.Unlock()
|
||||
b.lk.Unlock()
|
||||
return // still in use
|
||||
@ -110,7 +110,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc,
|
||||
break
|
||||
}
|
||||
}
|
||||
tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
|
||||
tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
|
||||
n.lk.Unlock()
|
||||
if tryDrop {
|
||||
b.tryDropNode(typ.Elem())
|
||||
|
@ -6,11 +6,21 @@ import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jbenet/go-detect-race"
|
||||
)
|
||||
|
||||
type EventA struct{}
|
||||
type EventB int
|
||||
|
||||
func getN() int {
|
||||
n := 50000
|
||||
if detectrace.WithRace() {
|
||||
n = 1000
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func (EventA) String() string {
|
||||
return "Oh, Hello"
|
||||
}
|
||||
@ -28,11 +38,11 @@ func TestEmit(t *testing.T) {
|
||||
<-events
|
||||
}()
|
||||
|
||||
emit, cancel, err := bus.Emitter(new(EventA))
|
||||
emit, cancel2, err := bus.Emitter(new(EventA))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
defer cancel2()
|
||||
|
||||
emit(EventA{})
|
||||
}
|
||||
@ -56,11 +66,11 @@ func TestSub(t *testing.T) {
|
||||
wait.Done()
|
||||
}()
|
||||
|
||||
emit, cancel, err := bus.Emitter(new(EventB))
|
||||
emit, cancel2, err := bus.Emitter(new(EventB))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
defer cancel2()
|
||||
|
||||
emit(EventB(7))
|
||||
wait.Wait()
|
||||
@ -105,8 +115,8 @@ func TestEmitOnClosed(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestClosingRaces(t *testing.T) {
|
||||
subs := 50000
|
||||
emits := 50000
|
||||
subs := getN()
|
||||
emits := getN()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
var lk sync.RWMutex
|
||||
@ -156,7 +166,7 @@ func TestSubMany(t *testing.T) {
|
||||
|
||||
var r int32
|
||||
|
||||
n := 50000
|
||||
n := getN()
|
||||
var wait sync.WaitGroup
|
||||
var ready sync.WaitGroup
|
||||
wait.Add(n)
|
||||
@ -212,11 +222,11 @@ func TestSubType(t *testing.T) {
|
||||
wait.Done()
|
||||
}()
|
||||
|
||||
emit, cancel, err := bus.Emitter(new(EventA))
|
||||
emit, cancel2, err := bus.Emitter(new(EventA))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cancel()
|
||||
defer cancel2()
|
||||
|
||||
emit(EventA{})
|
||||
wait.Wait()
|
||||
@ -295,6 +305,10 @@ func TestStateful(t *testing.T) {
|
||||
}
|
||||
|
||||
func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
|
||||
if detectrace.WithRace() && subs + emits > 5000 {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
bus := NewBus()
|
||||
|
||||
var r int64
|
||||
|
2
go.mod
2
go.mod
@ -1,3 +1,5 @@
|
||||
module github.com/libp2p/go-eventbus
|
||||
|
||||
go 1.12
|
||||
|
||||
require github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574
|
||||
|
Loading…
Reference in New Issue
Block a user