This commit is contained in:
Łukasz Magiera 2019-06-16 19:06:49 +02:00
parent 7961d7f4a3
commit 1c855d2c2d
3 changed files with 17 additions and 15 deletions

View File

@ -12,7 +12,7 @@ import (
// BUS
type bus struct {
lk sync.Mutex
lk sync.Mutex
nodes map[string]*node
}
@ -75,7 +75,7 @@ func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc,
if typ.Kind() != reflect.Chan {
return nil, errors.New("expected a channel")
}
if typ.ChanDir() & reflect.SendDir == 0 {
if typ.ChanDir()&reflect.SendDir == 0 {
return nil, errors.New("channel doesn't allow send")
}
@ -144,11 +144,14 @@ type node struct {
nEmitters int32
// sink index counter
sinkC int
sinkC int
// TODO: we could make emit a bit faster by making this into an array, but
// it doesn't seem needed for now
sinks map[int]reflect.Value
sinks map[int]reflect.Value
keepLast bool
last reflect.Value
}
func newNode(typ reflect.Type) *node {

View File

@ -188,7 +188,7 @@ func TestSubMany(t *testing.T) {
emit(EventB(7))
wait.Wait()
if int(r) != 7 * n {
if int(r) != 7*n {
t.Error("got wrong result")
}
}
@ -246,7 +246,7 @@ func testMany(t testing.TB, subs, emits, msgs int) {
defer cancel()
ready.Done()
for i := 0; i < emits * msgs; i++ {
for i := 0; i < emits*msgs; i++ {
atomic.AddInt64(&r, int64(<-events))
}
wait.Done()
@ -273,7 +273,7 @@ func testMany(t testing.TB, subs, emits, msgs int) {
wait.Wait()
if int(r) != 97 * subs * emits * msgs {
if int(r) != 97*subs*emits*msgs {
t.Fatal("got wrong result")
}
}

View File

@ -21,23 +21,22 @@ func ForceSubType(evtType interface{}) SubOption {
}
}
type EmitterSettings struct {}
type EmitterSettings struct{}
type EmitterOption func(*EmitterSettings)
type Bus interface {
// Subscribe creates new subscription. Failing to drain the incoming channel
// will cause publishers to get blocked
// Subscribe creates new subscription. Failing to drain the channel will cause
// publishers to get blocked
Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
// Emitter creates new emitter
//
// evtTypes only accepts typed nil pointers, and uses the type information to
// eventType accepts typed nil pointers, and uses the type information to
// select output type
//
// Example:
// sub, cancel, err := eventbus.Subscribe(new(os.Signal))
// defer cancel()
//
// evt := (<-sub).(os.Signal) // guaranteed to be safe
Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, CancelFunc, error)
}