package downloader import ( "errors" "git.bvbej.com/bvbej/base-golang/pkg/downloader/base" "git.bvbej.com/bvbej/base-golang/pkg/downloader/controller" "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher" "git.bvbej.com/bvbej/base-golang/pkg/downloader/protocol/http" "git.bvbej.com/bvbej/base-golang/pkg/downloader/util" "github.com/google/uuid" "net/url" "strings" "sync" "time" ) type Listener func(event *Event) type TaskInfo struct { ID string Res *base.Resource Opts *base.Options Status base.Status Progress *Progress fetcher fetcher.Fetcher timer *util.Timer locker *sync.Mutex } type Progress struct { // 下载耗时(纳秒) Used int64 // 每秒下载字节数 Speed int64 // 已下载的字节数 Downloaded int64 } type downloader struct { *controller.DefaultController fetchBuilders map[string]func() fetcher.Fetcher task *TaskInfo listener Listener finished bool finishedCh chan error } func newDownloader(f func() (protocols []string, builder func() fetcher.Fetcher)) *downloader { d := &downloader{ DefaultController: controller.NewController(), finishedCh: make(chan error, 1), } d.fetchBuilders = make(map[string]func() fetcher.Fetcher) protocols, builder := f() for _, p := range protocols { d.fetchBuilders[strings.ToUpper(p)] = builder } return d } func (d *downloader) buildFetcher(URL string) (fetcher.Fetcher, error) { parseURL, err := url.Parse(URL) if err != nil { return nil, err } if fetchBuilder, ok := d.fetchBuilders[strings.ToUpper(parseURL.Scheme)]; ok { fetched := fetchBuilder() fetched.Setup(d.DefaultController) return fetched, nil } return nil, errors.New("unsupported protocol") } func (d *downloader) Resolve(req *base.Request) (*base.Resource, error) { fetched, err := d.buildFetcher(req.URL) if err != nil { return nil, err } return fetched.Resolve(req) } func (d *downloader) Create(res *base.Resource, opts *base.Options) (err error) { fetched, err := d.buildFetcher(res.Req.URL) if err != nil { return } if !res.Range || opts.Connections < 1 { opts.Connections = 1 } err = fetched.Create(res, opts) if err != nil { return } task := &TaskInfo{ ID: uuid.New().String(), Res: res, Opts: opts, Status: base.DownloadStatusStart, Progress: &Progress{}, fetcher: fetched, timer: &util.Timer{}, locker: new(sync.Mutex), } d.task = task task.timer.Start() d.emit(EventKeyStart) err = fetched.Start() if err != nil { return } go func() { err = fetched.Wait() if err != nil { d.emit(EventKeyError, err) task.Status = base.DownloadStatusError } else { task.Progress.Used = task.timer.Used() if task.Res.TotalSize == 0 { task.Res.TotalSize = task.fetcher.Progress().TotalDownloaded() } used := task.Progress.Used / int64(time.Second) if used == 0 { used = 1 } task.Progress.Speed = task.Res.TotalSize / used task.Progress.Downloaded = task.Res.TotalSize d.emit(EventKeyDone) task.Status = base.DownloadStatusDone } d.emit(EventKeyFinally, err) time.AfterFunc(1100*time.Millisecond, func() { d.finished = true d.finishedCh <- err }) }() // 每秒统计一次下载速度 go func() { for !d.finished { if d.task.Status == base.DownloadStatusPause { continue } current := d.task.fetcher.Progress().TotalDownloaded() d.task.Progress.Used = d.task.timer.Used() d.task.Progress.Speed = current - d.task.Progress.Downloaded d.task.Progress.Downloaded = current d.emit(EventKeyProgress) time.Sleep(time.Second) } }() return } func (d *downloader) Pause() error { d.task.locker.Lock() defer d.task.locker.Unlock() d.task.timer.Pause() err := d.task.fetcher.Pause() if err != nil { return err } d.emit(EventKeyPause) d.task.Status = base.DownloadStatusPause return nil } func (d *downloader) Continue() error { d.task.locker.Lock() defer d.task.locker.Unlock() d.task.timer.Continue() err := d.task.fetcher.Continue() if err != nil { return err } d.emit(EventKeyContinue) d.task.Status = base.DownloadStatusStart return nil } func (d *downloader) Listener(fn Listener) { d.listener = fn } func (d *downloader) emit(eventKey EventKey, errs ...error) { if d.listener != nil { var err error if len(errs) > 0 { err = errs[0] } d.listener(&Event{ Key: eventKey, Task: d.task, Err: err, }) } } var _ Boot = (*boot)(nil) type Boot interface { URL(url string) Boot Extra(extra any) Boot Listener(listener Listener) Boot Create(opts *base.Options) <-chan error } type boot struct { url string extra any listener Listener downloader *downloader } func (b *boot) resolve() (*base.Resource, error) { return b.downloader.Resolve(&base.Request{ URL: b.url, Extra: b.extra, }) } func (b *boot) URL(url string) Boot { b.url = url return b } func (b *boot) Extra(extra any) Boot { b.extra = extra return b } func (b *boot) Listener(listener Listener) Boot { b.listener = listener return b } func (b *boot) Create(opts *base.Options) <-chan error { res, err := b.resolve() if err != nil { b.downloader.finishedCh <- err return b.downloader.finishedCh } b.downloader.Listener(b.listener) err = b.downloader.Create(res, opts) if err != nil { b.downloader.finishedCh <- err return b.downloader.finishedCh } return b.downloader.finishedCh } // New 一个文件对应一个实例 func New() Boot { return &boot{ downloader: newDownloader(http.FetcherBuilder), } }