123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- package sse
- import (
- "github.com/gin-gonic/gin"
- "io"
- "net/http"
- "sync"
- "sync/atomic"
- )
- var _ Server = (*event)(nil)
- type Server interface {
- HandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc
- Push(user, name, msg string) bool
- Broadcast(name, msg string)
- }
- type clientChan struct {
- User string
- Chan chan msgChan
- }
// msgChan is one event travelling from Push/Broadcast to a stream handler.
// Name is the event name and Message is the event payload.
type msgChan struct {
	Name, Message string
}
- type event struct {
- SessionList sync.Map
- Count atomic.Int32
- Register chan clientChan
- Unregister chan string
- }
- func NewServer() Server {
- e := &event{
- SessionList: sync.Map{},
- Count: atomic.Int32{},
- Register: make(chan clientChan),
- Unregister: make(chan string),
- }
- go e.listen()
- return e
- }
- func (stream *event) listen() {
- for {
- select {
- case client := <-stream.Register:
- stream.SessionList.Store(client.User, client.Chan)
- stream.Count.Add(1)
- case user := <-stream.Unregister:
- value, ok := stream.SessionList.Load(user)
- if ok {
- event := value.(chan msgChan)
- close(event)
- stream.SessionList.Delete(user)
- stream.Count.Add(-1)
- }
- }
- }
- }
- func (stream *event) HandlerFunc(auth func(c *gin.Context) (string, error)) gin.HandlerFunc {
- return func(c *gin.Context) {
- user, err := auth(c)
- if err != nil {
- c.AbortWithStatus(http.StatusBadRequest)
- return
- }
- e := make(chan msgChan)
- client := clientChan{
- User: user,
- Chan: e,
- }
- stream.Register <- client
- defer func() {
- stream.Unregister <- user
- }()
- c.Writer.Header().Set("Content-Type", "text/event-stream")
- c.Writer.Header().Set("Cache-Control", "no-cache")
- c.Writer.Header().Set("Connection", "keep-alive")
- c.Writer.Header().Set("Transfer-Encoding", "chunked")
- c.Stream(func(w io.Writer) bool {
- if msg, ok := <-e; ok {
- c.SSEvent(msg.Name, msg.Message)
- return true
- }
- return false
- })
- c.Next()
- }
- }
- func (stream *event) Push(user, name, msg string) bool {
- value, ok := stream.SessionList.Load(user)
- if ok {
- val := value.(chan msgChan)
- val <- msgChan{Name: name, Message: msg}
- }
- return false
- }
- func (stream *event) Broadcast(name, msg string) {
- stream.SessionList.Range(func(user, value any) bool {
- val := value.(chan msgChan)
- val <- msgChan{Name: name, Message: msg}
- return true
- })
- }
|