mirror of
https://github.com/libp2p/go-eventbus.git
synced 2025-01-28 04:20:06 +08:00
ForceSubType
This commit is contained in:
parent
7954da5541
commit
7961d7f4a3
17
basic.go
17
basic.go
@ -62,7 +62,14 @@ func (b *bus) tryDropNode(typ reflect.Type) {
|
|||||||
b.lk.Unlock()
|
b.lk.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, err error) {
|
func (b *bus) Subscribe(typedChan interface{}, opts ...SubOption) (c CancelFunc, err error) {
|
||||||
|
var settings SubSettings
|
||||||
|
for _, opt := range opts {
|
||||||
|
if err := opt(&settings); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
refCh := reflect.ValueOf(typedChan)
|
refCh := reflect.ValueOf(typedChan)
|
||||||
typ := refCh.Type()
|
typ := refCh.Type()
|
||||||
if typ.Kind() != reflect.Chan {
|
if typ.Kind() != reflect.Chan {
|
||||||
@ -72,6 +79,13 @@ func (b *bus) Subscribe(typedChan interface{}, _ ...SubOption) (c CancelFunc, er
|
|||||||
return nil, errors.New("channel doesn't allow send")
|
return nil, errors.New("channel doesn't allow send")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if settings.forcedType != nil {
|
||||||
|
if settings.forcedType.Elem().AssignableTo(typ) {
|
||||||
|
return nil, fmt.Errorf("forced type %s cannot be sent to chan %s", settings.forcedType, typ)
|
||||||
|
}
|
||||||
|
typ = settings.forcedType
|
||||||
|
}
|
||||||
|
|
||||||
err = b.withNode(typ.Elem(), func(n *node) {
|
err = b.withNode(typ.Elem(), func(n *node) {
|
||||||
// when all subs are waiting on this channel, setting this to 1 doesn't
|
// when all subs are waiting on this channel, setting this to 1 doesn't
|
||||||
// really affect benchmarks
|
// really affect benchmarks
|
||||||
@ -159,6 +173,7 @@ func (n *node) emit(event interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
n.lk.RLock()
|
n.lk.RLock()
|
||||||
|
// TODO: try using reflect.Select
|
||||||
for _, ch := range n.sinks {
|
for _, ch := range n.sinks {
|
||||||
ch.Send(eval)
|
ch.Send(eval)
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@ -10,6 +11,10 @@ import (
|
|||||||
type EventA struct{}
|
type EventA struct{}
|
||||||
type EventB int
|
type EventB int
|
||||||
|
|
||||||
|
func (EventA) String() string {
|
||||||
|
return "Oh, Hello"
|
||||||
|
}
|
||||||
|
|
||||||
func TestEmit(t *testing.T) {
|
func TestEmit(t *testing.T) {
|
||||||
bus := NewBus()
|
bus := NewBus()
|
||||||
events := make(chan EventA)
|
events := make(chan EventA)
|
||||||
@ -188,41 +193,38 @@ func TestSubMany(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*func TestSendTo(t *testing.T) {
|
func TestSubType(t *testing.T) {
|
||||||
testSendTo(t, 1000)
|
|
||||||
}
|
|
||||||
|
|
||||||
func testSendTo(t testing.TB, msgs int) {
|
|
||||||
bus := NewBus()
|
bus := NewBus()
|
||||||
|
events := make(chan fmt.Stringer)
|
||||||
|
cancel, err := bus.Subscribe(events, ForceSubType(new(EventA)))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var event fmt.Stringer
|
||||||
|
|
||||||
|
var wait sync.WaitGroup
|
||||||
|
wait.Add(1)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
emit, cancel, err := bus.Emitter(new(EventB))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
event = <-events
|
||||||
for i := 0; i < msgs; i++ {
|
wait.Done()
|
||||||
emit(EventB(97))
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
ch := make(chan EventB)
|
emit, cancel, err := bus.Emitter(new(EventA))
|
||||||
cancel, err := bus.SendTo(ch)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
r := 0
|
emit(EventA{})
|
||||||
for i := 0; i < msgs; i++ {
|
wait.Wait()
|
||||||
r += int(<-ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
if int(r) != 97 * msgs {
|
if event.String() != "Oh, Hello" {
|
||||||
t.Fatal("got wrong result")
|
t.Error("didn't get the correct message")
|
||||||
}
|
}
|
||||||
}*/
|
}
|
||||||
|
|
||||||
func testMany(t testing.TB, subs, emits, msgs int) {
|
func testMany(t testing.TB, subs, emits, msgs int) {
|
||||||
bus := NewBus()
|
bus := NewBus()
|
||||||
|
22
interface.go
22
interface.go
@ -1,7 +1,25 @@
|
|||||||
package event
|
package event
|
||||||
|
|
||||||
type SubSettings struct {}
|
import (
|
||||||
type SubOption func(*SubSettings)
|
"errors"
|
||||||
|
"reflect"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SubSettings struct {
|
||||||
|
forcedType reflect.Type
|
||||||
|
}
|
||||||
|
type SubOption func(*SubSettings) error
|
||||||
|
|
||||||
|
func ForceSubType(evtType interface{}) SubOption {
|
||||||
|
return func(s *SubSettings) error {
|
||||||
|
typ := reflect.TypeOf(evtType)
|
||||||
|
if typ.Kind() != reflect.Ptr {
|
||||||
|
return errors.New("ForceSubType called with non-pointer type")
|
||||||
|
}
|
||||||
|
s.forcedType = typ
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type EmitterSettings struct {}
|
type EmitterSettings struct {}
|
||||||
type EmitterOption func(*EmitterSettings)
|
type EmitterOption func(*EmitterSettings)
|
||||||
|
Loading…
Reference in New Issue
Block a user