fetcher.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package http
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  8. "golang.org/x/sync/errgroup"
  9. "io"
  10. "mime"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "path"
  15. "path/filepath"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. type RequestError struct {
  21. Code int
  22. Msg string
  23. }
  24. func NewRequestError(code int, msg string) *RequestError {
  25. return &RequestError{Code: code, Msg: msg}
  26. }
  27. func (re *RequestError) Error() string {
  28. return fmt.Sprintf("http request fail,code:%d", re.Code)
  29. }
  30. type Fetcher struct {
  31. *fetcher.DefaultFetcher
  32. res *base.Resource
  33. opts *base.Options
  34. status base.Status
  35. clients []*http.Response
  36. chunks []*Chunk
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. pauseCh chan any
  40. }
  41. func NewFetcher() *Fetcher {
  42. return &Fetcher{
  43. DefaultFetcher: new(fetcher.DefaultFetcher),
  44. pauseCh: make(chan any),
  45. }
  46. }
  47. var protocols = []string{"HTTP", "HTTPS"}
  48. func FetcherBuilder() ([]string, func() fetcher.Fetcher) {
  49. return protocols, func() fetcher.Fetcher {
  50. return NewFetcher()
  51. }
  52. }
  53. func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
  54. httpReq, err := f.buildRequest(nil, req)
  55. if err != nil {
  56. return nil, err
  57. }
  58. client, err := f.buildClient()
  59. if err != nil {
  60. return nil, err
  61. }
  62. // 只访问一个字节,测试资源是否支持Range请求
  63. httpReq.Header.Set(base.HttpHeaderRange, fmt.Sprintf(base.HttpHeaderRangeFormat, 0, 0))
  64. httpResp, err := client.Do(httpReq)
  65. if err != nil {
  66. return nil, err
  67. }
  68. // 拿到响应头就关闭,不用加defer
  69. _ = httpResp.Body.Close()
  70. res := &base.Resource{
  71. Req: req,
  72. Range: false,
  73. Files: []*base.FileInfo{},
  74. }
  75. if base.HttpCodePartialContent == httpResp.StatusCode {
  76. // 返回206响应码表示支持断点下载
  77. res.Range = true
  78. // 解析资源大小: bytes 0-1000/1001 => 1001
  79. contentTotal := path.Base(httpResp.Header.Get(base.HttpHeaderContentRange))
  80. if contentTotal != "" {
  81. parse, err := strconv.ParseInt(contentTotal, 10, 64)
  82. if err != nil {
  83. return nil, err
  84. }
  85. res.TotalSize = parse
  86. }
  87. } else if base.HttpCodeOK == httpResp.StatusCode {
  88. // 返回200响应码,不支持断点下载,通过Content-Length头获取文件大小,获取不到的话可能是chunked编码
  89. contentLength := httpResp.Header.Get(base.HttpHeaderContentLength)
  90. if contentLength != "" {
  91. parse, err := strconv.ParseInt(contentLength, 10, 64)
  92. if err != nil {
  93. return nil, err
  94. }
  95. res.TotalSize = parse
  96. }
  97. } else {
  98. return nil, NewRequestError(httpResp.StatusCode, httpResp.Status)
  99. }
  100. file := &base.FileInfo{
  101. Size: res.TotalSize,
  102. }
  103. contentDisposition := httpResp.Header.Get(base.HttpHeaderContentDisposition)
  104. if contentDisposition != "" {
  105. _, params, _ := mime.ParseMediaType(contentDisposition)
  106. filename := params["filename"]
  107. if filename != "" {
  108. file.Name = filename
  109. }
  110. }
  111. // Get file filename by URL
  112. if file.Name == "" && strings.Count(req.URL, "/") > 2 {
  113. file.Name = filepath.Base(req.URL)
  114. }
  115. // unknown file filename
  116. if file.Name == "" {
  117. file.Name = "unknown"
  118. }
  119. res.Files = append(res.Files, file)
  120. return res, nil
  121. }
  122. func (f *Fetcher) Create(res *base.Resource, opts *base.Options) error {
  123. f.res = res
  124. f.opts = opts
  125. f.status = base.DownloadStatusReady
  126. return nil
  127. }
  128. func (f *Fetcher) Start() (err error) {
  129. // 创建文件
  130. name := f.filename()
  131. _, err = f.Ctl.Touch(name, f.res.TotalSize)
  132. if err != nil {
  133. return err
  134. }
  135. f.status = base.DownloadStatusStart
  136. if f.res.Range {
  137. // 每个连接平均需要下载的分块大小
  138. chunkSize := f.res.TotalSize / int64(f.opts.Connections)
  139. f.chunks = make([]*Chunk, f.opts.Connections)
  140. f.clients = make([]*http.Response, f.opts.Connections)
  141. for i := 0; i < f.opts.Connections; i++ {
  142. var (
  143. begin = chunkSize * int64(i)
  144. end int64
  145. )
  146. if i == f.opts.Connections-1 {
  147. // 最后一个分块需要保证把文件下载完
  148. end = f.res.TotalSize - 1
  149. } else {
  150. end = begin + chunkSize - 1
  151. }
  152. chunk := NewChunk(begin, end)
  153. f.chunks[i] = chunk
  154. }
  155. } else {
  156. // 只支持单连接下载
  157. f.chunks = make([]*Chunk, 1)
  158. f.clients = make([]*http.Response, 1)
  159. f.chunks[0] = NewChunk(0, 0)
  160. }
  161. f.fetch()
  162. return
  163. }
  164. func (f *Fetcher) Pause() (err error) {
  165. if base.DownloadStatusStart != f.status {
  166. return
  167. }
  168. f.status = base.DownloadStatusPause
  169. f.cancel()
  170. <-f.pauseCh
  171. return
  172. }
  173. func (f *Fetcher) Continue() (err error) {
  174. if base.DownloadStatusStart == f.status || base.DownloadStatusDone == f.status {
  175. return
  176. }
  177. f.status = base.DownloadStatusStart
  178. var name = f.filename()
  179. _, err = f.Ctl.Open(name)
  180. if err != nil {
  181. return err
  182. }
  183. f.fetch()
  184. return
  185. }
  186. func (f *Fetcher) Progress() fetcher.Progress {
  187. p := make(fetcher.Progress, 0)
  188. if len(f.chunks) > 0 {
  189. total := int64(0)
  190. for _, chunk := range f.chunks {
  191. total += chunk.Downloaded
  192. }
  193. p = append(p, total)
  194. }
  195. return p
  196. }
  197. func (f *Fetcher) filename() string {
  198. // 创建文件
  199. var filename = f.opts.Name
  200. if filename == "" {
  201. filename = f.res.Files[0].Name
  202. }
  203. return filepath.Join(f.opts.Path, filename)
  204. }
  205. func (f *Fetcher) fetch() {
  206. f.ctx, f.cancel = context.WithCancel(context.Background())
  207. eg, _ := errgroup.WithContext(f.ctx)
  208. for i := 0; i < f.opts.Connections; i++ {
  209. j := i //TODO loop var per loop(1.22已解决,loop var per-iteration)
  210. eg.Go(func() error {
  211. return f.fetchChunk(j)
  212. })
  213. }
  214. go func() {
  215. err := eg.Wait()
  216. // 下载停止,关闭文件句柄
  217. _ = f.Ctl.Close(f.filename())
  218. if f.status == base.DownloadStatusPause {
  219. f.pauseCh <- nil
  220. } else {
  221. if err != nil {
  222. f.status = base.DownloadStatusError
  223. } else {
  224. f.status = base.DownloadStatusDone
  225. }
  226. f.DoneCh <- err
  227. }
  228. }()
  229. }
  230. func (f *Fetcher) fetchChunk(index int) (err error) {
  231. filename := f.filename()
  232. chunk := f.chunks[index]
  233. httpReq, err := f.buildRequest(f.ctx, f.res.Req)
  234. if err != nil {
  235. return err
  236. }
  237. client, err := f.buildClient()
  238. if err != nil {
  239. return err
  240. }
  241. var buf = make([]byte, 8192)
  242. // 重试10次
  243. for i := 0; i < 10; i++ {
  244. // 如果下载完成直接返回
  245. if chunk.Status == base.DownloadStatusDone {
  246. return
  247. }
  248. // 如果已暂停直接跳出
  249. if f.status == base.DownloadStatusPause {
  250. break
  251. }
  252. var (
  253. resp *http.Response
  254. retry bool
  255. )
  256. if f.res.Range {
  257. httpReq.Header.Set(base.HttpHeaderRange,
  258. fmt.Sprintf(base.HttpHeaderRangeFormat, chunk.Begin+chunk.Downloaded, chunk.End))
  259. } else {
  260. chunk.Downloaded = 0
  261. }
  262. err = func() error {
  263. resp, err = client.Do(httpReq)
  264. if err != nil {
  265. return err
  266. }
  267. f.clients[index] = resp
  268. if resp.StatusCode != base.HttpCodeOK && resp.StatusCode != base.HttpCodePartialContent {
  269. err = NewRequestError(resp.StatusCode, resp.Status)
  270. return err
  271. }
  272. return nil
  273. }()
  274. if err != nil {
  275. //请求失败3s后重试
  276. time.Sleep(time.Second * 3)
  277. continue
  278. }
  279. // 请求成功就重置错误次数,连续失败10次才终止
  280. i = 0
  281. retry, err = func() (bool, error) {
  282. defer func() {
  283. _ = resp.Body.Close()
  284. }()
  285. var n int
  286. for {
  287. n, err = resp.Body.Read(buf)
  288. if n > 0 {
  289. _, err = f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
  290. if err != nil {
  291. return false, err
  292. }
  293. chunk.Downloaded += int64(n)
  294. }
  295. if err != nil {
  296. if err != io.EOF {
  297. return true, err
  298. }
  299. break
  300. }
  301. }
  302. return false, nil
  303. }()
  304. if !retry {
  305. // 下载成功,跳出重试
  306. break
  307. }
  308. }
  309. if f.status == base.DownloadStatusPause {
  310. chunk.Status = base.DownloadStatusPause
  311. } else if chunk.Downloaded >= chunk.End-chunk.Begin+1 {
  312. chunk.Status = base.DownloadStatusDone
  313. } else {
  314. if err != nil {
  315. chunk.Status = base.DownloadStatusError
  316. } else {
  317. chunk.Status = base.DownloadStatusDone
  318. }
  319. }
  320. return
  321. }
  322. func (f *Fetcher) buildClient() (*http.Client, error) {
  323. dialer, err := f.Ctl.ContextDialer()
  324. if err != nil {
  325. return nil, err
  326. }
  327. transport := &http.Transport{
  328. DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
  329. return dialer.Dial(network, addr)
  330. },
  331. }
  332. if f.Ctl.ContextProxy() != nil {
  333. transport.Proxy = f.Ctl.ContextProxy()
  334. }
  335. return &http.Client{
  336. Jar: f.Ctl.ContextCookie(),
  337. Timeout: f.Ctl.ContextTimeout(),
  338. Transport: transport,
  339. }, nil
  340. }
  341. func (f *Fetcher) buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
  342. reqUrl, err := url.Parse(req.URL)
  343. if err != nil {
  344. return
  345. }
  346. var (
  347. method string
  348. body io.Reader
  349. )
  350. headers := make(map[string][]string)
  351. if req.Extra == nil {
  352. method = http.MethodGet
  353. } else {
  354. extra := req.Extra.(Extra)
  355. if extra.Method != "" {
  356. method = extra.Method
  357. } else {
  358. method = http.MethodGet
  359. }
  360. if len(extra.Header) > 0 {
  361. for k, v := range extra.Header {
  362. headers[k] = []string{v}
  363. }
  364. }
  365. if extra.Body != "" {
  366. body = io.NopCloser(bytes.NewBufferString(extra.Body))
  367. }
  368. }
  369. if ctx != nil {
  370. httpReq, err = http.NewRequestWithContext(ctx, method, reqUrl.String(), body)
  371. } else {
  372. httpReq, err = http.NewRequest(method, reqUrl.String(), body)
  373. }
  374. if err != nil {
  375. return
  376. }
  377. httpReq.Header = headers
  378. return httpReq, nil
  379. }