12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package observable
- // Ref: github.com/Dreamacro/clash/common/observable
- import (
- "errors"
- "sync"
- )
- type Observable struct {
- iterable Iterable
- listener map[Subscription]*Subscriber
- mux sync.Mutex
- done bool
- }
- func (o *Observable) process() {
- for item := range o.iterable {
- o.mux.Lock()
- for _, sub := range o.listener {
- sub.Emit(item)
- }
- o.mux.Unlock()
- }
- o.close()
- }
- func (o *Observable) close() {
- o.mux.Lock()
- defer o.mux.Unlock()
- o.done = true
- for _, sub := range o.listener {
- sub.Close()
- }
- }
- func (o *Observable) Subscribe() (Subscription, error) {
- o.mux.Lock()
- defer o.mux.Unlock()
- if o.done {
- return nil, errors.New("observable is closed")
- }
- subscriber := newSubscriber()
- o.listener[subscriber.Out()] = subscriber
- return subscriber.Out(), nil
- }
- func (o *Observable) UnSubscribe(sub Subscription) {
- o.mux.Lock()
- defer o.mux.Unlock()
- subscriber, exist := o.listener[sub]
- if !exist {
- return
- }
- delete(o.listener, sub)
- subscriber.Close()
- }
- func NewObservable(any Iterable) *Observable {
- observable := &Observable{
- iterable: any,
- listener: map[Subscription]*Subscriber{},
- }
- go observable.process()
- return observable
- }
|