downloader.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. package downloader
  2. import (
  3. "errors"
  4. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  5. "git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
  8. "git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
  9. "github.com/google/uuid"
  10. "net/url"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. type FetcherBuilder func() (protocols []string, builder func() fetcher.Fetcher)
  16. type Listener func(event *Event)
  17. type TaskInfo struct {
  18. ID string
  19. Res *base.Resource
  20. Opts *base.Options
  21. Status base.Status
  22. Progress *Progress
  23. fetcher fetcher.Fetcher
  24. timer *util.Timer
  25. locker *sync.Mutex
  26. }
  // Progress is a snapshot of a task's transfer statistics.
  type Progress struct {
  	// Used is the time spent downloading, in nanoseconds.
  	Used int64
  	// Speed is the number of bytes downloaded per second.
  	Speed int64
  	// Downloaded is the number of bytes downloaded so far.
  	Downloaded int64
  }
  35. type Downloader struct {
  36. *controller.DefaultController
  37. fetchBuilders map[string]func() fetcher.Fetcher
  38. task *TaskInfo
  39. listener Listener
  40. finished bool
  41. finishedCh chan error
  42. }
  43. func NewDownloader(fbs ...FetcherBuilder) *Downloader {
  44. d := &Downloader{
  45. DefaultController: controller.NewController(),
  46. finishedCh: make(chan error),
  47. }
  48. d.fetchBuilders = make(map[string]func() fetcher.Fetcher)
  49. for _, f := range fbs {
  50. protocols, builder := f()
  51. for _, p := range protocols {
  52. d.fetchBuilders[strings.ToUpper(p)] = builder
  53. }
  54. }
  55. return d
  56. }
  57. func (d *Downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
  58. parseURL, err := url.Parse(URL)
  59. if err != nil {
  60. return nil, err
  61. }
  62. if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok {
  63. fetched := fetchBuilder()
  64. fetched.Setup(d.DefaultController)
  65. return fetched, nil
  66. }
  67. return nil, errors.New("unsupported protocol")
  68. }
  69. func (d *Downloader) Resolve(req *base.Request) (*base.Resource, error) {
  70. fetched, err := d.buildFetcher(req.URL)
  71. if err != nil {
  72. return nil, err
  73. }
  74. return fetched.Resolve(req)
  75. }
  76. func (d *Downloader) Create(res *base.Resource, opts *base.Options) (err error) {
  77. fetched, err := d.buildFetcher(res.Req.URL)
  78. if err != nil {
  79. return
  80. }
  81. if !res.Range || opts.Connections < 1 {
  82. opts.Connections = 1
  83. }
  84. err = fetched.Create(res, opts)
  85. if err != nil {
  86. return
  87. }
  88. task := &TaskInfo{
  89. ID: uuid.New().String(),
  90. Res: res,
  91. Opts: opts,
  92. Status: base.DownloadStatusStart,
  93. Progress: &Progress{},
  94. fetcher: fetched,
  95. timer: &util.Timer{},
  96. locker: new(sync.Mutex),
  97. }
  98. d.task = task
  99. task.timer.Start()
  100. d.emit(EventKeyStart)
  101. err = fetched.Start()
  102. if err != nil {
  103. return
  104. }
  105. go func() {
  106. err = fetched.Wait()
  107. if err != nil {
  108. d.emit(EventKeyError, err)
  109. task.Status = base.DownloadStatusError
  110. } else {
  111. task.Progress.Used = task.timer.Used()
  112. if task.Res.TotalSize == 0 {
  113. task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
  114. }
  115. used := task.Progress.Used / int64(time.Second)
  116. if used == 0 {
  117. used = 1
  118. }
  119. task.Progress.Speed = task.Res.TotalSize / used
  120. task.Progress.Downloaded = task.Res.TotalSize
  121. d.emit(EventKeyDone)
  122. task.Status = base.DownloadStatusDone
  123. }
  124. d.emit(EventKeyFinally, err)
  125. time.AfterFunc(1100*time.Millisecond, func() {
  126. d.finished = true
  127. d.finishedCh <- err
  128. })
  129. }()
  130. // 每秒统计一次下载速度
  131. go func() {
  132. for !d.finished {
  133. if d.task.Status == base.DownloadStatusPause {
  134. continue
  135. }
  136. current := d.task.fetcher.Progress().TotalDownloaded()
  137. d.task.Progress.Used = d.task.timer.Used()
  138. d.task.Progress.Speed = current - d.task.Progress.Downloaded
  139. d.task.Progress.Downloaded = current
  140. d.emit(EventKeyProgress)
  141. time.Sleep(time.Second)
  142. }
  143. }()
  144. return
  145. }
  146. func (d *Downloader) Pause() error {
  147. d.task.locker.Lock()
  148. defer d.task.locker.Unlock()
  149. d.task.timer.Pause()
  150. err := d.task.fetcher.Pause()
  151. if err != nil {
  152. return err
  153. }
  154. d.emit(EventKeyPause)
  155. d.task.Status = base.DownloadStatusPause
  156. return nil
  157. }
  158. func (d *Downloader) Continue() error {
  159. d.task.locker.Lock()
  160. defer d.task.locker.Unlock()
  161. d.task.timer.Continue()
  162. err := d.task.fetcher.Continue()
  163. if err != nil {
  164. return err
  165. }
  166. d.emit(EventKeyContinue)
  167. d.task.Status = base.DownloadStatusStart
  168. return nil
  169. }
  170. func (d *Downloader) Listener(fn Listener) {
  171. d.listener = fn
  172. }
  173. func (d *Downloader) emit(eventKey EventKey, errs ...error) {
  174. if d.listener != nil {
  175. var err error
  176. if len(errs) > 0 {
  177. err = errs[0]
  178. }
  179. d.listener(&Event{
  180. Key: eventKey,
  181. Task: d.task,
  182. Err: err,
  183. })
  184. }
  185. }
  186. type boot struct {
  187. url string
  188. extra any
  189. listener Listener
  190. downloader *Downloader
  191. }
  192. func (b *boot) URL(url string) *boot {
  193. b.url = url
  194. return b
  195. }
  196. func (b *boot) Extra(extra any) *boot {
  197. b.extra = extra
  198. return b
  199. }
  200. func (b *boot) Resolve() (*base.Resource, error) {
  201. return b.downloader.Resolve(&base.Request{
  202. URL: b.url,
  203. Extra: b.extra,
  204. })
  205. }
  206. func (b *boot) Listener(listener Listener) *boot {
  207. b.listener = listener
  208. return b
  209. }
  210. func (b *boot) Create(opts *base.Options) <-chan error {
  211. res, err := b.Resolve()
  212. if err != nil {
  213. b.downloader.finishedCh <- err
  214. return b.downloader.finishedCh
  215. }
  216. b.downloader.Listener(b.listener)
  217. err = b.downloader.Create(res, opts)
  218. if err != nil {
  219. b.downloader.finishedCh <- err
  220. return b.downloader.finishedCh
  221. }
  222. return b.downloader.finishedCh
  223. }
  224. func Boot() *boot {
  225. return &boot{
  226. downloader: NewDownloader(http.FetcherBuilder),
  227. }
  228. }