mirror of
https://github.com/libp2p/go-eventbus.git
synced 2025-03-25 12:10:06 +08:00
Fix close deadlock (variant B)
This commit is contained in:
parent
c7aefba960
commit
3007cb2e05
21
basic.go
21
basic.go
@ -129,7 +129,28 @@ func (b *basicBus) Subscribe(typedChan interface{}, opts ...event.SubscriptionOp
|
||||
err = b.withNode(typ.Elem(), func(n *node) {
|
||||
n.sinks = append(n.sinks, refCh)
|
||||
c = func() {
|
||||
stopDrain := make(chan struct{})
|
||||
go func() {
|
||||
rdrain := reflect.ValueOf(stopDrain)
|
||||
for {
|
||||
chosen, _, _ := reflect.Select([]reflect.SelectCase{
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: refCh,
|
||||
},
|
||||
{
|
||||
Dir: reflect.SelectRecv,
|
||||
Chan: rdrain,
|
||||
},
|
||||
})
|
||||
if chosen == 1 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
n.lk.Lock()
|
||||
close(stopDrain)
|
||||
|
||||
for i := 0; i < len(n.sinks); i++ {
|
||||
if n.sinks[i] == refCh {
|
||||
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
|
||||
|
@ -304,6 +304,28 @@ func TestStateful(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCloseBlocking(t *testing.T) {
|
||||
bus := NewBus()
|
||||
em, err := bus.Emitter(new(EventB))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
events := make(chan EventB)
|
||||
cancel, err := bus.Subscribe(events)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
em.Emit(EventB(159))
|
||||
}()
|
||||
|
||||
time.Sleep(10 * time.Millisecond) // make sure that emit is blocked
|
||||
|
||||
cancel() // cancel sub
|
||||
}
|
||||
|
||||
func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
|
||||
if detectrace.WithRace() && subs+emits > 5000 {
|
||||
t.SkipNow()
|
||||
|
Loading…
Reference in New Issue
Block a user