tus.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package upload
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "errors"
  6. "fmt"
  7. "git.bvbej.com/bvbej/base-golang/pkg/color"
  8. "git.bvbej.com/bvbej/base-golang/pkg/token"
  9. "github.com/rs/cors"
  10. "github.com/tus/tusd/pkg/filestore"
  11. tus "github.com/tus/tusd/pkg/handler"
  12. "go.uber.org/zap"
  13. "net/http"
  14. "os"
  15. "strings"
  16. "sync"
  17. "time"
  18. )
  19. var _ Server = (*server)(nil)
  20. type Server interface {
  21. GetUploadToken(string, string, time.Duration) string
  22. GetFileInfo(string) (*tus.FileInfo, error)
  23. Start(func(string, string, tus.FileInfo)) error
  24. Stop() error
  25. }
  26. type server struct {
  27. headerTokenKey string
  28. uploading sync.Map
  29. config Config
  30. token token.Token
  31. store filestore.FileStore
  32. logger *zap.Logger
  33. httpServer *http.Server
  34. completedEvent func(sha256, param string, info tus.FileInfo)
  35. }
  36. type Config struct {
  37. ListenAddr string
  38. Path string
  39. Dir string
  40. Secret string
  41. DisableDownload bool
  42. Debug bool
  43. }
  44. func New(conf Config, logger *zap.Logger) Server {
  45. return &server{
  46. config: conf,
  47. uploading: sync.Map{},
  48. headerTokenKey: "Authorization",
  49. logger: logger,
  50. token: token.New(conf.Secret),
  51. }
  52. }
  53. func (s *server) GetUploadToken(sha256, param string, ttl time.Duration) string {
  54. sign, _ := s.token.JwtSign(sha256, param, ttl)
  55. return sign
  56. }
  57. func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
  58. upload, err := s.store.GetUpload(context.Background(), id)
  59. if err != nil {
  60. return nil, err
  61. }
  62. info, err := upload.GetInfo(context.Background())
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &info, nil
  67. }
  68. func (s *server) Start(completedEvent func(sha256, param string, info tus.FileInfo)) error {
  69. s.completedEvent = completedEvent
  70. composer := tus.NewStoreComposer()
  71. if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
  72. return err
  73. }
  74. s.store = filestore.New(s.config.Dir)
  75. s.store.UseIn(composer)
  76. handler, err := tus.NewHandler(tus.Config{
  77. StoreComposer: composer,
  78. BasePath: s.config.Path,
  79. Logger: zap.NewStdLog(s.logger),
  80. DisableCors: true,
  81. NotifyCompleteUploads: true,
  82. NotifyTerminatedUploads: true,
  83. DisableTermination: true,
  84. DisableDownload: s.config.DisableDownload,
  85. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  86. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  87. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  88. jwtParse, err := s.token.JwtParse(authStr)
  89. if err == nil {
  90. _, ok := s.uploading.Load(jwtParse.ID)
  91. if !ok {
  92. s.uploading.Store(jwtParse.ID, time.Now())
  93. return nil
  94. }
  95. }
  96. return errors.New("unauthorized")
  97. },
  98. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  99. fmt.Println(hook)
  100. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  101. jwtParse, err := s.token.JwtParse(authStr)
  102. if err != nil {
  103. return errors.New("token expired")
  104. }
  105. _, ok := s.uploading.Load(jwtParse.ID)
  106. if ok {
  107. s.uploading.Delete(jwtParse.ID)
  108. }
  109. upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
  110. if err != nil {
  111. return err
  112. }
  113. info, err := upload.GetInfo(context.Background())
  114. path, exist := info.Storage["Path"]
  115. if err != nil || !exist {
  116. return errors.New("file not found")
  117. }
  118. content, err := os.ReadFile(path)
  119. if err != nil {
  120. return err
  121. }
  122. hash := sha256.New()
  123. hash.Write(content)
  124. sha256Byte := hash.Sum(nil)
  125. sha256String := fmt.Sprintf("%x", sha256Byte)
  126. if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
  127. _ = os.Remove(path)
  128. _ = os.Remove(path + ".info")
  129. return errors.New("file check error")
  130. }
  131. return nil
  132. },
  133. })
  134. if err != nil {
  135. return err
  136. }
  137. go func() {
  138. for {
  139. event := <-handler.CompleteUploads
  140. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  141. jwtParse, _ := s.token.JwtParse(authStr)
  142. if s.completedEvent != nil {
  143. go func() {
  144. s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
  145. }()
  146. }
  147. }
  148. }()
  149. //监听服务
  150. addr := s.config.ListenAddr
  151. mux := http.NewServeMux()
  152. mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
  153. s.httpServer = &http.Server{
  154. Addr: addr,
  155. Handler: cors.AllowAll().Handler(mux),
  156. }
  157. go func() {
  158. if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  159. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  160. }
  161. }()
  162. fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
  163. return nil
  164. }
  165. func (s *server) Stop() error {
  166. return s.httpServer.Close()
  167. }