fetcher.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package http
  2. import (
  3. "bytes"
  4. "context"
  5. "fmt"
  6. "git.bvbej.com/bvbej/base-golang/pkg/downloader/base"
  7. "git.bvbej.com/bvbej/base-golang/pkg/downloader/fetcher"
  8. "golang.org/x/sync/errgroup"
  9. "io"
  10. "mime"
  11. "net"
  12. "net/http"
  13. "net/url"
  14. "path"
  15. "path/filepath"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  // RequestError reports an HTTP response whose status code the fetcher does
  // not accept.
  type RequestError struct {
  	Code int    // HTTP status code returned by the server
  	Msg  string // status text; this file passes resp.Status
  }

  // NewRequestError returns a *RequestError carrying the given status code and
  // message.
  func NewRequestError(code int, msg string) *RequestError {
  	e := new(RequestError)
  	e.Code = code
  	e.Msg = msg
  	return e
  }

  // Error implements the error interface. Only the status code appears in the
  // text; Msg stays available to callers through errors.As.
  func (re *RequestError) Error() string {
  	return fmt.Sprint("http request fail,code:", re.Code)
  }
  30. type Fetcher struct {
  31. *fetcher.DefaultFetcher
  32. res *base.Resource
  33. opts *base.Options
  34. status base.Status
  35. clients []*http.Response
  36. chunks []*Chunk
  37. ctx context.Context
  38. cancel context.CancelFunc
  39. pauseCh chan any
  40. }
  41. func NewFetcher() *Fetcher {
  42. return &Fetcher{
  43. DefaultFetcher: new(fetcher.DefaultFetcher),
  44. pauseCh: make(chan any),
  45. }
  46. }
  47. var protocols = []string{"HTTP", "HTTPS"}
  48. func FetcherBuilder() ([]string, func() fetcher.Fetcher) {
  49. return protocols, func() fetcher.Fetcher {
  50. return NewFetcher()
  51. }
  52. }
  53. func (f *Fetcher) Resolve(req *base.Request) (*base.Resource, error) {
  54. httpReq, err := f.buildRequest(nil, req)
  55. if err != nil {
  56. return nil, err
  57. }
  58. client, err := f.buildClient()
  59. if err != nil {
  60. return nil, err
  61. }
  62. // 只访问一个字节,测试资源是否支持Range请求
  63. httpReq.Header.Set(base.HttpHeaderRange, fmt.Sprintf(base.HttpHeaderRangeFormat, 0, 0))
  64. httpResp, err := client.Do(httpReq)
  65. if err != nil {
  66. return nil, err
  67. }
  68. // 拿到响应头就关闭,不用加defer
  69. _ = httpResp.Body.Close()
  70. res := &base.Resource{
  71. Req: req,
  72. Range: false,
  73. Files: []*base.FileInfo{},
  74. }
  75. if base.HttpCodePartialContent == httpResp.StatusCode {
  76. // 返回206响应码表示支持断点下载
  77. res.Range = true
  78. // 解析资源大小: bytes 0-1000/1001 => 1001
  79. contentTotal := path.Base(httpResp.Header.Get(base.HttpHeaderContentRange))
  80. if contentTotal != "" {
  81. parse, err := strconv.ParseInt(contentTotal, 10, 64)
  82. if err != nil {
  83. return nil, err
  84. }
  85. res.TotalSize = parse
  86. }
  87. } else if base.HttpCodeOK == httpResp.StatusCode {
  88. // 返回200响应码,不支持断点下载,通过Content-Length头获取文件大小,获取不到的话可能是chunked编码
  89. contentLength := httpResp.Header.Get(base.HttpHeaderContentLength)
  90. if contentLength != "" {
  91. parse, err := strconv.ParseInt(contentLength, 10, 64)
  92. if err != nil {
  93. return nil, err
  94. }
  95. res.TotalSize = parse
  96. }
  97. } else {
  98. return nil, NewRequestError(httpResp.StatusCode, httpResp.Status)
  99. }
  100. file := &base.FileInfo{
  101. Size: res.TotalSize,
  102. }
  103. contentDisposition := httpResp.Header.Get(base.HttpHeaderContentDisposition)
  104. if contentDisposition != "" {
  105. _, params, _ := mime.ParseMediaType(contentDisposition)
  106. filename := params["filename"]
  107. if filename != "" {
  108. file.Name = filename
  109. }
  110. }
  111. // Get file filename by URL
  112. if file.Name == "" && strings.Count(req.URL, "/") > 2 {
  113. file.Name = filepath.Base(req.URL)
  114. }
  115. // unknown file filename
  116. if file.Name == "" {
  117. file.Name = "unknown"
  118. }
  119. res.Files = append(res.Files, file)
  120. return res, nil
  121. }
  122. func (f *Fetcher) Create(res *base.Resource, opts *base.Options) error {
  123. f.res = res
  124. f.opts = opts
  125. f.status = base.DownloadStatusReady
  126. return nil
  127. }
  128. func (f *Fetcher) Start() (err error) {
  129. // 创建文件
  130. name := f.filename()
  131. _, err = f.Ctl.Touch(name, f.res.TotalSize)
  132. if err != nil {
  133. return err
  134. }
  135. f.status = base.DownloadStatusStart
  136. if f.res.Range {
  137. // 每个连接平均需要下载的分块大小
  138. chunkSize := f.res.TotalSize / int64(f.opts.Connections)
  139. f.chunks = make([]*Chunk, f.opts.Connections)
  140. f.clients = make([]*http.Response, f.opts.Connections)
  141. for i := 0; i < f.opts.Connections; i++ {
  142. var (
  143. begin = chunkSize * int64(i)
  144. end int64
  145. )
  146. if i == f.opts.Connections-1 {
  147. // 最后一个分块需要保证把文件下载完
  148. end = f.res.TotalSize - 1
  149. } else {
  150. end = begin + chunkSize - 1
  151. }
  152. chunk := NewChunk(begin, end)
  153. f.chunks[i] = chunk
  154. }
  155. } else {
  156. // 只支持单连接下载
  157. f.chunks = make([]*Chunk, 1)
  158. f.clients = make([]*http.Response, 1)
  159. f.chunks[0] = NewChunk(0, 0)
  160. }
  161. f.fetch()
  162. return
  163. }
  164. func (f *Fetcher) Pause() (err error) {
  165. if base.DownloadStatusStart != f.status {
  166. return
  167. }
  168. f.status = base.DownloadStatusPause
  169. f.cancel()
  170. <-f.pauseCh
  171. return
  172. }
  173. func (f *Fetcher) Continue() (err error) {
  174. if base.DownloadStatusStart == f.status || base.DownloadStatusDone == f.status {
  175. return
  176. }
  177. f.status = base.DownloadStatusStart
  178. var name = f.filename()
  179. _, err = f.Ctl.Open(name)
  180. if err != nil {
  181. return err
  182. }
  183. f.fetch()
  184. return
  185. }
  186. func (f *Fetcher) Progress() fetcher.Progress {
  187. p := make(fetcher.Progress, 0)
  188. if len(f.chunks) > 0 {
  189. total := int64(0)
  190. for _, chunk := range f.chunks {
  191. total += chunk.Downloaded
  192. }
  193. p = append(p, total)
  194. }
  195. return p
  196. }
  197. func (f *Fetcher) filename() string {
  198. // 创建文件
  199. var filename = f.opts.Name
  200. if filename == "" {
  201. filename = f.res.Files[0].Name
  202. }
  203. return filepath.Join(f.opts.Path, filename)
  204. }
  205. func (f *Fetcher) fetch() {
  206. f.ctx, f.cancel = context.WithCancel(context.Background())
  207. eg, _ := errgroup.WithContext(f.ctx)
  208. for i := 0; i < f.opts.Connections; i++ {
  209. eg.Go(func() error {
  210. return f.fetchChunk(i)
  211. })
  212. }
  213. go func() {
  214. err := eg.Wait()
  215. // 下载停止,关闭文件句柄
  216. _ = f.Ctl.Close(f.filename())
  217. if f.status == base.DownloadStatusPause {
  218. f.pauseCh <- nil
  219. } else {
  220. if err != nil {
  221. f.status = base.DownloadStatusError
  222. } else {
  223. f.status = base.DownloadStatusDone
  224. }
  225. f.DoneCh <- err
  226. }
  227. }()
  228. }
  229. func (f *Fetcher) fetchChunk(index int) (err error) {
  230. filename := f.filename()
  231. chunk := f.chunks[index]
  232. httpReq, err := f.buildRequest(f.ctx, f.res.Req)
  233. if err != nil {
  234. return err
  235. }
  236. client, err := f.buildClient()
  237. if err != nil {
  238. return err
  239. }
  240. var buf = make([]byte, 8192)
  241. // 重试10次
  242. for i := 0; i < 10; i++ {
  243. // 如果下载完成直接返回
  244. if chunk.Status == base.DownloadStatusDone {
  245. return
  246. }
  247. // 如果已暂停直接跳出
  248. if f.status == base.DownloadStatusPause {
  249. break
  250. }
  251. var (
  252. resp *http.Response
  253. retry bool
  254. )
  255. if f.res.Range {
  256. httpReq.Header.Set(base.HttpHeaderRange,
  257. fmt.Sprintf(base.HttpHeaderRangeFormat, chunk.Begin+chunk.Downloaded, chunk.End))
  258. } else {
  259. chunk.Downloaded = 0
  260. }
  261. err = func() error {
  262. resp, err = client.Do(httpReq)
  263. if err != nil {
  264. return err
  265. }
  266. f.clients[index] = resp
  267. if resp.StatusCode != base.HttpCodeOK && resp.StatusCode != base.HttpCodePartialContent {
  268. err = NewRequestError(resp.StatusCode, resp.Status)
  269. return err
  270. }
  271. return nil
  272. }()
  273. if err != nil {
  274. //请求失败3s后重试
  275. time.Sleep(time.Second * 3)
  276. continue
  277. }
  278. // 请求成功就重置错误次数,连续失败10次才终止
  279. i = 0
  280. retry, err = func() (bool, error) {
  281. defer func() {
  282. _ = resp.Body.Close()
  283. }()
  284. var n int
  285. for {
  286. n, err = resp.Body.Read(buf)
  287. if n > 0 {
  288. _, err = f.Ctl.Write(filename, chunk.Begin+chunk.Downloaded, buf[:n])
  289. if err != nil {
  290. return false, err
  291. }
  292. chunk.Downloaded += int64(n)
  293. }
  294. if err != nil {
  295. if err != io.EOF {
  296. return true, err
  297. }
  298. break
  299. }
  300. }
  301. return false, nil
  302. }()
  303. if !retry {
  304. // 下载成功,跳出重试
  305. break
  306. }
  307. }
  308. if f.status == base.DownloadStatusPause {
  309. chunk.Status = base.DownloadStatusPause
  310. } else if chunk.Downloaded >= chunk.End-chunk.Begin+1 {
  311. chunk.Status = base.DownloadStatusDone
  312. } else {
  313. if err != nil {
  314. chunk.Status = base.DownloadStatusError
  315. } else {
  316. chunk.Status = base.DownloadStatusDone
  317. }
  318. }
  319. return
  320. }
  321. func (f *Fetcher) buildClient() (*http.Client, error) {
  322. dialer, err := f.Ctl.ContextDialer()
  323. if err != nil {
  324. return nil, err
  325. }
  326. transport := &http.Transport{
  327. DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
  328. return dialer.Dial(network, addr)
  329. },
  330. }
  331. if f.Ctl.ContextProxy() != nil {
  332. transport.Proxy = f.Ctl.ContextProxy()
  333. }
  334. return &http.Client{
  335. Jar: f.Ctl.ContextCookie(),
  336. Timeout: f.Ctl.ContextTimeout(),
  337. Transport: transport,
  338. }, nil
  339. }
  340. func (f *Fetcher) buildRequest(ctx context.Context, req *base.Request) (httpReq *http.Request, err error) {
  341. reqUrl, err := url.Parse(req.URL)
  342. if err != nil {
  343. return
  344. }
  345. var (
  346. method string
  347. body io.Reader
  348. )
  349. headers := make(map[string][]string)
  350. if req.Extra == nil {
  351. method = http.MethodGet
  352. } else {
  353. extra := req.Extra.(Extra)
  354. if extra.Method != "" {
  355. method = extra.Method
  356. } else {
  357. method = http.MethodGet
  358. }
  359. if len(extra.Header) > 0 {
  360. for k, v := range extra.Header {
  361. headers[k] = []string{v}
  362. }
  363. }
  364. if extra.Body != "" {
  365. body = io.NopCloser(bytes.NewBufferString(extra.Body))
  366. }
  367. }
  368. if ctx != nil {
  369. httpReq, err = http.NewRequestWithContext(ctx, method, reqUrl.String(), body)
  370. } else {
  371. httpReq, err = http.NewRequest(method, reqUrl.String(), body)
  372. }
  373. if err != nil {
  374. return
  375. }
  376. httpReq.Header = headers
  377. return httpReq, nil
  378. }