tus.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package upload
import (
	"context"
	"crypto/sha256"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"git.bvbej.com/bvbej/base-golang/pkg/color"
	"git.bvbej.com/bvbej/base-golang/pkg/ticker"
	"git.bvbej.com/bvbej/base-golang/pkg/token"
	"github.com/rs/cors"
	"github.com/tus/tusd/pkg/filestore"
	tus "github.com/tus/tusd/pkg/handler"
	"go.uber.org/zap"
)
  20. var _ Server = (*server)(nil)
  21. type Server interface {
  22. GetUploadToken(string, string, time.Duration) string
  23. GetFileInfo(string) (*tus.FileInfo, error)
  24. Start(func(string, string, tus.FileInfo)) error
  25. Stop() error
  26. }
  27. type server struct {
  28. headerTokenKey string
  29. uploading sync.Map
  30. config Config
  31. token token.Token
  32. store filestore.FileStore
  33. logger *zap.Logger
  34. httpServer *http.Server
  35. ctx context.Context
  36. done context.CancelFunc
  37. checker ticker.Ticker
  38. completedEvent func(sha256, param string, info tus.FileInfo)
  39. }
  40. type Config struct {
  41. ListenAddr string
  42. Path string
  43. Dir string
  44. Secret string
  45. DisableDownload bool
  46. Debug bool
  47. }
  48. func New(conf Config, logger *zap.Logger) Server {
  49. ctx, cancelFunc := context.WithCancel(context.Background())
  50. return &server{
  51. config: conf,
  52. uploading: sync.Map{},
  53. headerTokenKey: "Authorization",
  54. logger: logger,
  55. token: token.New(conf.Secret),
  56. ctx: ctx,
  57. done: cancelFunc,
  58. checker: ticker.New(time.Minute),
  59. }
  60. }
  61. func (s *server) GetUploadToken(sha256, param string, ttl time.Duration) string {
  62. sign, _ := s.token.JwtSign(sha256, param, ttl)
  63. return sign
  64. }
  65. func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
  66. upload, err := s.store.GetUpload(context.Background(), id)
  67. if err != nil {
  68. return nil, err
  69. }
  70. info, err := upload.GetInfo(context.Background())
  71. if err != nil {
  72. return nil, err
  73. }
  74. return &info, nil
  75. }
  76. func (s *server) Start(completedEvent func(sha256, param string, info tus.FileInfo)) error {
  77. s.completedEvent = completedEvent
  78. composer := tus.NewStoreComposer()
  79. if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
  80. return err
  81. }
  82. s.store = filestore.New(s.config.Dir)
  83. s.store.UseIn(composer)
  84. handler, err := tus.NewHandler(tus.Config{
  85. StoreComposer: composer,
  86. BasePath: s.config.Path,
  87. Logger: zap.NewStdLog(s.logger),
  88. NotifyCompleteUploads: true,
  89. NotifyTerminatedUploads: true,
  90. DisableTermination: true,
  91. DisableDownload: s.config.DisableDownload,
  92. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  93. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  94. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  95. jwtClaims, err := s.token.JwtParse(authStr)
  96. if err == nil {
  97. _, ok := s.uploading.Load(authStr)
  98. if !ok {
  99. s.uploading.Store(authStr, jwtClaims.ExpiresAt.Time)
  100. return nil
  101. }
  102. return errors.New("repeated")
  103. }
  104. return errors.New("unauthorized")
  105. },
  106. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  107. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  108. jwtParse, err := s.token.JwtParse(authStr)
  109. if err != nil {
  110. return errors.New("token expired")
  111. }
  112. _, ok := s.uploading.Load(authStr)
  113. if ok {
  114. s.uploading.Delete(authStr)
  115. }
  116. upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
  117. if err != nil {
  118. return err
  119. }
  120. info, err := upload.GetInfo(context.Background())
  121. path, exist := info.Storage["Path"]
  122. if err != nil || !exist {
  123. return errors.New("file not found")
  124. }
  125. content, err := os.ReadFile(path)
  126. if err != nil {
  127. return err
  128. }
  129. hash := sha256.New()
  130. hash.Write(content)
  131. sha256Byte := hash.Sum(nil)
  132. sha256String := fmt.Sprintf("%x", sha256Byte)
  133. if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
  134. _ = os.Remove(path)
  135. _ = os.Remove(path + ".info")
  136. return errors.New("file check error")
  137. }
  138. return nil
  139. },
  140. })
  141. if err != nil {
  142. return err
  143. }
  144. go func() {
  145. for {
  146. select {
  147. case event := <-handler.CompleteUploads:
  148. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  149. jwtParse, _ := s.token.JwtParse(authStr)
  150. if s.completedEvent != nil {
  151. go func() {
  152. s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
  153. }()
  154. }
  155. case <-s.ctx.Done():
  156. return
  157. }
  158. }
  159. }()
  160. go func() {
  161. for {
  162. select {
  163. case event := <-handler.TerminatedUploads:
  164. upload, _ := s.store.GetUpload(context.Background(), event.Upload.ID)
  165. if upload != nil {
  166. info, _ := upload.GetInfo(context.Background())
  167. path, exist := info.Storage["Path"]
  168. if exist {
  169. _ = os.Remove(path)
  170. _ = os.Remove(path + ".info")
  171. }
  172. }
  173. case <-s.ctx.Done():
  174. return
  175. }
  176. }
  177. }()
  178. s.checker.Process(func() {
  179. s.uploading.Range(func(key, value any) bool {
  180. t := value.(time.Time)
  181. if t.Before(time.Now()) {
  182. s.uploading.Delete(key)
  183. }
  184. return true
  185. })
  186. })
  187. //监听服务
  188. addr := s.config.ListenAddr
  189. mux := http.NewServeMux()
  190. mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
  191. s.httpServer = &http.Server{
  192. Addr: addr,
  193. Handler: cors.AllowAll().Handler(mux),
  194. }
  195. go func() {
  196. if err = s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
  197. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  198. }
  199. }()
  200. fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
  201. return nil
  202. }
  203. func (s *server) Stop() error {
  204. s.done()
  205. return s.httpServer.Close()
  206. }