session_callback.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package service
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "time"
  7. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  8. )
  9. type callBackEntity struct{}
  10. func (cb *callBackEntity) OnClosed(session *peer.Session) {
  11. defer func() {
  12. if err := recover(); err != nil {
  13. fmt.Println(fmt.Sprintf("OnClosed: session:%d err:%v", session.Conn.ID(), err))
  14. }
  15. }()
  16. for _, v := range serviceList {
  17. if ok := v.OnSessionClose(session); ok {
  18. return
  19. }
  20. }
  21. }
  22. //调用注册的函数
  23. func callHandlerFunc(foo reflect.Method, args []reflect.Value) (retValue interface{}, retErr error) {
  24. defer func() {
  25. if err := recover(); err != nil {
  26. fmt.Println(fmt.Sprintf("callHandlerFunc: %v", err))
  27. retValue = nil
  28. retErr = fmt.Errorf("callHandlerFunc: call method pkg:%s method:%s err:%v", foo.PkgPath, foo.Name, err)
  29. }
  30. }()
  31. if ret := foo.Func.Call(args); len(ret) > 0 {
  32. var err error = nil
  33. if r1 := ret[1].Interface(); r1 != nil {
  34. err = r1.(error)
  35. }
  36. return ret[0].Interface(), err
  37. }
  38. return nil, fmt.Errorf("callHandlerFunc: call method pkg:%s method:%s", foo.PkgPath, foo.Name)
  39. }
  40. //接收到消息后处理
  41. func (cb *callBackEntity) OnReceive(session *peer.Session, msg []byte) error {
  42. _, msgPack, err := routerCodec.Unmarshal(msg)
  43. if err != nil {
  44. return fmt.Errorf("onreceive: %v", err)
  45. }
  46. router, ok := msgPack.Router.(string)
  47. if !ok {
  48. return fmt.Errorf("onreceive: invalid router:%v", msgPack.Router)
  49. }
  50. routerArr := strings.Split(router, ".")
  51. if len(routerArr) != 2 {
  52. return fmt.Errorf("onreceive: invalid router:%s", msgPack.Router)
  53. }
  54. s, ok := serviceList[routerArr[0]]
  55. if !ok {
  56. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  57. }
  58. h, ok := s.Handlers[routerArr[1]]
  59. if !ok {
  60. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  61. }
  62. t1 := time.Now()
  63. var args = []reflect.Value{h.Receiver, reflect.ValueOf(session), reflect.ValueOf(msgPack.DataPtr)}
  64. var res interface{}
  65. var rb []byte
  66. res, err = callHandlerFunc(h.Method, args)
  67. if res != nil && !reflect.ValueOf(res).IsNil() {
  68. rb, err = routerCodec.Marshal(router, res, nil)
  69. if err != nil {
  70. return fmt.Errorf("service: %v", err)
  71. }
  72. err = session.Conn.Send(rb)
  73. if err != nil {
  74. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  75. }
  76. } else {
  77. rb, err = routerCodec.Marshal(router, nil, err)
  78. if err != nil {
  79. return fmt.Errorf("service: %v", err)
  80. }
  81. err = session.Conn.Send(rb)
  82. if err != nil {
  83. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  84. }
  85. }
  86. var errs string
  87. if err != nil {
  88. errs = err.Error()
  89. }
  90. dt := time.Since(t1)
  91. go s.component.OnRequestFinished(session, router, routerCodec.ToString(msgPack.DataPtr), errs, dt)
  92. return nil
  93. }
  94. func GetSessionManager() *peer.SessionManager {
  95. return peer.NewSessionMgr(&callBackEntity{})
  96. }