session_manager.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package peer
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. )
  6. type SessionManager struct {
  7. sessionList sync.Map // 使用Id关联会话
  8. connIDGen int64 // 记录已经生成的会话ID流水号
  9. count int64 // 记录当前在使用的会话数量
  10. callback ConnectionCallBack
  11. Register chan *Session
  12. Unregister chan int64
  13. }
  14. func (mgr *SessionManager) SetIDBase(base int64) {
  15. atomic.StoreInt64(&mgr.connIDGen, base)
  16. }
  17. func (mgr *SessionManager) Count() int {
  18. return int(atomic.LoadInt64(&mgr.count))
  19. }
  20. func (mgr *SessionManager) Add(sess *Session) {
  21. id := atomic.AddInt64(&mgr.connIDGen, 1)
  22. atomic.AddInt64(&mgr.count, 1)
  23. sess.Conn.(interface {
  24. SetID(int64)
  25. }).SetID(id)
  26. mgr.sessionList.Store(id, sess)
  27. }
  28. func (mgr *SessionManager) Close(id int64) {
  29. if v, ok := mgr.sessionList.Load(id); ok {
  30. if mgr.callback != nil {
  31. go mgr.callback.OnClosed(v.(*Session))
  32. }
  33. }
  34. mgr.sessionList.Delete(id)
  35. atomic.AddInt64(&mgr.count, -1)
  36. }
  37. func (mgr *SessionManager) ProcessMessage(id int64, msg []byte) {
  38. if v, ok := mgr.sessionList.Load(id); ok {
  39. if mgr.callback != nil {
  40. go func() {
  41. err := mgr.callback.OnReceive(v.(*Session), msg)
  42. if err != nil {
  43. v.(*Session).Conn.Close()
  44. }
  45. }()
  46. }
  47. }
  48. }
  49. func (mgr *SessionManager) run() {
  50. for {
  51. select {
  52. case client := <-mgr.Register:
  53. mgr.connIDGen++
  54. mgr.count++
  55. client.Conn.(interface {
  56. SetID(int64)
  57. }).SetID(mgr.connIDGen)
  58. mgr.sessionList.Store(mgr.connIDGen, client)
  59. case clientID := <-mgr.Unregister:
  60. if v, ok := mgr.sessionList.Load(clientID); ok {
  61. if mgr.callback != nil {
  62. go mgr.callback.OnClosed(v.(*Session))
  63. }
  64. }
  65. mgr.sessionList.Delete(clientID)
  66. mgr.count--
  67. }
  68. }
  69. }
  70. func (mgr *SessionManager) GetSession(id int64) *Session {
  71. if v, ok := mgr.sessionList.Load(id); ok {
  72. return v.(*Session)
  73. }
  74. return nil
  75. }
  76. func (mgr *SessionManager) VisitSession(callback func(*Session) bool) {
  77. mgr.sessionList.Range(func(key, value interface{}) bool {
  78. return callback(value.(*Session))
  79. })
  80. }
  81. func (mgr *SessionManager) CloseAllSession() {
  82. mgr.VisitSession(func(sess *Session) bool {
  83. sess.Conn.Close()
  84. return true
  85. })
  86. }
  87. func (mgr *SessionManager) SessionCount() int64 {
  88. return atomic.LoadInt64(&mgr.count)
  89. }
  90. func NewSessionMgr(callback ConnectionCallBack) *SessionManager {
  91. s := &SessionManager{
  92. callback: callback,
  93. Register: make(chan *Session),
  94. Unregister: make(chan int64),
  95. }
  96. go s.run()
  97. return s
  98. }