acceptor.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package connect
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/gin-gonic/gin"
  7. "go.uber.org/zap"
  8. "net/http"
  9. "net/url"
  10. "time"
  11. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  12. "github.com/gorilla/websocket"
  13. )
  14. var _ WsAcceptor = (*wsAcceptor)(nil)
  15. var upgrader = websocket.Upgrader{
  16. CheckOrigin: func(r *http.Request) bool {
  17. return true
  18. },
  19. }
  20. type WsAcceptor interface {
  21. Start(addr string) error
  22. Stop()
  23. GinHandle(ctx *gin.Context)
  24. }
  25. type wsAcceptor struct {
  26. server *http.Server
  27. sessMgr *peer.SessionManager
  28. logger *zap.Logger
  29. }
  30. func NewWsAcceptor(sessMgr *peer.SessionManager, loggers *zap.Logger) WsAcceptor {
  31. return &wsAcceptor{
  32. sessMgr: sessMgr,
  33. logger: loggers,
  34. }
  35. }
  36. func (ws *wsAcceptor) Start(addr string) error {
  37. urlObj, err := url.Parse(addr)
  38. if err != nil {
  39. return fmt.Errorf("websocket urlparse failed. url(%s) %v", addr, err)
  40. }
  41. if urlObj.Path == "" {
  42. return fmt.Errorf("websocket start failed. expect path in url to listen addr:%s", addr)
  43. }
  44. http.HandleFunc(urlObj.Path, func(w http.ResponseWriter, r *http.Request) {
  45. c, upgradeErr := upgrader.Upgrade(w, r, nil)
  46. if upgradeErr != nil {
  47. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  48. return
  49. }
  50. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  51. })
  52. ws.server = &http.Server{Addr: urlObj.Host}
  53. err = ws.server.ListenAndServe()
  54. if err != nil && !errors.Is(err, http.ErrServerClosed) {
  55. return fmt.Errorf("websocket ListenAndServe addr:%s failed:%v", addr, err)
  56. }
  57. return nil
  58. }
  59. func (ws *wsAcceptor) Stop() {
  60. ws.sessMgr.CloseAllSession()
  61. if ws.server != nil {
  62. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  63. defer cancel()
  64. if err := ws.server.Shutdown(ctx); err != nil {
  65. ws.logger.Sugar().Errorf("server shutdown err:[%s]", err)
  66. }
  67. }
  68. }
  69. func (ws *wsAcceptor) GinHandle(ctx *gin.Context) {
  70. c, upgradeErr := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  71. if upgradeErr != nil {
  72. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  73. return
  74. }
  75. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  76. }