acceptor.go 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package connect
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "git.bvbej.com/bvbej/base-golang/pkg/mux"
  7. "github.com/gin-gonic/gin"
  8. "go.uber.org/zap"
  9. "net/http"
  10. "net/url"
  11. "time"
  12. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  13. "github.com/gorilla/websocket"
  14. )
  15. var _ WsAcceptor = (*wsAcceptor)(nil)
  16. var upgrader = websocket.Upgrader{
  17. CheckOrigin: func(r *http.Request) bool {
  18. return true
  19. },
  20. }
  21. type WsAcceptor interface {
  22. Start(addr string) error
  23. Stop()
  24. GinHandle(ctx *gin.Context)
  25. HandlerFunc() mux.HandlerFunc
  26. }
  27. type wsAcceptor struct {
  28. server *http.Server
  29. sessMgr *peer.SessionManager
  30. logger *zap.Logger
  31. }
  32. func NewWsAcceptor(sessMgr *peer.SessionManager, loggers *zap.Logger) WsAcceptor {
  33. return &wsAcceptor{
  34. sessMgr: sessMgr,
  35. logger: loggers,
  36. }
  37. }
  38. func (ws *wsAcceptor) Start(addr string) error {
  39. urlObj, err := url.Parse(addr)
  40. if err != nil {
  41. return fmt.Errorf("websocket urlparse failed. url(%s) %v", addr, err)
  42. }
  43. if urlObj.Path == "" {
  44. return fmt.Errorf("websocket start failed. expect path in url to listen addr:%s", addr)
  45. }
  46. http.HandleFunc(urlObj.Path, func(w http.ResponseWriter, r *http.Request) {
  47. c, upgradeErr := upgrader.Upgrade(w, r, nil)
  48. if upgradeErr != nil {
  49. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  50. return
  51. }
  52. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  53. })
  54. ws.server = &http.Server{Addr: urlObj.Host}
  55. err = ws.server.ListenAndServe()
  56. if err != nil && !errors.Is(err, http.ErrServerClosed) {
  57. return fmt.Errorf("websocket ListenAndServe addr:%s failed:%v", addr, err)
  58. }
  59. return nil
  60. }
  61. func (ws *wsAcceptor) Stop() {
  62. ws.sessMgr.CloseAllSession()
  63. if ws.server != nil {
  64. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  65. defer cancel()
  66. if err := ws.server.Shutdown(ctx); err != nil {
  67. ws.logger.Sugar().Errorf("server shutdown err:[%s]", err)
  68. }
  69. }
  70. }
  71. func (ws *wsAcceptor) GinHandle(ctx *gin.Context) {
  72. c, upgradeErr := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  73. if upgradeErr != nil {
  74. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  75. return
  76. }
  77. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  78. }
  79. func (ws *wsAcceptor) HandlerFunc() mux.HandlerFunc {
  80. return func(c mux.Context) {
  81. ws.GinHandle(c.Context())
  82. }
  83. }