mirror of
https://github.com/libp2p/go-eventbus.git
synced 2025-03-13 10:20:10 +08:00
fix: serialize publishing
Ensure that all subscribers see events in the same order. This also ensures that the subscribers never see the initial "latest" event after some other event. fixes #16
This commit is contained in:
parent
04058af20a
commit
25d54bbbec
12
basic.go
12
basic.go
@ -173,7 +173,7 @@ func (b *basicBus) Subscribe(evtTypes interface{}, opts ...event.SubscriptionOpt
|
||||
out.nodes[i] = n
|
||||
}, func(n *node) {
|
||||
if n.keepLast {
|
||||
l := n.last.Load()
|
||||
l := n.last
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
@ -223,7 +223,7 @@ func (b *basicBus) Emitter(evtType interface{}, opts ...event.EmitterOpt) (e eve
|
||||
|
||||
type node struct {
|
||||
// Note: make sure to NEVER lock basicBus.lk when this lock is held
|
||||
lk sync.RWMutex
|
||||
lk sync.Mutex
|
||||
|
||||
typ reflect.Type
|
||||
|
||||
@ -231,7 +231,7 @@ type node struct {
|
||||
nEmitters int32
|
||||
|
||||
keepLast bool
|
||||
last atomic.Value
|
||||
last interface{}
|
||||
|
||||
sinks []chan interface{}
|
||||
}
|
||||
@ -248,13 +248,13 @@ func (n *node) emit(event interface{}) {
|
||||
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, typ))
|
||||
}
|
||||
|
||||
n.lk.RLock()
|
||||
n.lk.Lock()
|
||||
if n.keepLast {
|
||||
n.last.Store(event)
|
||||
n.last = event
|
||||
}
|
||||
|
||||
for _, ch := range n.sinks {
|
||||
ch <- event
|
||||
}
|
||||
n.lk.RUnlock()
|
||||
n.lk.Unlock()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user