You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
subgraph-oz/ipc/handlers.go

129 lines
3.1 KiB

package ipc
import (
"errors"
"fmt"
"github.com/op/go-logging"
"reflect"
)
type handlerMap map[string]reflect.Value
var defaultLog = logging.MustGetLogger("ipc")
type msgDispatcher struct {
log *logging.Logger
msgs chan *Message
hmap handlerMap
}
func createDispatcher(log *logging.Logger, handlers ...interface{}) (*msgDispatcher, error) {
md := &msgDispatcher{
log: log,
msgs: make(chan *Message),
hmap: make(map[string]reflect.Value),
}
for _, h := range handlers {
if err := md.hmap.addHandler(h); err != nil {
return nil, err
}
}
go md.runDispatcher()
return md, nil
}
func (md *msgDispatcher) close() {
close(md.msgs)
}
func (md *msgDispatcher) dispatch(m *Message) {
md.msgs <- m
}
func (md *msgDispatcher) logger() *logging.Logger {
if md.log != nil {
return md.log
}
return defaultLog
}
func (md *msgDispatcher) runDispatcher() {
for m := range md.msgs {
if err := md.hmap.dispatch(m); err != nil {
md.logger().Warning("error dispatching message: %v", err)
}
}
}
func (handlers handlerMap) dispatch(m *Message) error {
h, ok := handlers[m.Type]
if !ok {
return errors.New("no handler found for message type:" + m.Type)
}
return executeHandler(h, m)
}
func executeHandler(h reflect.Value, m *Message) error {
var args [2]reflect.Value
args[0] = reflect.ValueOf(m.Body)
args[1] = reflect.ValueOf(m)
rs := h.Call(args[:])
if len(rs) != 1 {
return errors.New("handler function did not return expected single result value")
}
if rs[0].IsNil() {
return nil
}
return rs[0].Interface().(error)
}
func (handlers handlerMap) addHandler(h interface{}) error {
msgType, err := typeCheckHandler(h)
if err != nil {
return err
}
if _, ok := handlers[msgType]; ok {
return fmt.Errorf("duplicate handler registered for message type '%s'", msgType)
}
handlers[msgType] = reflect.ValueOf(h)
return nil
}
var errType = reflect.TypeOf((*error)(nil)).Elem()
var messageType = reflect.TypeOf((*Message)(nil))
func typeCheckHandler(h interface{}) (string, error) {
t := reflect.TypeOf(h)
if t.Kind() != reflect.Func {
return "", fmt.Errorf("handler %v is not a function", t)
}
if t.NumIn() != 2 {
return "", fmt.Errorf("handler %v has incorrect number of input arguments, got %d", t, t.NumIn())
}
if t.NumOut() != 1 {
return "", fmt.Errorf("handler %v has incorrect number of return values %d", t, t.NumOut())
}
if t.In(0).Kind() != reflect.Ptr {
return "", errors.New("first argument of handler is not a pointer")
}
in0 := t.In(0).Elem()
if in0.Kind() != reflect.Struct {
return "", fmt.Errorf("first argument of handler is not a pointer to struct")
}
if in1 := t.In(1); !in1.AssignableTo(messageType) {
return "", fmt.Errorf("second argument of handler must have type *Message")
}
if out := t.Out(0); !out.AssignableTo(errType) {
return "", fmt.Errorf("return type of handler must be error")
}
if in0.NumField() == 0 {
return "", fmt.Errorf("first argument structure has no fields")
}
if len(in0.Field(0).Tag) == 0 {
return "", fmt.Errorf("first argument structure, first field has no tag")
}
return string(in0.Field(0).Tag), nil
}