123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- package peer
- import (
- "sync"
- "sync/atomic"
- )
- type SessionManager struct {
- sessionList sync.Map // 使用Id关联会话
- connIDGen int64 // 记录已经生成的会话ID流水号
- count int64 // 记录当前在使用的会话数量
- callback ConnectionCallBack
- Register chan *Session
- Unregister chan int64
- }
- func (mgr *SessionManager) SetIDBase(base int64) {
- atomic.StoreInt64(&mgr.connIDGen, base)
- }
- func (mgr *SessionManager) Count() int {
- return int(atomic.LoadInt64(&mgr.count))
- }
- func (mgr *SessionManager) Add(sess *Session) {
- id := atomic.AddInt64(&mgr.connIDGen, 1)
- atomic.AddInt64(&mgr.count, 1)
- sess.Conn.(interface {
- SetID(int64)
- }).SetID(id)
- mgr.sessionList.Store(id, sess)
- }
- func (mgr *SessionManager) Close(id int64) {
- if v, ok := mgr.sessionList.Load(id); ok {
- if mgr.callback != nil {
- go mgr.callback.OnClosed(v.(*Session))
- }
- }
- mgr.sessionList.Delete(id)
- atomic.AddInt64(&mgr.count, -1)
- }
- func (mgr *SessionManager) ProcessMessage(id int64, msg []byte) {
- if v, ok := mgr.sessionList.Load(id); ok {
- if mgr.callback != nil {
- go func() {
- err := mgr.callback.OnReceive(v.(*Session), msg)
- if err != nil {
- v.(*Session).Conn.Close()
- }
- }()
- }
- }
- }
- func (mgr *SessionManager) run() {
- for {
- select {
- case client := <-mgr.Register:
- mgr.connIDGen++
- mgr.count++
- client.Conn.(interface {
- SetID(int64)
- }).SetID(mgr.connIDGen)
- mgr.sessionList.Store(mgr.connIDGen, client)
- case clientID := <-mgr.Unregister:
- if v, ok := mgr.sessionList.Load(clientID); ok {
- if mgr.callback != nil {
- go mgr.callback.OnClosed(v.(*Session))
- }
- }
- mgr.sessionList.Delete(clientID)
- mgr.count--
- }
- }
- }
- func (mgr *SessionManager) GetSession(id int64) *Session {
- if v, ok := mgr.sessionList.Load(id); ok {
- return v.(*Session)
- }
- return nil
- }
- func (mgr *SessionManager) VisitSession(callback func(*Session) bool) {
- mgr.sessionList.Range(func(key, value interface{}) bool {
- return callback(value.(*Session))
- })
- }
- func (mgr *SessionManager) CloseAllSession() {
- mgr.VisitSession(func(sess *Session) bool {
- sess.Conn.Close()
- return true
- })
- }
- func (mgr *SessionManager) SessionCount() int64 {
- return atomic.LoadInt64(&mgr.count)
- }
- func NewSessionMgr(callback ConnectionCallBack) *SessionManager {
- s := &SessionManager{
- callback: callback,
- Register: make(chan *Session),
- Unregister: make(chan int64),
- }
- go s.run()
- return s
- }
|