From 001f18ea777af06c69f47c5e99c86ae165425a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Fri, 14 Jun 2019 18:57:21 +0200 Subject: [PATCH] POC SendTo --- basic.go | 26 ++++++++++++++++++++++++++ basic_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- interface.go | 1 + 3 files changed, 76 insertions(+), 1 deletion(-) diff --git a/basic.go b/basic.go index 7ddf9e1..f755679 100644 --- a/basic.go +++ b/basic.go @@ -69,6 +69,8 @@ func (b *bus) tryDropNode(evtType interface{}) { func (b *bus) Subscribe(evtType interface{}, _ ...SubOption) (s <-chan interface{}, c CancelFunc, err error) { err = b.withNode(evtType, func(n *node) { + // when all subs are waiting on this channel, setting this to 1 doesn't + // really affect benchmarks out, i := n.sub(0) s = out c = func() { @@ -107,6 +109,30 @@ func (b *bus) Emitter(evtType interface{}, _ ...EmitterOption) (e EmitFunc, c Ca return } +func (b *bus) SendTo(typedChan interface{}) (CancelFunc, error) { + typ := reflect.TypeOf(typedChan) + if typ.Kind() != reflect.Chan { + return nil, errors.New("expected a channel") + } + if typ.ChanDir() & reflect.SendDir == 0 { + return nil, errors.New("channel doesn't allow send") + } + etype := reflect.New(typ.Elem()) + sub, cf, err := b.Subscribe(etype.Interface()) + if err != nil { + return nil, err + } + + go func() { + tcv := reflect.ValueOf(typedChan) + for event := range sub { + tcv.Send(reflect.ValueOf(event)) + } + }() + + return cf, nil +} + /////////////////////// // NODE diff --git a/basic_test.go b/basic_test.go index 7a95c12..c8b3de5 100644 --- a/basic_test.go +++ b/basic_test.go @@ -185,6 +185,42 @@ func TestSubMany(t *testing.T) { } } +func TestSendTo(t *testing.T) { + testSendTo(t, 1000) +} + +func testSendTo(t testing.TB, msgs int) { + bus := NewBus() + + go func() { + emit, cancel, err := bus.Emitter(new(EventB)) + if err != nil { + panic(err) + } + defer cancel() + + for i := 0; i < msgs; i++ { + emit(EventB(97)) + } + }() + + ch := make(chan EventB) + cancel, err := bus.SendTo(ch) + if err != nil { + return + } + defer cancel() + + r := 0 + for i := 0; i < msgs; i++ { + r += int(<-ch) + } + + if int(r) != 97 * msgs { + t.Fatal("got wrong result") + } +} + func testMany(t testing.TB, subs, emits, msgs int) { bus := NewBus() @@ -272,11 +308,17 @@ func BenchmarkMs1e2m4(b *testing.B) { } func BenchmarkMs1e0m6(b *testing.B) { - b.N = 1000000 + b.N = 10000000 b.ReportAllocs() testMany(b, 10, 1, 1000000) } +func BenchmarkMs0e0m6(b *testing.B) { + b.N = 1000000 + b.ReportAllocs() + testMany(b, 1, 1, 1000000) +} + func BenchmarkMs0e6m0(b *testing.B) { b.N = 1000000 b.ReportAllocs() @@ -288,3 +330,9 @@ func BenchmarkMs6e0m0(b *testing.B) { b.ReportAllocs() testMany(b, 1000000, 1, 1) } + +func BenchmarkSendTo(b *testing.B) { + b.N = 1000000 + b.ReportAllocs() + testSendTo(b, b.N) +} diff --git a/interface.go b/interface.go index 80e9585..3b50529 100644 --- a/interface.go +++ b/interface.go @@ -20,6 +20,7 @@ type Bus interface { // evt := (<-sub).(os.Signal) // guaranteed to be safe Subscribe(eventType interface{}, opts ...SubOption) (<-chan interface{}, CancelFunc, error) + SendTo(typedChan interface{}) (CancelFunc, error) Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error) }