downloader.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package downloader
  2. import (
  3. "errors"
  4. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  5. "git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
  8. "git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
  9. "github.com/google/uuid"
  10. "net/url"
  11. "strings"
  12. "sync"
  13. "time"
  14. )
  15. type Listener func(event *Event)
  16. type TaskInfo struct {
  17. ID string
  18. Res *base.Resource
  19. Opts *base.Options
  20. Status base.Status
  21. Progress *Progress
  22. fetcher fetcher.Fetcher
  23. timer *util.Timer
  24. locker *sync.Mutex
  25. }
  // Progress is a snapshot of how far a download has advanced.
  type Progress struct {
  	// Used is the time spent downloading, in nanoseconds.
  	Used int64
  	// Speed is the number of bytes downloaded per second.
  	Speed int64
  	// Downloaded is the number of bytes downloaded so far.
  	Downloaded int64
  }
  34. type downloader struct {
  35. *controller.DefaultController
  36. fetchBuilders map[string]func() fetcher.Fetcher
  37. task *TaskInfo
  38. listener Listener
  39. finished bool
  40. finishedCh chan error
  41. }
  42. func newDownloader(f func() (protocols []string, builder func() fetcher.Fetcher), options ...controller.Option) *downloader {
  43. d := &downloader{
  44. DefaultController: controller.NewController(options...),
  45. finishedCh: make(chan error, 1),
  46. }
  47. d.fetchBuilders = make(map[string]func() fetcher.Fetcher)
  48. protocols, builder := f()
  49. for _, p := range protocols {
  50. d.fetchBuilders[strings.ToUpper(p)] = builder
  51. }
  52. return d
  53. }
  54. func (d *downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
  55. parseURL, err := url.Parse(URL)
  56. if err != nil {
  57. return nil, err
  58. }
  59. if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok {
  60. fetched := fetchBuilder()
  61. fetched.Setup(d.DefaultController)
  62. return fetched, nil
  63. }
  64. return nil, errors.New("unsupported protocol")
  65. }
  66. func (d *downloader) Resolve(req *base.Request) (*base.Resource, error) {
  67. fetched, err := d.buildFetcher(req.URL)
  68. if err != nil {
  69. return nil, err
  70. }
  71. return fetched.Resolve(req)
  72. }
  73. func (d *downloader) Create(res *base.Resource, opts *base.Options) (err error) {
  74. fetched, err := d.buildFetcher(res.Req.URL)
  75. if err != nil {
  76. return
  77. }
  78. if !res.Range || opts.Connections < 1 {
  79. opts.Connections = 1
  80. }
  81. err = fetched.Create(res, opts)
  82. if err != nil {
  83. return
  84. }
  85. task := &TaskInfo{
  86. ID: uuid.New().String(),
  87. Res: res,
  88. Opts: opts,
  89. Status: base.DownloadStatusStart,
  90. Progress: &Progress{},
  91. fetcher: fetched,
  92. timer: &util.Timer{},
  93. locker: new(sync.Mutex),
  94. }
  95. d.task = task
  96. task.timer.Start()
  97. d.emit(EventKeyStart)
  98. err = fetched.Start()
  99. if err != nil {
  100. return
  101. }
  102. go func() {
  103. err = fetched.Wait()
  104. if err != nil {
  105. d.emit(EventKeyError, err)
  106. task.Status = base.DownloadStatusError
  107. } else {
  108. task.Progress.Used = task.timer.Used()
  109. if task.Res.TotalSize == 0 {
  110. task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
  111. }
  112. used := task.Progress.Used / int64(time.Second)
  113. if used == 0 {
  114. used = 1
  115. }
  116. task.Progress.Speed = task.Res.TotalSize / used
  117. task.Progress.Downloaded = task.Res.TotalSize
  118. d.emit(EventKeyDone)
  119. task.Status = base.DownloadStatusDone
  120. }
  121. d.finished = true
  122. d.emit(EventKeyFinally, err)
  123. d.finishedCh <- err
  124. }()
  125. // 每秒统计一次下载速度
  126. go func() {
  127. for !d.finished {
  128. if d.task.Status == base.DownloadStatusPause {
  129. continue
  130. }
  131. current := d.task.fetcher.Progress().TotalDownloaded()
  132. d.task.Progress.Used = d.task.timer.Used()
  133. d.task.Progress.Speed = current - d.task.Progress.Downloaded
  134. d.task.Progress.Downloaded = current
  135. d.emit(EventKeyProgress)
  136. time.Sleep(time.Second)
  137. }
  138. }()
  139. return
  140. }
  141. func (d *downloader) Pause() error {
  142. d.task.locker.Lock()
  143. defer d.task.locker.Unlock()
  144. d.task.timer.Pause()
  145. err := d.task.fetcher.Pause()
  146. if err != nil {
  147. return err
  148. }
  149. d.emit(EventKeyPause)
  150. d.task.Status = base.DownloadStatusPause
  151. return nil
  152. }
  153. func (d *downloader) Continue() error {
  154. d.task.locker.Lock()
  155. defer d.task.locker.Unlock()
  156. d.task.timer.Continue()
  157. err := d.task.fetcher.Continue()
  158. if err != nil {
  159. return err
  160. }
  161. d.emit(EventKeyContinue)
  162. d.task.Status = base.DownloadStatusStart
  163. return nil
  164. }
  165. func (d *downloader) Listener(fn Listener) {
  166. d.listener = fn
  167. }
  168. func (d *downloader) emit(eventKey EventKey, errs ...error) {
  169. if d.listener != nil {
  170. var err error
  171. if len(errs) > 0 {
  172. err = errs[0]
  173. }
  174. d.listener(&Event{
  175. Key: eventKey,
  176. Task: d.task,
  177. Err: err,
  178. })
  179. }
  180. }
  181. var _ Boot = (*boot)(nil)
  182. type Boot interface {
  183. URL(url string) Boot
  184. Extra(extra any) Boot
  185. Listener(listener Listener) Boot
  186. Create(opts *base.Options) <-chan error
  187. }
  188. type boot struct {
  189. url string
  190. extra any
  191. listener Listener
  192. downloader *downloader
  193. }
  194. func (b *boot) resolve() (*base.Resource, error) {
  195. return b.downloader.Resolve(&base.Request{
  196. URL: b.url,
  197. Extra: b.extra,
  198. })
  199. }
  200. func (b *boot) URL(url string) Boot {
  201. b.url = url
  202. return b
  203. }
  204. func (b *boot) Extra(extra any) Boot {
  205. b.extra = extra
  206. return b
  207. }
  208. func (b *boot) Listener(listener Listener) Boot {
  209. b.listener = listener
  210. return b
  211. }
  212. func (b *boot) Create(opts *base.Options) <-chan error {
  213. res, err := b.resolve()
  214. if err != nil {
  215. b.downloader.finishedCh <- err
  216. return b.downloader.finishedCh
  217. }
  218. b.downloader.Listener(b.listener)
  219. err = b.downloader.Create(res, opts)
  220. if err != nil {
  221. b.downloader.finishedCh <- err
  222. return b.downloader.finishedCh
  223. }
  224. return b.downloader.finishedCh
  225. }
  226. // New 一个文件对应一个实例
  227. func New(options ...controller.Option) Boot {
  228. return &boot{
  229. downloader: newDownloader(http.FetcherBuilder, options...),
  230. }
  231. }