mirror of
https://github.com/libp2p/go-eventbus.git
synced 2024-12-25 23:20:07 +08:00
nit: fix with-node
1. It doesn't return an error and we weren't checking it anyways. 2. Avoid a goroutine unless we need it.
This commit is contained in:
parent
df5be7d7dd
commit
7b280b5c1b
22
basic.go
22
basic.go
@ -51,7 +51,7 @@ func NewBus() event.Bus {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) error {
|
||||
func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node)) {
|
||||
b.lk.Lock()
|
||||
|
||||
n, ok := b.nodes[typ]
|
||||
@ -65,12 +65,14 @@ func (b *basicBus) withNode(typ reflect.Type, cb func(*node), async func(*node))
|
||||
|
||||
cb(n)
|
||||
|
||||
go func() {
|
||||
defer n.lk.Unlock()
|
||||
async(n)
|
||||
}()
|
||||
|
||||
return nil
|
||||
if async == nil {
|
||||
n.lk.Unlock()
|
||||
} else {
|
||||
go func() {
|
||||
defer n.lk.Unlock()
|
||||
async(n)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *basicBus) tryDropNode(typ reflect.Type) {
|
||||
@ -173,7 +175,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
|
||||
for i, etyp := range types {
|
||||
typ := reflect.TypeOf(etyp)
|
||||
|
||||
err = b.withNode(typ.Elem(), func(n *node) {
|
||||
b.withNode(typ.Elem(), func(n *node) {
|
||||
n.sinks = append(n.sinks, out.ch)
|
||||
out.nodes[i] = n
|
||||
}, func(n *node) {
|
||||
@ -215,11 +217,11 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
|
||||
}
|
||||
typ = typ.Elem()
|
||||
|
||||
err = b.withNode(typ, func(n *node) {
|
||||
b.withNode(typ, func(n *node) {
|
||||
atomic.AddInt32(&n.nEmitters, 1)
|
||||
n.keepLast = n.keepLast || settings.makeStateful
|
||||
e = &emitter{n: n, typ: typ, dropper: b.tryDropNode}
|
||||
}, func(_ *node) {})
|
||||
}, nil)
|
||||
return
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user