From d23aaa9b5cf3532e68b42f7855f67ebd2748ca69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= <magik6k@gmail.com> Date: Wed, 19 Jun 2019 14:22:10 +0200 Subject: [PATCH] Address @stebalien's review --- basic.go | 7 +++---- basic_test.go | 4 ++-- interface.go | 15 ++++++++------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/basic.go b/basic.go index 8a5c374..5b06da6 100644 --- a/basic.go +++ b/basic.go @@ -65,7 +65,7 @@ func (b *bus) tryDropNode(typ reflect.Type) { } func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) { - var settings SubSettings + var settings subSettings for _, opt := range opts { if err := opt(&settings); err != nil { return nil, err @@ -94,7 +94,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, n.lk.Lock() for i := 0; i < len(n.sinks); i++ { if n.sinks[i] == refCh { - n.sinks[i] = n.sinks[len(n.sinks)-1] + n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{} n.sinks = n.sinks[:len(n.sinks)-1] break } @@ -119,7 +119,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, } func (b *bus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, c CancelFunc, err error) { - var settings EmitterSettings + var settings emitterSettings for _, opt := range opts { opt(&settings) } @@ -187,7 +187,6 @@ func (n *node) emit(event interface{}) { n.last.Store(eval) } - // TODO: try using reflect.Select for _, ch := range n.sinks { ch.Send(eval) } diff --git a/basic_test.go b/basic_test.go index 72a6871..14ca408 100644 --- a/basic_test.go +++ b/basic_test.go @@ -305,7 +305,7 @@ func TestStateful(t *testing.T) { } func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { - if detectrace.WithRace() && subs + emits > 5000 { + if detectrace.WithRace() && subs+emits > 5000 { t.SkipNow() } @@ -337,7 +337,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) { for i := 0; i < emits; i++ { go func() { - emit, cancel, err := bus.Emitter(new(EventB), func(settings *EmitterSettings) { + emit, cancel, err := bus.Emitter(new(EventB), func(settings *emitterSettings) { settings.makeStateful = stateful }) if err != nil { diff --git a/interface.go b/interface.go index 5953836..96fb060 100644 --- a/interface.go +++ b/interface.go @@ -5,10 +5,10 @@ import ( "reflect" ) -type SubSettings struct { +type subSettings struct { forcedType reflect.Type } -type SubOption func(*SubSettings) error +type SubOption func(*subSettings) error // ForceSubType is a Subscribe option which overrides the type to which // the subscription will be done. Note that the evtType must be assignable @@ -27,7 +27,7 @@ type SubOption func(*SubSettings) error // cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event))) // [...] func ForceSubType(evtType interface{}) SubOption { - return func(s *SubSettings) error { + return func(s *subSettings) error { typ := reflect.TypeOf(evtType) if typ.Kind() != reflect.Ptr { return errors.New("ForceSubType called with non-pointer type") @@ -37,10 +37,10 @@ func ForceSubType(evtType interface{}) SubOption { } } -type EmitterSettings struct{ +type emitterSettings struct { makeStateful bool } -type EmitterOption func(*EmitterSettings) +type EmitterOption func(*emitterSettings) // Stateful is an Emitter option which makes makes the eventbus channel // 'remember' last event sent, and when a new subscriber joins the @@ -49,14 +49,15 @@ type EmitterOption func(*EmitterSettings) // // This allows to provide state tracking for dynamic systems, and/or // allows new subscribers to verify that there are Emitters on the channel -func Stateful(s *EmitterSettings) { +func Stateful(s *emitterSettings) { s.makeStateful = true } // Bus is an interface to type-based event delivery system type Bus interface { // Subscribe creates new subscription. Failing to drain the channel will cause - // publishers to get blocked + // publishers to get blocked. CancelFunc is guaranteed to return after last send + // to the channel // // Example: // ch := make(chan EventT, 10)