mirror of
https://github.com/libp2p/go-eventbus.git
synced 2025-03-25 12:10:06 +08:00
Address @stebalien's review
This commit is contained in:
parent
1cb839f3b0
commit
d23aaa9b5c
7
basic.go
7
basic.go
@ -65,7 +65,7 @@ func (b *bus) tryDropNode(typ reflect.Type) {
|
||||
}
|
||||
|
||||
func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
|
||||
var settings SubSettings
|
||||
var settings subSettings
|
||||
for _, opt := range opts {
|
||||
if err := opt(&settings); err != nil {
|
||||
return nil, err
|
||||
@ -94,7 +94,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc,
|
||||
n.lk.Lock()
|
||||
for i := 0; i < len(n.sinks); i++ {
|
||||
if n.sinks[i] == refCh {
|
||||
n.sinks[i] = n.sinks[len(n.sinks)-1]
|
||||
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], reflect.Value{}
|
||||
n.sinks = n.sinks[:len(n.sinks)-1]
|
||||
break
|
||||
}
|
||||
@ -119,7 +119,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc,
|
||||
}
|
||||
|
||||
func (b *bus) Emitter(evtType interface{}, opts ...EmitterOption) (e EmitFunc, c CancelFunc, err error) {
|
||||
var settings EmitterSettings
|
||||
var settings emitterSettings
|
||||
for _, opt := range opts {
|
||||
opt(&settings)
|
||||
}
|
||||
@ -187,7 +187,6 @@ func (n *node) emit(event interface{}) {
|
||||
n.last.Store(eval)
|
||||
}
|
||||
|
||||
// TODO: try using reflect.Select
|
||||
for _, ch := range n.sinks {
|
||||
ch.Send(eval)
|
||||
}
|
||||
|
@ -305,7 +305,7 @@ func TestStateful(t *testing.T) {
|
||||
}
|
||||
|
||||
func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
|
||||
if detectrace.WithRace() && subs + emits > 5000 {
|
||||
if detectrace.WithRace() && subs+emits > 5000 {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
@ -337,7 +337,7 @@ func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
|
||||
|
||||
for i := 0; i < emits; i++ {
|
||||
go func() {
|
||||
emit, cancel, err := bus.Emitter(new(EventB), func(settings *EmitterSettings) {
|
||||
emit, cancel, err := bus.Emitter(new(EventB), func(settings *emitterSettings) {
|
||||
settings.makeStateful = stateful
|
||||
})
|
||||
if err != nil {
|
||||
|
15
interface.go
15
interface.go
@ -5,10 +5,10 @@ import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type SubSettings struct {
|
||||
type subSettings struct {
|
||||
forcedType reflect.Type
|
||||
}
|
||||
type SubOption func(*SubSettings) error
|
||||
type SubOption func(*subSettings) error
|
||||
|
||||
// ForceSubType is a Subscribe option which overrides the type to which
|
||||
// the subscription will be done. Note that the evtType must be assignable
|
||||
@ -27,7 +27,7 @@ type SubOption func(*SubSettings) error
|
||||
// cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event)))
|
||||
// [...]
|
||||
func ForceSubType(evtType interface{}) SubOption {
|
||||
return func(s *SubSettings) error {
|
||||
return func(s *subSettings) error {
|
||||
typ := reflect.TypeOf(evtType)
|
||||
if typ.Kind() != reflect.Ptr {
|
||||
return errors.New("ForceSubType called with non-pointer type")
|
||||
@ -37,10 +37,10 @@ func ForceSubType(evtType interface{}) SubOption {
|
||||
}
|
||||
}
|
||||
|
||||
type EmitterSettings struct{
|
||||
type emitterSettings struct {
|
||||
makeStateful bool
|
||||
}
|
||||
type EmitterOption func(*EmitterSettings)
|
||||
type EmitterOption func(*emitterSettings)
|
||||
|
||||
// Stateful is an Emitter option which makes makes the eventbus channel
|
||||
// 'remember' last event sent, and when a new subscriber joins the
|
||||
@ -49,14 +49,15 @@ type EmitterOption func(*EmitterSettings)
|
||||
//
|
||||
// This allows to provide state tracking for dynamic systems, and/or
|
||||
// allows new subscribers to verify that there are Emitters on the channel
|
||||
func Stateful(s *EmitterSettings) {
|
||||
func Stateful(s *emitterSettings) {
|
||||
s.makeStateful = true
|
||||
}
|
||||
|
||||
// Bus is an interface to type-based event delivery system
|
||||
type Bus interface {
|
||||
// Subscribe creates new subscription. Failing to drain the channel will cause
|
||||
// publishers to get blocked
|
||||
// publishers to get blocked. CancelFunc is guaranteed to return after last send
|
||||
// to the channel
|
||||
//
|
||||
// Example:
|
||||
// ch := make(chan EventT, 10)
|
||||
|
Loading…
Reference in New Issue
Block a user