123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409 |
- package http
- import (
- "bytes"
- "context"
- "fmt"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
- "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
- "golang.org/x/sync/errgroup"
- "io"
- "mime"
- "net"
- "net/http"
- "net/url"
- "path"
- "path/filepath"
- "strconv"
- "strings"
- "time"
- )
- type RequestError struct {
- Code int
- Msg string
- }
- func NewRequestError(code int, msg string) *RequestError {
- return &RequestError{Code: code, Msg: msg}
- }
- func (re *RequestError) Error() string {
- return fmt.Sprintf("http request fail,code:%d", re.Code)
- }
- type Fetcher struct {
- *fetcher.DefaultFetcher
- res *base.Resource
- opts *base.Options
- status base.Status
- clients []*http.Response
- chunks []*Chunk
- ctx context.Context
- cancel context.CancelFunc
- pauseCh chan any
- }
- func NewFetcher() *Fetcher {
- return &Fetcher{
- DefaultFetcher: new(fetcher.DefaultFetcher),
- pauseCh: make(chan any),
- }
- }
- var protocols = []string{"HTTP", "HTTPS"}
- func FetcherBuilder() ([]string, func() fetcher.Fetcher) {
- return protocols, func() fetcher.Fetcher {
- return NewFetcher()
- }
- }
- func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
- httpReq, err := f.buildRequest(nil, req)
- if err != nil {
- return nil, err
- }
- client, err := f.buildClient()
- if err != nil {
- return nil, err
- }
- // 只访问一个字节,测试资源是否支持Range请求
- httpReq.Header.Set(base.HttpHeaderRange, fmt.Sprintf(base.HttpHeaderRangeFormat, 0, 0))
- httpResp, err := client.Do(httpReq)
- if err != nil {
- return nil, err
- }
- // 拿到响应头就关闭,不用加defer
- _ = httpResp.Body.Close()
- res := &base.Resource{
- Req: req,
- Range: false,
- Files: []*base.FileInfo{},
- }
- if base.HttpCodePartialContent == httpResp.StatusCode {
- // 返回206响应码表示支持断点下载
- res.Range = true
- // 解析资源大小: bytes 0-1000/1001 => 1001
- contentTotal := path.Base(httpResp.Header.Get(base.HttpHeaderContentRange))
- if contentTotal != "" {
- parse, err := strconv.ParseInt(contentTotal, 10, 64)
- if err != nil {
- return nil, err
- }
- res.TotalSize = parse
- }
- } else if base.HttpCodeOK == httpResp.StatusCode {
- // 返回200响应码,不支持断点下载,通过Content-Length头获取文件大小,获取不到的话可能是chunked编码
- contentLength := httpResp.Header.Get(base.HttpHeaderContentLength)
- if contentLength != "" {
- parse, err := strconv.ParseInt(contentLength, 10, 64)
- if err != nil {
- return nil, err
- }
- res.TotalSize = parse
- }
- } else {
- return nil, NewRequestError(httpResp.StatusCode, httpResp.Status)
- }
- file := &base.FileInfo{
- Size: res.TotalSize,
- }
- contentDisposition := httpResp.Header.Get(base.HttpHeaderContentDisposition)
- if contentDisposition != "" {
- _, params, _ := mime.ParseMediaType(contentDisposition)
- filename := params["filename"]
- if filename != "" {
- file.Name = filename
- }
- }
- // Get file filename by URL
- if file.Name == "" && strings.Count(req.URL, "/") > 2 {
- file.Name = filepath.Base(req.URL)
- }
- // unknown file filename
- if file.Name == "" {
- file.Name = "unknown"
- }
- res.Files = append(res.Files, file)
- return res, nil
- }
- func (f *Fetcher) Create(res *base.Resource, opts *base.Options) error {
- f.res = res
- f.opts = opts
- f.status = base.DownloadStatusReady
- return nil
- }
- func (f *Fetcher) Start() (err error) {
- // 创建文件
- name := f.filename()
- _, err = f.Ctl.Touch(name, f.res.TotalSize)
- if err != nil {
- return err
- }
- f.status = base.DownloadStatusStart
- if f.res.Range {
- // 每个连接平均需要下载的分块大小
- chunkSize := f.res.TotalSize / int64(f.opts.Connections)
- f.chunks = make([]*Chunk, f.opts.Connections)
- f.clients = make([]*http.Response, f.opts.Connections)
- for i := 0; i < f.opts.Connections; i++ {
- var (
- begin = chunkSize * int64(i)
- end int64
- )
- if i == f.opts.Connections-1 {
- // 最后一个分块需要保证把文件下载完
- end = f.res.TotalSize - 1
- } else {
- end = begin + chunkSize - 1
- }
- chunk := NewChunk(begin, end)
- f.chunks[i] = chunk
- }
- } else {
- // 只支持单连接下载
- f.chunks = make([]*Chunk, 1)
- f.clients = make([]*http.Response, 1)
- f.chunks[0] = NewChunk(0, 0)
- }
- f.fetch()
- return
- }
- func (f *Fetcher) Pause() (err error) {
- if base.DownloadStatusStart != f.status {
- return
- }
- f.status = base.DownloadStatusPause
- f.cancel()
- <-f.pauseCh
- return
- }
- func (f *Fetcher) Continue() (err error) {
- if base.DownloadStatusStart == f.status || base.DownloadStatusDone == f.status {
- return
- }
- f.status = base.DownloadStatusStart
- var name = f.filename()
- _, err = f.Ctl.Open(name)
- if err != nil {
- return err
- }
- f.fetch()
- return
- }
- func (f *Fetcher) Progress() fetcher.Progress {
- p := make(fetcher.Progress, 0)
- if len(f.chunks) > 0 {
- total := int64(0)
- for _, chunk := range f.chunks {
- total += chunk.Downloaded
- }
- p = append(p, total)
- }
- return p
- }
- func (f *Fetcher) filename() string {
- // 创建文件
- var filename = f.opts.Name
- if filename == "" {
- filename = f.res.Files[0].Name
- }
- return filepath.Join(f.opts.Path, filename)
- }
- func (f *Fetcher) fetch() {
- f.ctx, f.cancel = context.WithCancel(context.Background())
- eg, _ := errgroup.WithContext(f.ctx)
- for i := 0; i < f.opts.Connections; i++ {
- eg.Go(func() error {
- return f.fetchChunk(i)
- })
- }
- go func() {
- err := eg.Wait()
- // 下载停止,关闭文件句柄
- _ = f.Ctl.Close(f.filename())
- if f.status == base.DownloadStatusPause {
- f.pauseCh <- nil
- } else {
- if err != nil {
- f.status = base.DownloadStatusError
- } else {
- f.status = base.DownloadStatusDone
- }
- f.DoneCh <- err
- }
- }()
- }
- func (f *Fetcher) fetchChunk(index int) (err error) {
- filename := f.filename()
- chunk := f.chunks[index]
- httpReq, err := f.buildRequest(f.ctx, f.res.Req)
- if err != nil {
- return err
- }
- client, err := f.buildClient()
- if err != nil {
- return err
- }
- var buf = make([]byte, 8192)
- // 重试10次
- for i := 0; i < 10; i++ {
- // 如果下载完成直接返回
- if chunk.Status == base.DownloadStatusDone {
- return
- }
- // 如果已暂停直接跳出
- if f.status == base.DownloadStatusPause {
- break
- }
- var (
- resp *http.Response
- retry bool
- )
- if f.res.Range {
- httpReq.Header.Set(base.HttpHeaderRange,
- fmt.Sprintf(base.HttpHeaderRangeFormat, chunk.Begin+chunk.Downloaded, chunk.End))
- } else {
- chunk.Downloaded = 0
- }
- err = func() error {
- resp, err = client.Do(httpReq)
- if err != nil {
- return err
- }
- f.clients[index] = resp
- if resp.StatusCode != base.HttpCodeOK && resp.StatusCode != base.HttpCodePartialContent {
- err = NewRequestError(resp.StatusCode, resp.Status)
- return err
- }
- return nil
- }()
- if err != nil {
- //请求失败3s后重试
- time.Sleep(time.Second * 3)
- continue
- }
- // 请求成功就重置错误次数,连续失败10次才终止
- i = 0
- retry, err = func() (bool, error) {
- defer func() {
- _ = resp.Body.Close()
- }()
- var n int
- for {
- n, err = resp.Body.Read(buf)
- if n > 0 {
- _, err = f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
- if err != nil {
- return false, err
- }
- chunk.Downloaded += int64(n)
- }
- if err != nil {
- if err != io.EOF {
- return true, err
- }
- break
- }
- }
- return false, nil
- }()
- if !retry {
- // 下载成功,跳出重试
- break
- }
- }
- if f.status == base.DownloadStatusPause {
- chunk.Status = base.DownloadStatusPause
- } else if chunk.Downloaded >= chunk.End-chunk.Begin+1 {
- chunk.Status = base.DownloadStatusDone
- } else {
- if err != nil {
- chunk.Status = base.DownloadStatusError
- } else {
- chunk.Status = base.DownloadStatusDone
- }
- }
- return
- }
- func (f *Fetcher) buildClient() (*http.Client, error) {
- dialer, err := f.Ctl.ContextDialer()
- if err != nil {
- return nil, err
- }
- transport := &http.Transport{
- DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
- return dialer.Dial(network, addr)
- },
- }
- if f.Ctl.ContextProxy() != nil {
- transport.Proxy = f.Ctl.ContextProxy()
- }
- return &http.Client{
- Jar: f.Ctl.ContextCookie(),
- Timeout: f.Ctl.ContextTimeout(),
- Transport: transport,
- }, nil
- }
- func (f *Fetcher) buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
- reqUrl, err := url.Parse(req.URL)
- if err != nil {
- return
- }
- var (
- method string
- body io.Reader
- )
- headers := make(map[string][]string)
- if req.Extra == nil {
- method = http.MethodGet
- } else {
- extra := req.Extra.(Extra)
- if extra.Method != "" {
- method = extra.Method
- } else {
- method = http.MethodGet
- }
- if len(extra.Header) > 0 {
- for k, v := range extra.Header {
- headers[k] = []string{v}
- }
- }
- if extra.Body != "" {
- body = io.NopCloser(bytes.NewBufferString(extra.Body))
- }
- }
- if ctx != nil {
- httpReq, err = http.NewRequestWithContext(ctx, method, reqUrl.String(), body)
- } else {
- httpReq, err = http.NewRequest(method, reqUrl.String(), body)
- }
- if err != nil {
- return
- }
- httpReq.Header = headers
- return httpReq, nil
- }
|