fetcher.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. package http
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  8. "golang.org/x/sync/errgroup"
  9. "io"
  10. "io/ioutil"
  11. "mime"
  12. "net/http"
  13. "net/http/cookiejar"
  14. "net/url"
  15. "path"
  16. "path/filepath"
  17. "strconv"
  18. "strings"
  19. "time"
  20. )
  21. type RequestError struct {
  22. Code int
  23. Msg string
  24. }
  25. func NewRequestError(code int, msg string) *RequestError {
  26. return &RequestError{Code: code, Msg: msg}
  27. }
  28. func (re *RequestError) Error() string {
  29. return fmt.Sprintf("http request fail,code:%d", re.Code)
  30. }
// Fetcher downloads a single HTTP/HTTPS resource, optionally split into
// multiple ranged chunks fetched concurrently.
type Fetcher struct {
	*fetcher.DefaultFetcher
	res     *base.Resource     // resolved resource being downloaded
	opts    *base.Options      // user options (path, name, connection count)
	status  base.Status        // current download state
	clients []*http.Response   // in-flight response per chunk, indexed like chunks
	chunks  []*Chunk           // byte ranges and per-chunk progress
	ctx     context.Context    // cancellation scope for the current fetch run
	cancel  context.CancelFunc // cancels ctx; invoked by Pause
	pauseCh chan any           // fetch goroutine signals here once a pause completes
}
  42. func NewFetcher() *Fetcher {
  43. return &Fetcher{
  44. DefaultFetcher: new(fetcher.DefaultFetcher),
  45. pauseCh: make(chan any),
  46. }
  47. }
  48. var protocols = []string{"HTTP", "HTTPS"}
  49. func FetcherBuilder() ([]string, func() fetcher.Fetcher) {
  50. return protocols, func() fetcher.Fetcher {
  51. return NewFetcher()
  52. }
  53. }
// Resolve probes the remote resource with a one-byte ranged GET to learn
// whether the server supports resumable (Range) downloads, the total size,
// and a usable file name. No payload is downloaded.
func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
	httpReq, err := buildRequest(nil, req)
	if err != nil {
		return nil, err
	}
	client := buildClient()
	// Request only the first byte to test whether the resource supports Range requests.
	httpReq.Header.Set(base.HttpHeaderRange, fmt.Sprintf(base.HttpHeaderRangeFormat, 0, 0))
	httpResp, err := client.Do(httpReq)
	if err != nil {
		return nil, err
	}
	// Only the headers are needed, so close immediately; no defer required.
	httpResp.Body.Close()
	res := &base.Resource{
		Req:   req,
		Range: false,
		Files: []*base.FileInfo{},
	}
	if base.HttpCodePartialContent == httpResp.StatusCode {
		// A 206 response means the server supports resumable (ranged) downloads.
		res.Range = true
		// Parse the total size from Content-Range: "bytes 0-1000/1001" => "1001".
		contentTotal := path.Base(httpResp.Header.Get(base.HttpHeaderContentRange))
		if contentTotal != "" {
			parse, err := strconv.ParseInt(contentTotal, 10, 64)
			if err != nil {
				return nil, err
			}
			res.TotalSize = parse
		}
	} else if base.HttpCodeOK == httpResp.StatusCode {
		// A 200 response means ranges are not supported; take the size from
		// Content-Length. A missing header likely indicates chunked encoding.
		contentLength := httpResp.Header.Get(base.HttpHeaderContentLength)
		if contentLength != "" {
			parse, err := strconv.ParseInt(contentLength, 10, 64)
			if err != nil {
				return nil, err
			}
			res.TotalSize = parse
		}
	} else {
		return nil, NewRequestError(httpResp.StatusCode, httpResp.Status)
	}
	file := &base.FileInfo{
		Size: res.TotalSize,
	}
	// Prefer the file name advertised via Content-Disposition.
	contentDisposition := httpResp.Header.Get(base.HttpHeaderContentDisposition)
	if contentDisposition != "" {
		_, params, _ := mime.ParseMediaType(contentDisposition)
		filename := params["filename"]
		if filename != "" {
			file.Name = filename
		}
	}
	// Fall back to the last path segment of the URL.
	// NOTE(review): filepath.Base on the raw URL keeps any query string — confirm
	// whether callers ever pass URLs with queries.
	if file.Name == "" && strings.Count(req.URL, "/") > 2 {
		file.Name = filepath.Base(req.URL)
	}
	// Last resort when no name could be derived.
	if file.Name == "" {
		file.Name = "unknown"
	}
	res.Files = append(res.Files, file)
	return res, nil
}
  120. func (f *Fetcher) Create(res *base.Resource, opts *base.Options) error {
  121. f.res = res
  122. f.opts = opts
  123. f.status = base.DownloadStatusReady
  124. return nil
  125. }
// Start pre-allocates the destination file to the resource's total size and
// partitions the download: one chunk per configured connection when the
// server supports Range requests, otherwise a single chunk, then begins
// fetching asynchronously.
func (f *Fetcher) Start() (err error) {
	// Create (pre-allocate) the destination file.
	name := f.filename()
	_, err = f.Ctl.Touch(name, f.res.TotalSize)
	if err != nil {
		return err
	}
	f.status = base.DownloadStatusStart
	if f.res.Range {
		// Average number of bytes each connection is responsible for.
		chunkSize := f.res.TotalSize / int64(f.opts.Connections)
		f.chunks = make([]*Chunk, f.opts.Connections)
		f.clients = make([]*http.Response, f.opts.Connections)
		for i := 0; i < f.opts.Connections; i++ {
			var (
				begin = chunkSize * int64(i)
				end   int64
			)
			if i == f.opts.Connections-1 {
				// The last chunk absorbs the division remainder so the whole
				// file is covered.
				end = f.res.TotalSize - 1
			} else {
				end = begin + chunkSize - 1
			}
			chunk := NewChunk(begin, end)
			f.chunks[i] = chunk
		}
	} else {
		// Server does not support ranges: single-connection download only.
		f.chunks = make([]*Chunk, 1)
		f.clients = make([]*http.Response, 1)
		f.chunks[0] = NewChunk(0, 0)
	}
	f.fetch()
	return
}
// Pause stops an in-progress download: it flips the status, cancels the
// fetch context, then blocks until the fetch goroutine signals on pauseCh
// that all workers have stopped and the file handle has been closed.
func (f *Fetcher) Pause() (err error) {
	// Only a running download can be paused.
	if base.DownloadStatusStart != f.status {
		return
	}
	f.status = base.DownloadStatusPause
	f.cancel()
	// Wait for the fetch goroutine to acknowledge the pause.
	<-f.pauseCh
	return
}
// Continue resumes a paused (or errored) download: it reopens the partly
// written file and restarts the chunk workers from their recorded offsets.
func (f *Fetcher) Continue() (err error) {
	// Nothing to do when already running or finished.
	if base.DownloadStatusStart == f.status || base.DownloadStatusDone == f.status {
		return
	}
	f.status = base.DownloadStatusStart
	var name = f.filename()
	_, err = f.Ctl.Open(name)
	if err != nil {
		return err
	}
	f.fetch()
	return
}
  184. func (f *Fetcher) Progress() fetcher.Progress {
  185. p := make(fetcher.Progress, 0)
  186. if len(f.chunks) > 0 {
  187. total := int64(0)
  188. for _, chunk := range f.chunks {
  189. total += chunk.Downloaded
  190. }
  191. p = append(p, total)
  192. }
  193. return p
  194. }
  195. func (f *Fetcher) filename() string {
  196. // 创建文件
  197. var filename = f.opts.Name
  198. if filename == "" {
  199. filename = f.res.Files[0].Name
  200. }
  201. return filepath.Join(f.opts.Path, filename)
  202. }
  203. func (f *Fetcher) fetch() {
  204. f.ctx, f.cancel = context.WithCancel(context.Background())
  205. eg, _ := errgroup.WithContext(f.ctx)
  206. for i := 0; i < f.opts.Connections; i++ {
  207. j := i //不加这一行会造成越界报错
  208. eg.Go(func() error {
  209. return f.fetchChunk(j)
  210. })
  211. }
  212. go func() {
  213. err := eg.Wait()
  214. // 下载停止,关闭文件句柄
  215. _ = f.Ctl.Close(f.filename())
  216. if f.status == base.DownloadStatusPause {
  217. f.pauseCh <- nil
  218. } else {
  219. if err != nil {
  220. f.status = base.DownloadStatusError
  221. } else {
  222. f.status = base.DownloadStatusDone
  223. }
  224. f.DoneCh <- err
  225. }
  226. }()
  227. }
// fetchChunk downloads the chunk at f.chunks[index] into the target file,
// resuming from the chunk's Downloaded offset when the server supports
// ranges, and retrying transient failures. It records the chunk's terminal
// status before returning.
func (f *Fetcher) fetchChunk(index int) (err error) {
	filename := f.filename()
	chunk := f.chunks[index]
	httpReq, err := buildRequest(f.ctx, f.res.Req)
	if err != nil {
		return err
	}
	var (
		client = buildClient()
		buf    = make([]byte, 8192)
	)
	// Allow up to 10 consecutive failed attempts.
	for i := 0; i < 10; i++ {
		// Already finished: nothing to do.
		if chunk.Status == base.DownloadStatusDone {
			return
		}
		// Paused: stop retrying and fall through to status bookkeeping.
		if f.status == base.DownloadStatusPause {
			break
		}
		var (
			resp  *http.Response
			retry bool
		)
		if f.res.Range {
			// Resume from what has already been written for this chunk.
			httpReq.Header.Set(base.HttpHeaderRange,
				fmt.Sprintf(base.HttpHeaderRangeFormat, chunk.Begin+chunk.Downloaded, chunk.End))
		} else {
			// No range support: every attempt restarts the body from byte zero.
			chunk.Downloaded = 0
		}
		err = func() error {
			resp, err = client.Do(httpReq)
			if err != nil {
				return err
			}
			// Expose the response so other code can observe/abort this transfer.
			f.clients[index] = resp
			if resp.StatusCode != base.HttpCodeOK && resp.StatusCode != base.HttpCodePartialContent {
				err = NewRequestError(resp.StatusCode, resp.Status)
				return err
			}
			return nil
		}()
		if err != nil {
			// Request failed; retry after 3 seconds.
			time.Sleep(time.Second * 3)
			continue
		}
		// A successful request resets the error counter, so only 10
		// consecutive failures abort the chunk.
		i = 0
		retry, err = func() (bool, error) {
			defer func() {
				_ = resp.Body.Close()
			}()
			var n int
			for {
				n, err = resp.Body.Read(buf)
				if n > 0 {
					// Write at the chunk's absolute offset within the file.
					_, err = f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
					if err != nil {
						// A local write error is not recoverable by re-requesting.
						return false, err
					}
					chunk.Downloaded += int64(n)
				}
				if err != nil {
					if err != io.EOF {
						// Transfer broke mid-body; worth retrying.
						return true, err
					}
					break
				}
			}
			return false, nil
		}()
		if !retry {
			// Finished (or hit a non-retryable error); leave the retry loop.
			break
		}
	}
	// Record the chunk's terminal state.
	if f.status == base.DownloadStatusPause {
		chunk.Status = base.DownloadStatusPause
	} else if chunk.Downloaded >= chunk.End-chunk.Begin+1 {
		chunk.Status = base.DownloadStatusDone
	} else {
		if err != nil {
			chunk.Status = base.DownloadStatusError
		} else {
			chunk.Status = base.DownloadStatusDone
		}
	}
	return
}
  319. func buildClient() *http.Client {
  320. // Cookie handle
  321. jar, _ := cookiejar.New(nil)
  322. return &http.Client{
  323. Jar: jar,
  324. Timeout: time.Second * 10,
  325. }
  326. }
  327. func buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
  328. reqUrl, err := url.Parse(req.URL)
  329. if err != nil {
  330. return
  331. }
  332. var (
  333. method string
  334. body io.Reader
  335. )
  336. headers := make(map[string][]string)
  337. if req.Extra == nil {
  338. method = http.MethodGet
  339. } else {
  340. extra := req.Extra.(Extra)
  341. if extra.Method != "" {
  342. method = extra.Method
  343. } else {
  344. method = http.MethodGet
  345. }
  346. if len(extra.Header) > 0 {
  347. for k, v := range extra.Header {
  348. headers[k] = []string{v}
  349. }
  350. }
  351. if extra.Body != "" {
  352. body = ioutil.NopCloser(bytes.NewBufferString(extra.Body))
  353. }
  354. }
  355. if ctx != nil {
  356. httpReq, err = http.NewRequestWithContext(ctx, method, reqUrl.String(), body)
  357. } else {
  358. httpReq, err = http.NewRequest(method, reqUrl.String(), body)
  359. }
  360. if err != nil {
  361. return
  362. }
  363. httpReq.Header = headers
  364. return httpReq, nil
  365. }