observable.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package observable
  2. // Ref: github.com/Dreamacro/clash/common/observable
  3. import (
  4. "errors"
  5. "sync"
  6. )
  7. type Observable struct {
  8. iterable Iterable
  9. listener map[Subscription]*Subscriber
  10. mux sync.Mutex
  11. done bool
  12. }
  13. func (o *Observable) process() {
  14. for item := range o.iterable {
  15. o.mux.Lock()
  16. for _, sub := range o.listener {
  17. sub.Emit(item)
  18. }
  19. o.mux.Unlock()
  20. }
  21. o.close()
  22. }
  23. func (o *Observable) close() {
  24. o.mux.Lock()
  25. defer o.mux.Unlock()
  26. o.done = true
  27. for _, sub := range o.listener {
  28. sub.Close()
  29. }
  30. }
  31. func (o *Observable) Subscribe() (Subscription, error) {
  32. o.mux.Lock()
  33. defer o.mux.Unlock()
  34. if o.done {
  35. return nil, errors.New("observable is closed")
  36. }
  37. subscriber := newSubscriber()
  38. o.listener[subscriber.Out()] = subscriber
  39. return subscriber.Out(), nil
  40. }
  41. func (o *Observable) UnSubscribe(sub Subscription) {
  42. o.mux.Lock()
  43. defer o.mux.Unlock()
  44. subscriber, exist := o.listener[sub]
  45. if !exist {
  46. return
  47. }
  48. delete(o.listener, sub)
  49. subscriber.Close()
  50. }
  51. func NewObservable(any Iterable) *Observable {
  52. observable := &Observable{
  53. iterable: any,
  54. listener: map[Subscription]*Subscriber{},
  55. }
  56. go observable.process()
  57. return observable
  58. }