connection.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package connect
  2. import (
  3. "errors"
  4. "time"
  5. "git.bvbej.com/bvbej/base-golang/pkg/websocket/peer"
  6. "github.com/gorilla/websocket"
  7. )
  8. const (
  9. writeWait = 20 * time.Second
  10. pongWait = 60 * time.Second
  11. pingPeriod = (pongWait * 9) / 10
  12. maxFrameMessageLen = 16 * 1024 //4 * 4096
  13. maxSendBuffer = 16
  14. )
  15. var (
  16. ErrBrokenPipe = errors.New("send to broken pipe")
  17. ErrBufferPoolExceed = errors.New("send buffer exceed")
  18. )
  19. type wsConnection struct {
  20. peer.ConnectionIdentify
  21. p *peer.SessionManager
  22. conn *websocket.Conn
  23. send chan []byte
  24. running bool
  25. }
  26. func (ws *wsConnection) Peer() *peer.SessionManager {
  27. return ws.p
  28. }
  29. func (ws *wsConnection) Raw() interface{} {
  30. if ws.conn == nil {
  31. return nil
  32. }
  33. return ws.conn
  34. }
  35. func (ws *wsConnection) RemoteAddr() string {
  36. if ws.conn == nil {
  37. return ""
  38. }
  39. return ws.conn.RemoteAddr().String()
  40. }
  41. func (ws *wsConnection) Close() {
  42. _ = ws.conn.Close()
  43. ws.running = false
  44. }
  45. func (ws *wsConnection) Send(msg []byte) (err error) {
  46. defer func() {
  47. if e := recover(); e != nil {
  48. err = ErrBrokenPipe
  49. }
  50. }()
  51. if !ws.running {
  52. return ErrBrokenPipe
  53. }
  54. if len(ws.send) >= maxSendBuffer {
  55. return ErrBufferPoolExceed
  56. }
  57. if len(msg) > maxFrameMessageLen {
  58. return
  59. }
  60. ws.send <- msg
  61. return nil
  62. }
  63. func (ws *wsConnection) recvLoop() {
  64. defer func() {
  65. ws.p.Unregister <- ws.ID()
  66. _ = ws.conn.Close()
  67. ws.running = false
  68. }()
  69. _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait))
  70. ws.conn.SetPongHandler(func(string) error {
  71. _ = ws.conn.SetReadDeadline(time.Now().Add(pongWait))
  72. return nil
  73. })
  74. for ws.conn != nil {
  75. _, data, err := ws.conn.ReadMessage()
  76. if err != nil {
  77. break
  78. }
  79. ws.p.ProcessMessage(ws.ID(), data)
  80. }
  81. }
  82. func (ws *wsConnection) sendLoop() {
  83. ticker := time.NewTicker(pingPeriod)
  84. defer func() {
  85. ticker.Stop()
  86. _ = ws.conn.Close()
  87. ws.running = false
  88. close(ws.send)
  89. }()
  90. for {
  91. select {
  92. case msg := <-ws.send:
  93. _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait))
  94. if err := ws.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil {
  95. return
  96. }
  97. case <-ticker.C:
  98. _ = ws.conn.SetWriteDeadline(time.Now().Add(writeWait))
  99. if err := ws.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
  100. return
  101. }
  102. }
  103. }
  104. }
  105. func (ws *wsConnection) IsClosed() bool {
  106. return !ws.running
  107. }
  108. func (ws *wsConnection) init() {
  109. go ws.recvLoop()
  110. go ws.sendLoop()
  111. ws.running = true
  112. }
  113. func newConnection(conn *websocket.Conn,
  114. p *peer.SessionManager) *wsConnection {
  115. wsc := &wsConnection{
  116. conn: conn,
  117. p: p,
  118. send: make(chan []byte, maxSendBuffer),
  119. }
  120. wsc.init()
  121. return wsc
  122. }