acceptor.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. package connect
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gin-gonic/gin"
  6. "go.uber.org/zap"
  7. "net/http"
  8. "net/url"
  9. "time"
  10. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  11. "github.com/gorilla/websocket"
  12. )
  13. var _ WsAcceptor = (*wsAcceptor)(nil)
  14. var upgrader = websocket.Upgrader{
  15. CheckOrigin: func(r *http.Request) bool {
  16. return true
  17. },
  18. }
  19. type WsAcceptor interface {
  20. Start(addr string) error
  21. Stop()
  22. }
  23. type wsAcceptor struct {
  24. server *http.Server
  25. sessMgr *peer.SessionManager
  26. logger *zap.Logger
  27. }
  28. func NewWsAcceptor(sessMgr *peer.SessionManager, loggers *zap.Logger) WsAcceptor {
  29. return &wsAcceptor{
  30. sessMgr: sessMgr,
  31. logger: loggers,
  32. }
  33. }
  34. func (ws *wsAcceptor) Start(addr string) error {
  35. urlObj, err := url.Parse(addr)
  36. if err != nil {
  37. return fmt.Errorf("websocket urlparse failed. url(%s) %v", addr, err)
  38. }
  39. if urlObj.Path == "" {
  40. return fmt.Errorf("websocket start failed. expect path in url to listen addr:%s", addr)
  41. }
  42. http.HandleFunc(urlObj.Path, func(w http.ResponseWriter, r *http.Request) {
  43. c, upgradeErr := upgrader.Upgrade(w, r, nil)
  44. if upgradeErr != nil {
  45. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  46. return
  47. }
  48. ws.sessMgr.Register <- peer.NewSession(newConnection(c, ws.sessMgr))
  49. })
  50. ws.server = &http.Server{Addr: urlObj.Host}
  51. err = ws.server.ListenAndServe()
  52. if err != nil && err != http.ErrServerClosed {
  53. return fmt.Errorf("websocket ListenAndServe addr:%s failed:%v", addr, err)
  54. }
  55. return nil
  56. }
  57. func (ws *wsAcceptor) Stop() {
  58. ws.sessMgr.CloseAllSession()
  59. if ws.server != nil {
  60. ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
  61. defer cancel()
  62. if err := ws.server.Shutdown(ctx); err != nil {
  63. ws.logger.Sugar().Errorf("server shutdown err:[%s]", err)
  64. }
  65. }
  66. }
  67. func (ws *wsAcceptor) GinHandle(ctx *gin.Context) {
  68. c, upgradeErr := upgrader.Upgrade(ctx.Writer, ctx.Request, nil)
  69. if upgradeErr != nil {
  70. ws.logger.Sugar().Errorf("upgrade http failed: %s", upgradeErr)
  71. return
  72. }
  73. ws.sessMgr.Register <- peer.NewSession(newConnection(c, ws.sessMgr))
  74. }