session_callback.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package service
  2. import (
  3. "fmt"
  4. "git.bvbej.com/bvbej/base-golang/pkg/websocket/util"
  5. "reflect"
  6. "strings"
  7. "time"
  8. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  9. )
  10. type callBackEntity struct{}
  11. func GetSessionManager() *peer.SessionManager {
  12. return peer.NewSessionMgr(&callBackEntity{})
  13. }
  14. func (cb *callBackEntity) OnClosed(session *peer.Session) {
  15. defer func() {
  16. if err := recover(); err != nil {
  17. fmt.Println(fmt.Sprintf("OnClosed: session:%d err:%v", session.Conn.ID(), err))
  18. }
  19. }()
  20. for _, v := range RegisteredServiceList {
  21. if ok := v.OnSessionClose(session); ok {
  22. return
  23. }
  24. }
  25. }
  26. func (cb *callBackEntity) OnReceive(session *peer.Session, msg []byte) error {
  27. _, msgPack, err := RouterCodec.Unmarshal(msg)
  28. if err != nil {
  29. return fmt.Errorf("onreceive: %v", err)
  30. }
  31. router, ok := msgPack.Router.(string)
  32. if !ok {
  33. return fmt.Errorf("onreceive: invalid router:%v", msgPack.Router)
  34. }
  35. routerArr := strings.Split(router, ".")
  36. if len(routerArr) != 2 {
  37. return fmt.Errorf("onreceive: invalid router:%s", msgPack.Router)
  38. }
  39. s, ok := RegisteredServiceList[routerArr[0]]
  40. if !ok {
  41. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  42. }
  43. h, ok := s.Handlers[routerArr[1]]
  44. if !ok {
  45. return fmt.Errorf("onreceive: function not registed router:%s err:%v", msgPack.Router, err)
  46. }
  47. t1 := time.Now()
  48. var args = []reflect.Value{h.Receiver, reflect.ValueOf(session), reflect.ValueOf(msgPack.DataPtr)}
  49. var res any
  50. var rb []byte
  51. res, err = util.CallHandlerFunc(h.Method, args)
  52. if res != nil && !reflect.ValueOf(res).IsNil() {
  53. rb, err = RouterCodec.Marshal(router, res, nil)
  54. if err != nil {
  55. return fmt.Errorf("service: %v", err)
  56. }
  57. err = session.Conn.Send(rb)
  58. if err != nil {
  59. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  60. }
  61. } else {
  62. rb, err = RouterCodec.Marshal(router, nil, err)
  63. if err != nil {
  64. return fmt.Errorf("service: %v", err)
  65. }
  66. err = session.Conn.Send(rb)
  67. if err != nil {
  68. serviceLogger.Sugar().Warnf("warn! service send msg failed router:%s err:%v", router, err)
  69. }
  70. }
  71. var errs string
  72. if err != nil {
  73. errs = err.Error()
  74. }
  75. dt := time.Since(t1)
  76. go s.Component.OnRequestFinished(session, router, RouterCodec.ToString(msgPack.DataPtr), errs, dt)
  77. return nil
  78. }