123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260 |
- package downloader
- import (
- "errors"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/controller"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/util"
- "github.com/google/uuid"
- "net/url"
- "strings"
- "sync"
- "time"
- )
- type FetcherBuilder func() (protocols []string, builder func() fetcher.Fetcher)
- type Listener func(event *Event)
- type TaskInfo struct {
- ID string
- Res *base.Resource
- Opts *base.Options
- Status base.Status
- Progress *Progress
- fetcher fetcher.Fetcher
- timer *util.Timer
- locker *sync.Mutex
- }
- type Progress struct {
- // 下载耗时(纳秒)
- Used int64
- // 每秒下载字节数
- Speed int64
- // 已下载的字节数
- Downloaded int64
- }
- type Downloader struct {
- *controller.DefaultController
- fetchBuilders map[string]func() fetcher.Fetcher
- task *TaskInfo
- listener Listener
- finished bool
- finishedCh chan error
- }
- func NewDownloader(fbs ...FetcherBuilder) *Downloader {
- d := &Downloader{
- DefaultController: controller.NewController(),
- finishedCh: make(chan error),
- }
- d.fetchBuilders = make(map[string]func() fetcher.Fetcher)
- for _, f := range fbs {
- protocols, builder := f()
- for _, p := range protocols {
- d.fetchBuilders[strings.ToUpper(p)] = builder
- }
- }
- return d
- }
- func (d *Downloader) buildFetcher(URL string) (fetcher.Fetcher, error) {
- parseURL, err := url.Parse(URL)
- if err != nil {
- return nil, err
- }
- if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok {
- fetched := fetchBuilder()
- fetched.Setup(d.DefaultController)
- return fetched, nil
- }
- return nil, errors.New("unsupported protocol")
- }
- func (d *Downloader) Resolve(req *base.Request) (*base.Resource, error) {
- fetched, err := d.buildFetcher(req.URL)
- if err != nil {
- return nil, err
- }
- return fetched.Resolve(req)
- }
- func (d *Downloader) Create(res *base.Resource, opts *base.Options) (err error) {
- fetched, err := d.buildFetcher(res.Req.URL)
- if err != nil {
- return
- }
- if !res.Range || opts.Connections < 1 {
- opts.Connections = 1
- }
- err = fetched.Create(res, opts)
- if err != nil {
- return
- }
- task := &TaskInfo{
- ID: uuid.New().String(),
- Res: res,
- Opts: opts,
- Status: base.DownloadStatusStart,
- Progress: &Progress{},
- fetcher: fetched,
- timer: &util.Timer{},
- locker: new(sync.Mutex),
- }
- d.task = task
- task.timer.Start()
- d.emit(EventKeyStart)
- err = fetched.Start()
- if err != nil {
- return
- }
- go func() {
- err = fetched.Wait()
- if err != nil {
- d.emit(EventKeyError, err)
- task.Status = base.DownloadStatusError
- } else {
- task.Progress.Used = task.timer.Used()
- if task.Res.TotalSize == 0 {
- task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded()
- }
- used := task.Progress.Used / int64(time.Second)
- if used == 0 {
- used = 1
- }
- task.Progress.Speed = task.Res.TotalSize / used
- task.Progress.Downloaded = task.Res.TotalSize
- d.emit(EventKeyDone)
- task.Status = base.DownloadStatusDone
- }
- d.emit(EventKeyFinally, err)
- time.AfterFunc(1100*time.Millisecond, func() {
- d.finished = true
- d.finishedCh <- err
- })
- }()
- // 每秒统计一次下载速度
- go func() {
- for !d.finished {
- if d.task.Status == base.DownloadStatusPause {
- continue
- }
- current := d.task.fetcher.Progress().TotalDownloaded()
- d.task.Progress.Used = d.task.timer.Used()
- d.task.Progress.Speed = current - d.task.Progress.Downloaded
- d.task.Progress.Downloaded = current
- d.emit(EventKeyProgress)
- time.Sleep(time.Second)
- }
- }()
- return
- }
- func (d *Downloader) Pause() error {
- d.task.locker.Lock()
- defer d.task.locker.Unlock()
- d.task.timer.Pause()
- err := d.task.fetcher.Pause()
- if err != nil {
- return err
- }
- d.emit(EventKeyPause)
- d.task.Status = base.DownloadStatusPause
- return nil
- }
- func (d *Downloader) Continue() error {
- d.task.locker.Lock()
- defer d.task.locker.Unlock()
- d.task.timer.Continue()
- err := d.task.fetcher.Continue()
- if err != nil {
- return err
- }
- d.emit(EventKeyContinue)
- d.task.Status = base.DownloadStatusStart
- return nil
- }
- func (d *Downloader) Listener(fn Listener) {
- d.listener = fn
- }
- func (d *Downloader) emit(eventKey EventKey, errs ...error) {
- if d.listener != nil {
- var err error
- if len(errs) > 0 {
- err = errs[0]
- }
- d.listener(&Event{
- Key: eventKey,
- Task: d.task,
- Err: err,
- })
- }
- }
- type boot struct {
- url string
- extra any
- listener Listener
- downloader *Downloader
- }
- func (b *boot) URL(url string) *boot {
- b.url = url
- return b
- }
- func (b *boot) Extra(extra any) *boot {
- b.extra = extra
- return b
- }
- func (b *boot) Resolve() (*base.Resource, error) {
- return b.downloader.Resolve(&base.Request{
- URL: b.url,
- Extra: b.extra,
- })
- }
- func (b *boot) Listener(listener Listener) *boot {
- b.listener = listener
- return b
- }
- func (b *boot) Create(opts *base.Options) <-chan error {
- res, err := b.Resolve()
- if err != nil {
- b.downloader.finishedCh <- err
- return b.downloader.finishedCh
- }
- b.downloader.Listener(b.listener)
- err = b.downloader.Create(res, opts)
- if err != nil {
- b.downloader.finishedCh <- err
- return b.downloader.finishedCh
- }
- return b.downloader.finishedCh
- }
- func Boot() *boot {
- return &boot{
- downloader: NewDownloader(http.FetcherBuilder),
- }
- }
|