go-eventbus/interface.go

97 lines
2.6 KiB
Go
Raw Normal View History

2019-06-13 10:23:03 +08:00
package event
2019-06-17 00:15:35 +08:00
import (
"errors"
"reflect"
)
2019-06-19 20:27:37 +08:00
// closeEmit is the value EmitFunc.Close passes to the emit function.
// NOTE(review): presumably the bus implementation recognises this value as a
// request to shut the emitter down — confirm against the implementation.
var closeEmit = struct{}{}
2019-06-19 20:22:10 +08:00
// subSettings holds the subscription configuration that SubOptions modify.
type subSettings struct {
	// forcedType, when non-nil, is the pointer type recorded by ForceSubType.
	forcedType reflect.Type
}

// SubOption configures a subscription. It receives the subscription settings
// value and returns an error if the option cannot be applied.
type SubOption func(interface{}) error

// ForceSubType is a Subscribe option which overrides the type to which
// the subscription will be done. Note that the evtType must be assignable
// to channel type.
//
// This also allows for subscribing to multiple eventbus channels with one
// Go channel to get better ordering guarantees.
//
// evtType must be a (possibly nil) typed pointer such as new(Event). The
// returned option reports an error, rather than panicking, when evtType is
// an untyped nil or a non-pointer value, or when the option is applied to
// anything other than a non-nil *subSettings.
//
// Example:
//
//	type Event struct{}
//	func (Event) String() string {
//		return "event"
//	}
//
//	eventCh := make(chan fmt.Stringer) // interface { String() string }
//	cancel, err := eventbus.Subscribe(eventCh, event.ForceSubType(new(Event)))
//	[...]
func ForceSubType(evtType interface{}) SubOption {
	// reflect.TypeOf returns nil for an untyped nil interface value, so the
	// type must be nil-checked before Kind is called on it.
	typ := reflect.TypeOf(evtType)

	return func(settings interface{}) error {
		s, ok := settings.(*subSettings)
		if !ok || s == nil {
			return errors.New("ForceSubType applied to non-subscription settings")
		}
		if typ == nil || typ.Kind() != reflect.Ptr {
			return errors.New("ForceSubType called with non-pointer type")
		}
		s.forcedType = typ
		return nil
	}
}
2019-06-19 20:22:10 +08:00
// emitterSettings holds the emitter configuration that EmitterOptions modify.
type emitterSettings struct {
	// makeStateful is set to true by the Stateful option.
	makeStateful bool
}

// EmitterOption configures an emitter. It receives the emitter settings
// value and returns an error if the option cannot be applied.
type EmitterOption func(interface{}) error

// Stateful is an Emitter option which makes the eventbus channel
// 'remember' the last event sent, and when a new subscriber joins the
// bus, the remembered event is immediately sent to the subscription
// channel.
//
// This allows to provide state tracking for dynamic systems, and/or
// allows new subscribers to verify that there are Emitters on the channel.
//
// Stateful has the EmitterOption signature so that it can be passed directly
// as an option, e.g. Emitter(new(EventT), Stateful). It returns an error if s
// is not a non-nil *emitterSettings.
func Stateful(s interface{}) error {
	settings, ok := s.(*emitterSettings)
	if !ok || settings == nil {
		return errors.New("Stateful applied to non-emitter settings")
	}
	settings.makeStateful = true
	return nil
}
2019-06-19 20:13:48 +08:00
// Bus is an interface to type-based event delivery system
2019-06-13 10:23:03 +08:00
type Bus interface {
2019-06-17 01:06:49 +08:00
// Subscribe creates new subscription. Failing to drain the channel will cause
2019-06-19 20:22:10 +08:00
// publishers to get blocked. CancelFunc is guaranteed to return after last send
// to the channel
2019-06-19 18:46:59 +08:00
//
// Example:
// ch := make(chan EventT, 10)
// defer close(ch)
// cancel, err := eventbus.Subscribe(ch)
// defer cancel()
2019-06-17 01:06:49 +08:00
Subscribe(typedChan interface{}, opts ...SubOption) (CancelFunc, error)
// Emitter creates new emitter
2019-06-13 14:51:54 +08:00
//
2019-06-17 01:06:49 +08:00
// eventType accepts typed nil pointers, and uses the type information to
2019-06-13 14:51:54 +08:00
// select output type
//
// Example:
2019-06-19 20:32:55 +08:00
// emit, err := eventbus.Emitter(new(EventT))
// defer emit.Close() // MUST call this after being done with the emitter
2019-06-19 18:46:59 +08:00
//
// emit(EventT{})
2019-06-19 20:27:37 +08:00
Emitter(eventType interface{}, opts ...EmitterOption) (EmitFunc, error)
2019-06-13 10:23:03 +08:00
}
2019-06-13 14:51:54 +08:00
// EmitFunc emits a single event onto the bus.
//
// A call blocks for as long as any channel subscribed to the topic is
// blocked.
//
// Passing an event of the wrong type causes a panic.
type EmitFunc func(event interface{})
2019-06-13 10:23:03 +08:00
2019-06-19 20:27:37 +08:00
// Close invokes the emit function with the closeEmit value.
// NOTE(review): presumably the implementation behind the EmitFunc treats
// closeEmit as the signal to release the emitter — confirm.
func (emit EmitFunc) Close() {
	emit(closeEmit)
}
2019-06-13 14:51:54 +08:00
// CancelFunc is the function type returned by Bus.Subscribe for cancelling
// the subscription it created.
type CancelFunc func()