// Package sse implements a small per-user Server-Sent Events hub for gin.
package sse

import (
	"io"
	"net/http"
	"sync"
	"sync/atomic"

	"github.com/gin-gonic/gin"
)

// clientChan describes one streaming session.
type clientChan struct {
	// User is the key the session is stored under in Event.SessionList.
	User string
	// Chan carries events to the stream handler. It is never closed:
	// Push and Broadcast may be sending on it from other goroutines, and a
	// send on a closed channel panics. Termination is signalled via done.
	Chan chan msgChan
	// done is closed (only by the listen goroutine) once the session ended.
	done chan struct{}
}

// send delivers m to the session. It blocks until the stream handler takes
// the message and reports true, or reports false if the session ended first.
func (c clientChan) send(m msgChan) bool {
	select {
	case c.Chan <- m:
		return true
	case <-c.done:
		return false
	}
}

// msgChan is a single event: Name and Message are passed to gin's SSEvent.
type msgChan struct {
	Name    string
	Message string
}

// Event is the hub that tracks at most one streaming session per user.
// Create it with NewServer; a zero Event has no listen goroutine running.
type Event struct {
	// SessionList maps a user name to its current session.
	SessionList sync.Map
	// Count is the number of entries in SessionList.
	Count atomic.Int32
	// Register hands a new session to the listen goroutine.
	Register chan clientChan
	// Unregister ends whatever session is currently stored for a user name.
	Unregister chan string

	// release is used by stream handlers to end exactly their own session,
	// so a finished handler cannot tear down a newer session of the same user.
	release chan clientChan
}

// NewServer returns a ready-to-use Event and starts its listen goroutine.
// The goroutine runs for the lifetime of the process; there is no stop hook.
func NewServer() *Event {
	event := &Event{
		Register:   make(chan clientChan),
		Unregister: make(chan string),
		release:    make(chan clientChan),
	}
	go event.listen()
	return event
}

// listen serialises every mutation of SessionList and Count in one goroutine.
func (stream *Event) listen() {
	for {
		select {
		case client := <-stream.Register:
			stream.attach(client)
		case user := <-stream.Unregister:
			stream.detach(user)
		case client := <-stream.release:
			stream.finish(client)
		}
	}
}

// attach stores client as the current session of its user. Must only be
// called from listen.
func (stream *Event) attach(client clientChan) {
	if client.done == nil {
		client.done = make(chan struct{})
	}
	// A reconnect replaces the map entry, so only a new key changes Count.
	// The superseded stream is left open; it just stops receiving events and
	// is cleaned up when its own handler returns.
	if _, replaced := stream.SessionList.Load(client.User); !replaced {
		stream.Count.Add(1)
	}
	stream.SessionList.Store(client.User, client)
}

// detach ends the session currently stored for user, if any. Must only be
// called from listen.
func (stream *Event) detach(user string) {
	value, ok := stream.SessionList.Load(user)
	if !ok {
		return
	}
	stream.SessionList.Delete(user)
	stream.Count.Add(-1)
	closeDone(value.(clientChan).done)
}

// finish ends client's session and removes the map entry only if that entry
// still belongs to this very session. Must only be called from listen.
func (stream *Event) finish(client clientChan) {
	// Always unblock senders that still hold a reference to this session.
	closeDone(client.done)

	value, ok := stream.SessionList.Load(client.User)
	if !ok || value.(clientChan).Chan != client.Chan {
		return // already removed, or replaced by a newer session
	}
	stream.SessionList.Delete(client.User)
	stream.Count.Add(-1)
}

// closeDone closes done unless it is nil or already closed. It is safe only
// because the listen goroutine is the single closer.
func closeDone(done chan struct{}) {
	if done == nil {
		return
	}
	select {
	case <-done:
	default:
		close(done)
	}
}

// HandlerFunc returns a gin handler that streams events to the caller.
//
// auth identifies the user for the request; if it returns an error the
// request is aborted with 400 Bad Request. Otherwise the handler registers a
// session for that user and forwards every pushed event until the client
// disconnects or the session is ended through Unregister.
func (stream *Event) HandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc {
	return func(c *gin.Context) {
		user, err := auth(c)
		if err != nil {
			c.AbortWithStatus(http.StatusBadRequest)
			return
		}

		client := clientChan{
			User: user,
			Chan: make(chan msgChan),
			done: make(chan struct{}),
		}
		stream.Register <- client
		defer func() {
			stream.release <- client
		}()

		c.Writer.Header().Set("Content-Type", "text/event-stream")
		c.Writer.Header().Set("Cache-Control", "no-cache")
		c.Writer.Header().Set("Connection", "keep-alive")
		c.Writer.Header().Set("Transfer-Encoding", "chunked")

		// Stream only checks for a gone client between steps, so the step
		// itself must also wake up when the request is cancelled; otherwise
		// a disconnected client would pin this goroutine until the next event.
		gone := c.Request.Context().Done()
		c.Stream(func(w io.Writer) bool {
			select {
			case msg := <-client.Chan:
				c.SSEvent(msg.Name, msg.Message)
				return true
			case <-client.done:
				return false
			case <-gone:
				return false
			}
		})
		c.Next()
	}
}

// Push sends one event to user's current session. It blocks until the stream
// handler accepts the event and returns true; it returns false when the user
// has no session or the session ends before the event is accepted.
func (stream *Event) Push(user, name, msg string) bool {
	value, ok := stream.SessionList.Load(user)
	if !ok {
		return false
	}
	return value.(clientChan).send(msgChan{Name: name, Message: msg})
}

// Broadcast sends one event to every registered session, one after another.
// Sessions that end before accepting the event are skipped.
func (stream *Event) Broadcast(name, msg string) {
	event := msgChan{Name: name, Message: msg}
	stream.SessionList.Range(func(_, value any) bool {
		value.(clientChan).send(event)
		return true
	})
}