tus.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package upload
  import (
  	"context"
  	"crypto/sha256"
  	"errors"
  	"fmt"
  	"io"
  	"net/http"
  	"os"
  	"strings"
  	"sync"
  	"time"

  	"git.bvbej.com/bvbej/base-golang/pkg/color"
  	"git.bvbej.com/bvbej/base-golang/pkg/token"
  	"github.com/rs/cors"
  	"github.com/tus/tusd/pkg/filestore"
  	tus "github.com/tus/tusd/pkg/handler"
  	"go.uber.org/zap"
  )
  19. var _ Server = (*server)(nil)
  20. type Server interface {
  21. GetUploadToken(string, string, time.Duration) string
  22. GetFileInfo(string) (*tus.FileInfo, error)
  23. Start(func(string, string, tus.FileInfo)) error
  24. Stop() error
  25. }
  26. type server struct {
  27. headerTokenKey string
  28. uploading sync.Map
  29. config Config
  30. token token.Token
  31. store filestore.FileStore
  32. logger *zap.Logger
  33. httpServer *http.Server
  34. completedEvent func(sha256, param string, info tus.FileInfo)
  35. }
  // Config carries the settings for an upload Server.
  type Config struct {
  	// ListenAddr is the address the HTTP server binds to. When it contains
  	// "127.0.0.1" the tus handler is told to respect forwarded headers.
  	ListenAddr string
  	// Path is the URL prefix the tus handler is mounted on and uses as its
  	// base path.
  	Path string
  	// Dir is the directory uploads are stored in; Start creates it if needed.
  	Dir string
  	// Secret is passed to token.New to build the token signer.
  	Secret string
  	// DisableDownload is forwarded to the tus handler's DisableDownload option.
  	DisableDownload bool
  	// Debug, when true, lets an upload through even if its SHA-256 does not
  	// match the ID carried in the token.
  	Debug bool
  }
  44. func New(conf Config, logger *zap.Logger) Server {
  45. return &server{
  46. config: conf,
  47. uploading: sync.Map{},
  48. headerTokenKey: "Authorization",
  49. logger: logger,
  50. token: token.New(conf.Secret),
  51. }
  52. }
  53. func (s *server) GetUploadToken(sha256, param string, ttl time.Duration) string {
  54. sign, _ := s.token.JwtSign(sha256, param, ttl)
  55. return sign
  56. }
  57. func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
  58. upload, err := s.store.GetUpload(context.Background(), id)
  59. if err != nil {
  60. return nil, err
  61. }
  62. info, err := upload.GetInfo(context.Background())
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &info, nil
  67. }
  68. func (s *server) Start(completedEvent func(sha256, param string, info tus.FileInfo)) error {
  69. s.completedEvent = completedEvent
  70. composer := tus.NewStoreComposer()
  71. if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
  72. return err
  73. }
  74. s.store = filestore.New(s.config.Dir)
  75. s.store.UseIn(composer)
  76. handler, err := tus.NewHandler(tus.Config{
  77. StoreComposer: composer,
  78. BasePath: s.config.Path,
  79. Logger: zap.NewStdLog(s.logger),
  80. NotifyCompleteUploads: true,
  81. DisableTermination: true,
  82. DisableDownload: s.config.DisableDownload,
  83. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  84. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  85. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  86. jwtParse, err := s.token.JwtParse(authStr)
  87. if err == nil {
  88. _, ok := s.uploading.Load(jwtParse.ID)
  89. if !ok {
  90. s.uploading.Store(jwtParse.ID, time.Now())
  91. return nil
  92. }
  93. }
  94. return errors.New("unauthorized")
  95. },
  96. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  97. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  98. jwtParse, err := s.token.JwtParse(authStr)
  99. if err != nil {
  100. return errors.New("token expired")
  101. }
  102. _, ok := s.uploading.Load(jwtParse.ID)
  103. if ok {
  104. s.uploading.Delete(jwtParse.ID)
  105. }
  106. upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
  107. if err != nil {
  108. return err
  109. }
  110. info, err := upload.GetInfo(context.Background())
  111. path, exist := info.Storage["Path"]
  112. if err != nil || !exist {
  113. return errors.New("file not found")
  114. }
  115. content, err := os.ReadFile(path)
  116. if err != nil {
  117. return err
  118. }
  119. hash := sha256.New()
  120. hash.Write(content)
  121. sha256Byte := hash.Sum(nil)
  122. sha256String := fmt.Sprintf("%x", sha256Byte)
  123. if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
  124. _ = os.Remove(path)
  125. _ = os.Remove(path + ".info")
  126. return errors.New("file check error")
  127. }
  128. return nil
  129. },
  130. })
  131. if err != nil {
  132. return err
  133. }
  134. go func() {
  135. for {
  136. event := <-handler.CompleteUploads
  137. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  138. jwtParse, _ := s.token.JwtParse(authStr)
  139. if s.completedEvent != nil {
  140. go func() {
  141. s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
  142. }()
  143. }
  144. }
  145. }()
  146. //监听服务
  147. addr := s.config.ListenAddr
  148. mux := http.NewServeMux()
  149. mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
  150. s.httpServer = &http.Server{
  151. Addr: addr,
  152. Handler: cors.AllowAll().Handler(mux),
  153. }
  154. go func() {
  155. if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  156. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  157. }
  158. }()
  159. fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
  160. return nil
  161. }
  162. func (s *server) Stop() error {
  163. return s.httpServer.Close()
  164. }