acceptor.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. package connect
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gin-gonic/gin"
  6. "go.uber.org/zap"
  7. "net/http"
  8. "net/url"
  9. "time"
  10. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  11. "github.com/gorilla/websocket"
  12. )
  13. var _ WsAcceptor = (*wsAcceptor)(nil)
  14. var upgrader = websocket.Upgrader{
  15. CheckOrigin: func(r *http.Request) bool {
  16. return true
  17. },
  18. }
  19. type WsAcceptor interface {
  20. Start(addr string) error
  21. Stop()
  22. GinHandle(ctx *gin.Context)
  23. }
  24. type wsAcceptor struct {
  25. server *http.Server
  26. sessMgr *peer.SessionManager
  27. logger *zap.Logger
  28. }
  29. func NewWsAcceptor(sessMgr *peer.SessionManager, loggers *zap.Logger) WsAcceptor {
  30. return &wsAcceptor{
  31. sessMgr: sessMgr,
  32. logger: loggers,
  33. }
  34. }
  35. func (ws *wsAcceptor) Start(addr string) error {
  36. urlObj, err := url.Parse(addr)
  37. if err != nil {
  38. return fmt.Errorf("websocket urlparse failed. url(%s) %v", addr, err)
  39. }
  40. if urlObj.Path == "" {
  41. return fmt.Errorf("websocket start failed. expect path in url to listen addr:%s", addr)
  42. }
  43. http.HandleFunc(urlObj.Path, func(w http.ResponseWriter, r *http.Request) {
  44. c, upgradeErr := upgrader.Upgrade(w, r, nil)
  45. if upgradeErr != nil {
  46. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  47. return
  48. }
  49. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  50. })
  51. ws.server = &http.Server{Addr: urlObj.Host}
  52. err = ws.server.ListenAndServe()
  53. if err != nil && err != http.ErrServerClosed {
  54. return fmt.Errorf("websocket ListenAndServe addr:%s failed:%v", addr, err)
  55. }
  56. return nil
  57. }
  58. func (ws *wsAcceptor) Stop() {
  59. ws.sessMgr.CloseAllSession()
  60. if ws.server != nil {
  61. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  62. defer cancel()
  63. if err := ws.server.Shutdown(ctx); err != nil {
  64. ws.logger.Sugar().Errorf("server shutdown err:[%s]", err)
  65. }
  66. }
  67. }
  68. func (ws *wsAcceptor) GinHandle(ctx *gin.Context) {
  69. c, upgradeErr := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  70. if upgradeErr != nil {
  71. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  72. return
  73. }
  74. ws.sessMgr.Register <- peer.NewSession(NewConnection(c, ws.sessMgr))
  75. }