package peer import ( "sync" "sync/atomic" ) type SessionManager struct { sessionList sync.Map // 使用Id关联会话 connIDGen int64 // 记录已经生成的会话ID流水号 count int64 // 记录当前在使用的会话数量 callback ConnectionCallBack Register chan *Session Unregister chan int64 } func (mgr *SessionManager) SetIDBase(base int64) { atomic.StoreInt64(&mgr.connIDGen, base) } func (mgr *SessionManager) Count() int { return int(atomic.LoadInt64(&mgr.count)) } func (mgr *SessionManager) Add(sess *Session) { id := atomic.AddInt64(&mgr.connIDGen, 1) atomic.AddInt64(&mgr.count, 1) sess.Conn.(interface { SetID(int64) }).SetID(id) mgr.sessionList.Store(id, sess) } func (mgr *SessionManager) Close(id int64) { if v, ok := mgr.sessionList.Load(id); ok { if mgr.callback != nil { go mgr.callback.OnClosed(v.(*Session)) } } mgr.sessionList.Delete(id) atomic.AddInt64(&mgr.count, -1) } func (mgr *SessionManager) ProcessMessage(id int64, msg []byte) { if v, ok := mgr.sessionList.Load(id); ok { if mgr.callback != nil { go func() { err := mgr.callback.OnReceive(v.(*Session), msg) if err != nil { v.(*Session).Conn.Close() } }() } } } func (mgr *SessionManager) run() { for { select { case client := <-mgr.Register: mgr.connIDGen++ mgr.count++ client.Conn.(interface { SetID(int64) }).SetID(mgr.connIDGen) mgr.sessionList.Store(mgr.connIDGen, client) case clientID := <-mgr.Unregister: if v, ok := mgr.sessionList.Load(clientID); ok { if mgr.callback != nil { go mgr.callback.OnClosed(v.(*Session)) } } mgr.sessionList.Delete(clientID) mgr.count-- } } } func (mgr *SessionManager) GetSession(id int64) *Session { if v, ok := mgr.sessionList.Load(id); ok { return v.(*Session) } return nil } func (mgr *SessionManager) VisitSession(callback func(*Session) bool) { mgr.sessionList.Range(func(key, value interface{}) bool { return callback(value.(*Session)) }) } func (mgr *SessionManager) CloseAllSession() { mgr.VisitSession(func(sess *Session) bool { sess.Conn.Close() return true }) } func (mgr *SessionManager) SessionCount() int64 { return atomic.LoadInt64(&mgr.count) } func NewSessionMgr(callback ConnectionCallBack) *SessionManager { s := &SessionManager{ callback: callback, Register: make(chan *Session), Unregister: make(chan int64), } go s.run() return s }