Subscribe with user provided typed channels

This commit is contained in:
Łukasz Magiera 2019-06-16 17:20:33 +02:00
parent 001f18ea77
commit 1e0dd64ef6
3 changed files with 48 additions and 70 deletions

View File

@ -22,12 +22,7 @@ func NewBus() Bus {
} }
} }
func (b *bus) withNode(evtType interface{}, cb func(*node)) error { func (b *bus) withNode(typ reflect.Type, cb func(*node)) error {
typ := reflect.TypeOf(evtType)
if typ.Kind() != reflect.Ptr {
return errors.New("subscribe called with non-pointer type")
}
typ = typ.Elem()
path := typePath(typ) path := typePath(typ)
b.lk.Lock() b.lk.Lock()
@ -45,8 +40,8 @@ func (b *bus) withNode(evtType interface{}, cb func(*node)) error {
return nil return nil
} }
func (b *bus) tryDropNode(evtType interface{}) { func (b *bus) tryDropNode(typ reflect.Type) {
path := typePath(reflect.TypeOf(evtType).Elem()) path := typePath(typ)
b.lk.Lock() b.lk.Lock()
n, ok := b.nodes[path] n, ok := b.nodes[path]
@ -67,20 +62,27 @@ func (b *bus) tryDropNode(evtType interface{}) {
b.lk.Unlock() b.lk.Unlock()
} }
func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface{}, c CancelFunc, err error) { func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, err error) {
err = b.withNode(evtType, func(n *node) { refCh := reflect.ValueOf(typedChan)
typ := refCh.Type()
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
}
if typ.ChanDir() & reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
}
err = b.withNode(typ.Elem(), func(n *node) {
// when all subs are waiting on this channel, setting this to 1 doesn't // when all subs are waiting on this channel, setting this to 1 doesn't
// really affect benchmarks // really affect benchmarks
out, i := n.sub(0) i := n.sub(refCh)
s = out
c = func() { c = func() {
n.lk.Lock() n.lk.Lock()
delete(n.sinks, i) delete(n.sinks, i)
close(out)
tryDrop := len(n.sinks) == 0 && n.nEmitters == 0 tryDrop := len(n.sinks) == 0 && n.nEmitters == 0
n.lk.Unlock() n.lk.Unlock()
if tryDrop { if tryDrop {
b.tryDropNode(evtType) b.tryDropNode(typ.Elem())
} }
} }
}) })
@ -88,7 +90,13 @@ func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface
} }
func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c CancelFunc, err error) { func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c CancelFunc, err error) {
err = b.withNode(evtType, func(n *node) { typ := reflect.TypeOf(evtType)
if typ.Kind() != reflect.Ptr {
return nil, nil, errors.New("emitter called with non-pointer type")
}
typ = typ.Elem()
err = b.withNode(typ, func(n *node) {
atomic.AddInt32(&n.nEmitters, 1) atomic.AddInt32(&n.nEmitters, 1)
closed := false closed := false
@ -102,37 +110,13 @@ func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c Ca
c = func() { c = func() {
closed = true closed = true
if atomic.AddInt32(&n.nEmitters, -1) == 0 { if atomic.AddInt32(&n.nEmitters, -1) == 0 {
b.tryDropNode(evtType) b.tryDropNode(typ)
} }
} }
}) })
return return
} }
func (b *bus) SendTo(typedChan interface{}) (CancelFunc, error) {
typ := reflect.TypeOf(typedChan)
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
}
if typ.ChanDir() & reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
}
etype := reflect.New(typ.Elem())
sub, cf, err := b.Subscribe(etype.Interface())
if err != nil {
return nil, err
}
go func() {
tcv := reflect.ValueOf(typedChan)
for event := range sub {
tcv.Send(reflect.ValueOf(event))
}
}()
return cf, nil
}
/////////////////////// ///////////////////////
// NODE // NODE
@ -150,34 +134,33 @@ type node struct {
// TODO: we could make emit a bit faster by making this into an array, but // TODO: we could make emit a bit faster by making this into an array, but
// it doesn't seem needed for now // it doesn't seem needed for now
sinks map[int]chan interface{} sinks map[int]reflect.Value
} }
func newNode(typ reflect.Type) *node { func newNode(typ reflect.Type) *node {
return &node{ return &node{
typ: typ, typ: typ,
sinks: map[int]chan interface{}{}, sinks: map[int]reflect.Value{},
} }
} }
func (n *node) sub(buf int) (chan interface{}, int) { func (n *node) sub(outChan reflect.Value) int {
out := make(chan interface{}, buf)
i := n.sinkC i := n.sinkC
n.sinkC++ n.sinkC++
n.sinks[i] = out n.sinks[i] = outChan
return out, i return i
} }
func (n *node) emit(event interface{}) { func (n *node) emit(event interface{}) {
etype := reflect.TypeOf(event) eval := reflect.ValueOf(event)
if etype != n.typ { if eval.Type() != n.typ {
panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, etype)) panic(fmt.Sprintf("Emit called with wrong type. expected: %s, got: %s", n.typ, eval.Type()))
} }
n.lk.RLock() n.lk.RLock()
for _, ch := range n.sinks { for _, ch := range n.sinks {
ch <- event ch.Send(eval)
} }
n.lk.RUnlock() n.lk.RUnlock()
} }

View File

@ -12,7 +12,8 @@ type EventB int
func TestEmit(t *testing.T) { func TestEmit(t *testing.T) {
bus := NewBus() bus := NewBus()
events, cancel, err := bus.Subscribe(new(EventA)) events := make(chan EventA)
cancel, err := bus.Subscribe(events)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -33,7 +34,8 @@ func TestEmit(t *testing.T) {
func TestSub(t *testing.T) { func TestSub(t *testing.T) {
bus := NewBus() bus := NewBus()
events, cancel, err := bus.Subscribe(new(EventB)) events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -45,7 +47,7 @@ func TestSub(t *testing.T) {
go func() { go func() {
defer cancel() defer cancel()
event = (<-events).(EventB) event = <-events
wait.Done() wait.Done()
}() }()
@ -114,7 +116,7 @@ func TestClosingRaces(t *testing.T) {
lk.RLock() lk.RLock()
defer lk.RUnlock() defer lk.RUnlock()
_, cancel, _ := b.Subscribe(new(EventA)) cancel, _ := b.Subscribe(make(chan EventA))
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
cancel() cancel()
@ -157,14 +159,15 @@ func TestSubMany(t *testing.T) {
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go func() { go func() {
events, cancel, err := bus.Subscribe(new(EventB)) events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer cancel() defer cancel()
ready.Done() ready.Done()
atomic.AddInt32(&r, int32((<-events).(EventB))) atomic.AddInt32(&r, int32(<-events))
wait.Done() wait.Done()
}() }()
} }
@ -185,7 +188,7 @@ func TestSubMany(t *testing.T) {
} }
} }
func TestSendTo(t *testing.T) { /*func TestSendTo(t *testing.T) {
testSendTo(t, 1000) testSendTo(t, 1000)
} }
@ -219,7 +222,7 @@ func testSendTo(t testing.TB, msgs int) {
if int(r) != 97 * msgs { if int(r) != 97 * msgs {
t.Fatal("got wrong result") t.Fatal("got wrong result")
} }
} }*/
func testMany(t testing.TB, subs, emits, msgs int) { func testMany(t testing.TB, subs, emits, msgs int) {
bus := NewBus() bus := NewBus()
@ -233,7 +236,8 @@ func testMany(t testing.TB, subs, emits, msgs int) {
for i := 0; i < subs; i++ { for i := 0; i < subs; i++ {
go func() { go func() {
events, cancel, err := bus.Subscribe(new(EventB)) events := make(chan EventB)
cancel, err := bus.Subscribe(events)
if err != nil { if err != nil {
panic(err) panic(err)
} }
@ -241,7 +245,7 @@ func testMany(t testing.TB, subs, emits, msgs int) {
ready.Done() ready.Done()
for i := 0; i < emits * msgs; i++ { for i := 0; i < emits * msgs; i++ {
atomic.AddInt64(&r, int64((<-events).(EventB))) atomic.AddInt64(&r, int64(<-events))
} }
wait.Done() wait.Done()
}() }()
@ -330,9 +334,3 @@ func BenchmarkMs6e0m0(b *testing.B) {
b.ReportAllocs() b.ReportAllocs()
testMany(b, 1000000, 1, 1) testMany(b, 1000000, 1, 1)
} }
func BenchmarkSendTo(b *testing.B) {
b.N = 1000000
b.ReportAllocs()
testSendTo(b, b.N)
}

View File

@ -18,9 +18,7 @@ type Bus interface {
// defer cancel() // defer cancel()
// //
// evt := (<-sub).(os.Signal) // guaranteed to be safe // evt := (<-sub).(os.Signal) // guaranteed to be safe
Subscribe(eventType interface{}, opts ...SubOption) (<-chan interface{}, CancelFunc, error) Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
SendTo(typedChan interface{}) (CancelFunc, error)
Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error) Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error)
} }
@ -32,4 +30,3 @@ type Bus interface {
type EmitFunc func(event interface{}) type EmitFunc func(event interface{})
type CancelFunc func() type CancelFunc func()