subscriber.go 434 B

123456789101112131415161718192021222324252627282930313233
  1. package observable
  2. import (
  3. "sync"
  4. )
  5. type Subscription <-chan any
  6. type Subscriber struct {
  7. buffer chan any
  8. once sync.Once
  9. }
  10. func (s *Subscriber) Emit(item any) {
  11. s.buffer <- item
  12. }
  13. func (s *Subscriber) Out() Subscription {
  14. return s.buffer
  15. }
  16. func (s *Subscriber) Close() {
  17. s.once.Do(func() {
  18. close(s.buffer)
  19. })
  20. }
  21. func newSubscriber() *Subscriber {
  22. sub := &Subscriber{
  23. buffer: make(chan any, 200),
  24. }
  25. return sub
  26. }