From 5b845983c29b38223f92c518cb8db7593e0a43f3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?= <magik6k@gmail.com>
Date: Wed, 19 Jun 2019 12:13:18 +0200
Subject: [PATCH] Fix data races

---
 basic.go      |  4 ++--
 basic_test.go | 32 +++++++++++++++++++++++---------
 go.mod        |  2 ++
 go.sum        |  2 ++
 4 files changed, 29 insertions(+), 11 deletions(-)
 create mode 100644 go.sum

diff --git a/basic.go b/basic.go
index 7b6a7b4..6dc2b22 100644
--- a/basic.go
+++ b/basic.go
@@ -64,7 +64,7 @@ func (b *bus) tryDropNode(typ reflect.Type) {
 	}
 
 	n.lk.Lock()
-	if n.nEmitters > 0 || len(n.sinks) > 0 {
+	if atomic.LoadInt32(&n.nEmitters) > 0 || len(n.sinks) > 0 {
 		n.lk.Unlock()
 		b.lk.Unlock()
 		return // still in use
@@ -110,7 +110,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc,
 					break
 				}
 			}
-			tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
+			tryDrop := len(n.sinks) == 0 && atomic.LoadInt32(&n.nEmitters) == 0
 			n.lk.Unlock()
 			if tryDrop {
 				b.tryDropNode(typ.Elem())
diff --git a/basic_test.go b/basic_test.go
index 77e9c93..72a6871 100644
--- a/basic_test.go
+++ b/basic_test.go
@@ -6,11 +6,21 @@ import (
 	"sync/atomic"
 	"testing"
 	"time"
+
+	"github.com/jbenet/go-detect-race"
 )
 
 type EventA struct{}
 type EventB int
 
+func getN() int {
+	n := 50000
+	if detectrace.WithRace() {
+		n = 1000
+	}
+	return n
+}
+
 func (EventA) String() string {
 	return "Oh, Hello"
 }
@@ -28,11 +38,11 @@ func TestEmit(t *testing.T) {
 		<-events
 	}()
 
-	emit, cancel, err := bus.Emitter(new(EventA))
+	emit, cancel2, err := bus.Emitter(new(EventA))
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer cancel()
+	defer cancel2()
 
 	emit(EventA{})
 }
@@ -56,11 +66,11 @@ func TestSub(t *testing.T) {
 		wait.Done()
 	}()
 
-	emit, cancel, err := bus.Emitter(new(EventB))
+	emit, cancel2, err := bus.Emitter(new(EventB))
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer cancel()
+	defer cancel2()
 
 	emit(EventB(7))
 	wait.Wait()
@@ -105,8 +115,8 @@ func TestEmitOnClosed(t *testing.T) {
 }
 
 func TestClosingRaces(t *testing.T) {
-	subs := 50000
-	emits := 50000
+	subs := getN()
+	emits := getN()
 
 	var wg sync.WaitGroup
 	var lk sync.RWMutex
@@ -156,7 +166,7 @@ func TestSubMany(t *testing.T) {
 
 	var r int32
 
-	n := 50000
+	n := getN()
 	var wait sync.WaitGroup
 	var ready sync.WaitGroup
 	wait.Add(n)
@@ -212,11 +222,11 @@ func TestSubType(t *testing.T) {
 		wait.Done()
 	}()
 
-	emit, cancel, err := bus.Emitter(new(EventA))
+	emit, cancel2, err := bus.Emitter(new(EventA))
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer cancel()
+	defer cancel2()
 
 	emit(EventA{})
 	wait.Wait()
@@ -295,6 +305,10 @@ func TestStateful(t *testing.T) {
 }
 
 func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
+	if detectrace.WithRace() && subs + emits > 5000 {
+		t.SkipNow()
+	}
+
 	bus := NewBus()
 
 	var r int64
diff --git a/go.mod b/go.mod
index 8368304..3856f4c 100644
--- a/go.mod
+++ b/go.mod
@@ -1,3 +1,5 @@
 module github.com/libp2p/go-eventbus
 
 go 1.12
+
+require github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..4cc1d1f
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,2 @@
+github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574 h1:Pxjl8Wn3cCU7nB/MCmPEUMbjMHxXFqODW6rce0jpxB4=
+github.com/jbenet/go-detect-race v0.0.0-20150302022421-3463798d9574/go.mod h1:gynVu6LUw+xMXD3XEvjHQcIbJkWEamnGjJDebRHqTd0=