connection.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package connect
  2. import (
  3. "errors"
  4. "time"
  5. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  6. "github.com/gorilla/websocket"
  7. )
  8. const (
  9. writeWait = 20 * time.Second
  10. pongWait = 60 * time.Second
  11. pingPeriod = (pongWait * 9) / 10
  12. maxFrameMessageLen = 16 * 1024 //4 * 4096
  13. maxSendBuffer = 16
  14. )
  15. var (
  16. ErrBrokenPipe = errors.New("send to broken pipe")
  17. ErrBufferPoolExceed = errors.New("send buffer exceed")
  18. )
  19. type wsConnection struct {
  20. peer.ConnectionIdentify
  21. pm *peer.SessionManager
  22. conn *websocket.Conn
  23. send chan []byte
  24. running bool
  25. }
  26. func NewConnection(conn *websocket.Conn, p *peer.SessionManager) *wsConnection {
  27. wsc := &wsConnection{
  28. conn: conn,
  29. pm: p,
  30. send: make(chan []byte, maxSendBuffer),
  31. running: true,
  32. }
  33. go wsc.acceptLoop()
  34. go wsc.sendLoop()
  35. return wsc
  36. }
  37. func (ws *wsConnection) Peer() *peer.SessionManager {
  38. return ws.pm
  39. }
  40. func (ws *wsConnection) Raw() any {
  41. if ws.conn == nil {
  42. return nil
  43. }
  44. return ws.conn
  45. }
  46. func (ws *wsConnection) RemoteAddr() string {
  47. if ws.conn == nil {
  48. return ""
  49. }
  50. return ws.conn.RemoteAddr().String()
  51. }
  52. func (ws *wsConnection) Close() {
  53. _ = ws.conn.Close()
  54. ws.running = false
  55. }
  56. func (ws *wsConnection) Send(msg []byte) (err error) {
  57. defer func() {
  58. if e := recover(); e != nil {
  59. err = ErrBrokenPipe
  60. }
  61. }()
  62. if !ws.running {
  63. return ErrBrokenPipe
  64. }
  65. if len(ws.send) >= maxSendBuffer {
  66. return ErrBufferPoolExceed
  67. }
  68. if len(msg) > maxFrameMessageLen {
  69. return
  70. }
  71. ws.send <- msg
  72. return nil
  73. }
  74. func (ws *wsConnection) acceptLoop() {
  75. defer func() {
  76. ws.pm.Unregister <- ws.ID()
  77. _ = ws.conn.Close()
  78. ws.running = false
  79. }()
  80. _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait))
  81. ws.conn.SetPongHandler(func(string) error {
  82. _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait))
  83. return nil
  84. })
  85. for ws.conn != nil {
  86. _, data, err := ws.conn.ReadMessage()
  87. if err != nil {
  88. break
  89. }
  90. ws.pm.ProcessMessage(ws.ID(), data)
  91. }
  92. }
  93. func (ws *wsConnection) sendLoop() {
  94. ticker := time.NewTicker(pingPeriod)
  95. defer func() {
  96. ticker.Stop()
  97. _ = ws.conn.Close()
  98. ws.running = false
  99. close(ws.send)
  100. }()
  101. for {
  102. select {
  103. case msg := <-ws.send:
  104. _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait))
  105. if err := ws.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
  106. return
  107. }
  108. case <-ticker.C:
  109. _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait))
  110. if err := ws.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  111. return
  112. }
  113. }
  114. }
  115. }
  116. func (ws *wsConnection) IsClosed() bool {
  117. return !ws.running
  118. }