downloader.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package downloader
  2. import (
  3. "errors"
  4. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  5. "git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
  8. "git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
  9. "github.com/google/uuid"
  10. "net/url"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. type Listener func(event *Event)
  16. type TaskInfo struct {
  17. ID string
  18. Res *base.Resource
  19. Opts *base.Options
  20. Status base.Status
  21. Progress *Progress
  22. fetcher fetcher.Fetcher
  23. timer *util.Timer
  24. locker *sync.Mutex
  25. }
  26. type Progress struct {
  27. // 下载耗时(纳秒)
  28. Used int64
  29. // 每秒下载字节数
  30. Speed int64
  31. // 已下载的字节数
  32. Downloaded int64
  33. }
  34. type downloader struct {
  35. *controller.DefaultController
  36. fetchBuilders map[string]func() fetcher.Fetcher
  37. task *TaskInfo
  38. listener Listener
  39. finished bool
  40. finishedCh chan error
  41. }
  42. func newDownloader(f func() (protocols []string, builder func() fetcher.Fetcher)) *downloader {
  43. d := &downloader{
  44. DefaultController: controller.NewController(),
  45. finishedCh: make(chan error),
  46. }
  47. d.fetchBuilders = make(map[string]func() fetcher.Fetcher)
  48. protocols, builder := f()
  49. for _, p := range protocols {
  50. d.fetchBuilders[strings.ToUpper(p)] = builder
  51. }
  52. return d
  53. }
  54. func (d *downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
  55. parseURL, err := url.Parse(URL)
  56. if err != nil {
  57. return nil, err
  58. }
  59. if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok {
  60. fetched := fetchBuilder()
  61. fetched.Setup(d.DefaultController)
  62. return fetched, nil
  63. }
  64. return nil, errors.New("unsupported protocol")
  65. }
  66. func (d *downloader) Resolve(req *base.Request) (*base.Resource, error) {
  67. fetched, err := d.buildFetcher(req.URL)
  68. if err != nil {
  69. return nil, err
  70. }
  71. return fetched.Resolve(req)
  72. }
  73. func (d *downloader) Create(res *base.Resource, opts *base.Options) (err error) {
  74. fetched, err := d.buildFetcher(res.Req.URL)
  75. if err != nil {
  76. return
  77. }
  78. if !res.Range || opts.Connections < 1 {
  79. opts.Connections = 1
  80. }
  81. err = fetched.Create(res, opts)
  82. if err != nil {
  83. return
  84. }
  85. task := &TaskInfo{
  86. ID: uuid.New().String(),
  87. Res: res,
  88. Opts: opts,
  89. Status: base.DownloadStatusStart,
  90. Progress: &Progress{},
  91. fetcher: fetched,
  92. timer: &util.Timer{},
  93. locker: new(sync.Mutex),
  94. }
  95. d.task = task
  96. task.timer.Start()
  97. d.emit(EventKeyStart)
  98. err = fetched.Start()
  99. if err != nil {
  100. return
  101. }
  102. go func() {
  103. err = fetched.Wait()
  104. if err != nil {
  105. d.emit(EventKeyError, err)
  106. task.Status = base.DownloadStatusError
  107. } else {
  108. task.Progress.Used = task.timer.Used()
  109. if task.Res.TotalSize == 0 {
  110. task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
  111. }
  112. used := task.Progress.Used / int64(time.Second)
  113. if used == 0 {
  114. used = 1
  115. }
  116. task.Progress.Speed = task.Res.TotalSize / used
  117. task.Progress.Downloaded = task.Res.TotalSize
  118. d.emit(EventKeyDone)
  119. task.Status = base.DownloadStatusDone
  120. }
  121. d.emit(EventKeyFinally, err)
  122. time.AfterFunc(1100*time.Millisecond, func() {
  123. d.finished = true
  124. d.finishedCh <- err
  125. })
  126. }()
  127. // 每秒统计一次下载速度
  128. go func() {
  129. for !d.finished {
  130. if d.task.Status == base.DownloadStatusPause {
  131. continue
  132. }
  133. current := d.task.fetcher.Progress().TotalDownloaded()
  134. d.task.Progress.Used = d.task.timer.Used()
  135. d.task.Progress.Speed = current - d.task.Progress.Downloaded
  136. d.task.Progress.Downloaded = current
  137. d.emit(EventKeyProgress)
  138. time.Sleep(time.Second)
  139. }
  140. }()
  141. return
  142. }
  143. func (d *downloader) Pause() error {
  144. d.task.locker.Lock()
  145. defer d.task.locker.Unlock()
  146. d.task.timer.Pause()
  147. err := d.task.fetcher.Pause()
  148. if err != nil {
  149. return err
  150. }
  151. d.emit(EventKeyPause)
  152. d.task.Status = base.DownloadStatusPause
  153. return nil
  154. }
  155. func (d *downloader) Continue() error {
  156. d.task.locker.Lock()
  157. defer d.task.locker.Unlock()
  158. d.task.timer.Continue()
  159. err := d.task.fetcher.Continue()
  160. if err != nil {
  161. return err
  162. }
  163. d.emit(EventKeyContinue)
  164. d.task.Status = base.DownloadStatusStart
  165. return nil
  166. }
  167. func (d *downloader) Listener(fn Listener) {
  168. d.listener = fn
  169. }
  170. func (d *downloader) emit(eventKey EventKey, errs ...error) {
  171. if d.listener != nil {
  172. var err error
  173. if len(errs) > 0 {
  174. err = errs[0]
  175. }
  176. d.listener(&Event{
  177. Key: eventKey,
  178. Task: d.task,
  179. Err: err,
  180. })
  181. }
  182. }
  183. var _ Boot = (*boot)(nil)
  184. type Boot interface {
  185. URL(url string) Boot
  186. Extra(extra any) Boot
  187. Listener(listener Listener) Boot
  188. Create(opts *base.Options) <-chan error
  189. }
  190. type boot struct {
  191. url string
  192. extra any
  193. listener Listener
  194. downloader *downloader
  195. }
  196. func (b *boot) resolve() (*base.Resource, error) {
  197. return b.downloader.Resolve(&base.Request{
  198. URL: b.url,
  199. Extra: b.extra,
  200. })
  201. }
  202. func (b *boot) URL(url string) Boot {
  203. b.url = url
  204. return b
  205. }
  206. func (b *boot) Extra(extra any) Boot {
  207. b.extra = extra
  208. return b
  209. }
  210. func (b *boot) Listener(listener Listener) Boot {
  211. b.listener = listener
  212. return b
  213. }
  214. func (b *boot) Create(opts *base.Options) <-chan error {
  215. res, err := b.resolve()
  216. if err != nil {
  217. b.downloader.finishedCh <- err
  218. return b.downloader.finishedCh
  219. }
  220. b.downloader.Listener(b.listener)
  221. err = b.downloader.Create(res, opts)
  222. if err != nil {
  223. b.downloader.finishedCh <- err
  224. return b.downloader.finishedCh
  225. }
  226. return b.downloader.finishedCh
  227. }
  228. func New() Boot {
  229. return &boot{
  230. downloader: newDownloader(http.FetcherBuilder),
  231. }
  232. }