From 3007cb2e051af54ca291c10c8664fa1161a3257f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= <magik6k@gmail.com> Date: Thu, 20 Jun 2019 15:13:07 +0200 Subject: [PATCH] Fix close deadlock (variant B) --- basic.go | 21 +++++++++++++++++++++ basic_test.go | 22 ++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/basic.go b/basic.go index 8cb030d..bba7f14 100644 --- a/basic.go +++ b/basic.go @@ -129,7 +129,28 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOp err = b.withNode(typ.Elem(), func(n *node) { n.sinks = append(n.sinks, refCh) c = func() { + stopDrain := make(chan struct{}) + go func() { + rdrain := reflect.ValueOf(stopDrain) + for { + chosen, _, _ := reflect.Select([]reflect.SelectCase{ + { + Dir: reflect.SelectRecv, + Chan: refCh, + }, + { + Dir: reflect.SelectRecv, + Chan: rdrain, + }, + }) + if chosen == 1 { + return + } + } + }() n.lk.Lock() + close(stopDrain) + for i := 0; i < len(n.sinks); i++ { if n.sinks[i] == refCh { n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{} diff --git a/basic_test.go b/basic_test.go index 4a0edc9..3ba71c7 100644 --- a/basic_test.go +++ b/basic_test.go @@ -304,6 +304,28 @@ func TestStateful(t *testing.T) { } } +func TestCloseBlocking(t *testing.T) { + bus := NewBus() + em, err := bus.Emitter(new(EventB)) + if err != nil { + t.Fatal(err) + } + + events := make(chan EventB) + cancel, err := bus.Subscribe(events) + if err != nil { + t.Fatal(err) + } + + go func() { + em.Emit(EventB(159)) + }() + + time.Sleep(10 * time.Millisecond) // make sure that emit is blocked + + cancel() // cancel sub +} + func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { if detectrace.WithRace() && subs+emits > 5000 { t.SkipNow()