session_callback.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package service
  2. import (
  3. "fmt"
  4. "reflect"
  5. "strings"
  6. "time"
  7. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  8. )
  9. type callBackEntity struct{}
  10. func GetSessionManager() *peer.SessionManager {
  11. return peer.NewSessionMgr(&callBackEntity{})
  12. }
  13. func (cb *callBackEntity) OnClosed(session *peer.Session) {
  14. defer func() {
  15. if err := recover(); err != nil {
  16. fmt.Println(fmt.Sprintf("OnClosed: session:%d err:%v", session.Conn.ID(), err))
  17. }
  18. }()
  19. for _, v := range RegisteredServiceList {
  20. if ok := v.OnSessionClose(session); ok {
  21. return
  22. }
  23. }
  24. }
  25. func (cb *callBackEntity) OnReceive(session *peer.Session, msg []byte) error {
  26. _, msgPack, err := RouterCodec.Unmarshal(msg)
  27. if err != nil {
  28. return fmt.Errorf("onreceive: %v", err)
  29. }
  30. router, ok := msgPack.Router.(string)
  31. if !ok {
  32. return fmt.Errorf("onreceive: invalid router:%v", msgPack.Router)
  33. }
  34. routerArr := strings.Split(router, ".")
  35. if len(routerArr) != 2 {
  36. return fmt.Errorf("onreceive: invalid router:%s", msgPack.Router)
  37. }
  38. s, ok := RegisteredServiceList[routerArr[0]]
  39. if !ok {
  40. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  41. }
  42. h, ok := s.Handlers[routerArr[1]]
  43. if !ok {
  44. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  45. }
  46. t1 := time.Now()
  47. var args = []reflect.Value{h.Receiver, reflect.ValueOf(session), reflect.ValueOf(msgPack.DataPtr)}
  48. var res any
  49. var rb []byte
  50. res, err = CallHandlerFunc(h.Method, args)
  51. if res != nil && !reflect.ValueOf(res).IsNil() {
  52. rb, err = RouterCodec.Marshal(router, res, nil)
  53. if err != nil {
  54. return fmt.Errorf("service: %v", err)
  55. }
  56. err = session.Conn.Send(rb)
  57. if err != nil {
  58. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  59. }
  60. } else {
  61. rb, err = RouterCodec.Marshal(router, nil, err)
  62. if err != nil {
  63. return fmt.Errorf("service: %v", err)
  64. }
  65. err = session.Conn.Send(rb)
  66. if err != nil {
  67. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  68. }
  69. }
  70. var errs string
  71. if err != nil {
  72. errs = err.Error()
  73. }
  74. dt := time.Since(t1)
  75. go s.Component.OnRequestFinished(session, router, RouterCodec.ToString(msgPack.DataPtr), errs, dt)
  76. return nil
  77. }
  78. func CallHandlerFunc(foo reflect.Method, args []reflect.Value) (retValue any, retErr error) {
  79. defer func() {
  80. if err := recover(); err != nil {
  81. fmt.Println(fmt.Sprintf("CallHandlerFunc: %v", err))
  82. retValue = nil
  83. retErr = fmt.Errorf("CallHandlerFunc: call method pkg:%s method:%s err:%v", foo.PkgPath, foo.Name, err)
  84. }
  85. }()
  86. if ret := foo.Func.Call(args); len(ret) > 0 {
  87. var err error = nil
  88. if r1 := ret[1].Interface(); r1 != nil {
  89. err = r1.(error)
  90. }
  91. return ret[0].Interface(), err
  92. }
  93. return nil, fmt.Errorf("CallHandlerFunc: call method pkg:%s method:%s", foo.PkgPath, foo.Name)
  94. }