go-eventbus/basic_test.go

141 lines
2.0 KiB
Go
Raw Normal View History

2019-06-13 10:23:03 +08:00
package event
import (
2019-06-13 14:51:54 +08:00
"sync"
2019-06-13 10:23:03 +08:00
"testing"
2019-06-13 14:51:54 +08:00
"time"
2019-06-13 10:23:03 +08:00
)
// EventA is an empty struct type used as an event by the tests in this file.
type EventA struct{}
2019-06-13 14:51:54 +08:00
// EventB is an int-based event type; TestSub emits EventB(7) and checks that
// the same value is received by the subscriber.
type EventB int
2019-06-13 10:23:03 +08:00
2019-06-13 14:51:54 +08:00
func TestEmit(t *testing.T) {
2019-06-13 10:23:03 +08:00
bus := NewBus()
2019-06-13 14:51:54 +08:00
events, cancel, err := bus.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}
2019-06-13 10:23:03 +08:00
2019-06-13 14:51:54 +08:00
go func() {
defer cancel()
<-events
}()
emit, cancel, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer cancel()
emit(EventA{})
}
// TestSub subscribes to EventB, emits EventB(7), and verifies that the
// subscriber goroutine received exactly that value.
func TestSub(t *testing.T) {
	bus := NewBus()

	events, cancelSub, err := bus.Subscribe(new(EventB))
	if err != nil {
		t.Fatal(err)
	}

	var event EventB
	var wait sync.WaitGroup
	wait.Add(1)

	// cancelSub is kept separate from the emitter's cancel function so the
	// goroutine never observes the variable being reassigned (which would be
	// a data race and could cancel the wrong handle).
	go func() {
		defer cancelSub()
		event = (<-events).(EventB)
		wait.Done()
	}()

	emit, cancelEmit, err := bus.Emitter(new(EventB))
	if err != nil {
		t.Fatal(err)
	}
	defer cancelEmit()

	emit(EventB(7))

	// wait.Wait orders the goroutine's write to event before the read below.
	wait.Wait()

	if event != 7 {
		t.Error("got wrong event")
	}
}
// TestEmitNoSubNoBlock emits an EventA on a bus that has no subscribers.
// The test can only finish if the emit call returns.
func TestEmitNoSubNoBlock(t *testing.T) {
	b := NewBus()

	send, closeEmitter, err := b.Emitter(new(EventA))
	if err != nil {
		t.Fatal(err)
	}
	defer closeEmitter()

	send(EventA{})
}
// TestEmitOnClosed cancels an emitter and then emits on it, expecting the
// emit call to panic with the string "emitter is closed".
func TestEmitOnClosed(t *testing.T) {
	bus := NewBus()

	emit, cancel, err := bus.Emitter(new(EventA))
	if err != nil {
		t.Fatal(err)
	}
	cancel()

	defer func() {
		r := recover()
		if r == nil {
			// Return here: asserting on a nil value below would itself
			// panic and hide the real failure.
			t.Error("expected panic")
			return
		}
		// Comma-ok form so that a non-string panic value is reported as a
		// test failure rather than causing a second panic.
		if msg, ok := r.(string); !ok || msg != "emitter is closed" {
			t.Error("unexpected message")
		}
	}()

	emit(EventA{})
}
func TestClosingRaces(t *testing.T) {
subs := 50000
emits := 50000
var wg sync.WaitGroup
var lk sync.RWMutex
lk.Lock()
wg.Add(subs + emits)
bus := NewBus()
for i := 0; i < subs; i++ {
go func() {
lk.RLock()
defer lk.RUnlock()
_, cancel, _ := bus.Subscribe(new(EventA))
time.Sleep(10 * time.Millisecond)
cancel()
wg.Done()
}()
}
for i := 0; i < emits; i++ {
go func() {
lk.RLock()
defer lk.RUnlock()
_, cancel, _ := bus.Emitter(new(EventA))
time.Sleep(10 * time.Millisecond)
cancel()
wg.Done()
}()
}
time.Sleep(10 * time.Millisecond)
lk.Unlock() // start everything
wg.Wait()
2019-06-13 10:23:03 +08:00
}