diff --git a/basic.go b/basic.go index 8cb030d..bb5ebb4 100644 --- a/basic.go +++ b/basic.go @@ -223,8 +223,12 @@ func (n *node) emit(event interface{}) { n.last.Store(eval) } - for _, ch := range n.sinks { + sinks := make([]reflect.Value, len(n.sinks)) + copy(sinks, n.sinks) + + n.lk.RUnlock() + + for _, ch := range sinks { ch.Send(eval) } - n.lk.RUnlock() } diff --git a/basic_test.go b/basic_test.go index 4a0edc9..3ba71c7 100644 --- a/basic_test.go +++ b/basic_test.go @@ -304,6 +304,28 @@ func TestStateful(t *testing.T) { } } +func TestCloseBlocking(t *testing.T) { + bus := NewBus() + em, err := bus.Emitter(new(EventB)) + if err != nil { + t.Fatal(err) + } + + events := make(chan EventB) + cancel, err := bus.Subscribe(events) + if err != nil { + t.Fatal(err) + } + + go func() { + em.Emit(EventB(159)) + }() + + time.Sleep(10 * time.Millisecond) // make sure that emit is blocked + + cancel() // cancel sub +} + func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { if detectrace.WithRace() && subs+emits > 5000 { t.SkipNow()