tus.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package upload
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "errors"
  6. "fmt"
  7. "git.bvbej.com/bvbej/base-golang/pkg/color"
  8. "git.bvbej.com/bvbej/base-golang/pkg/token"
  9. "github.com/rs/cors"
  10. "github.com/tus/tusd/pkg/filestore"
  11. tus "github.com/tus/tusd/pkg/handler"
  12. "go.uber.org/zap"
  13. "io"
  14. "net/http"
  15. "os"
  16. "strings"
  17. "sync"
  18. "time"
  19. )
  20. var _ Server = (*server)(nil)
  21. type Server interface {
  22. GetUploadToken(string, string, time.Duration) string
  23. GetFileInfo(id string) (*tus.FileInfo, error)
  24. Start(func(string, string, tus.FileInfo)) error
  25. Stop() error
  26. }
  27. type server struct {
  28. headerTokenKey string
  29. uploading sync.Map
  30. config Config
  31. token token.Token
  32. store filestore.FileStore
  33. logger *zap.Logger
  34. httpServer *http.Server
  35. completedEvent func(user, param string, info tus.FileInfo)
  36. }
  37. type Config struct {
  38. ListenAddr string
  39. Path string
  40. Dir string
  41. Secret string
  42. DisableDownload bool
  43. Debug bool
  44. }
  45. func New(conf Config, logger *zap.Logger) Server {
  46. return &server{
  47. config: conf,
  48. uploading: sync.Map{},
  49. headerTokenKey: "Authorization",
  50. logger: logger,
  51. token: token.New(conf.Secret),
  52. }
  53. }
  54. func (s *server) GetUploadToken(user, param string, ttl time.Duration) string {
  55. sign, _ := s.token.JwtSign(user, param, ttl)
  56. return sign
  57. }
  58. func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
  59. upload, err := s.store.GetUpload(context.Background(), id)
  60. if err != nil {
  61. return nil, err
  62. }
  63. info, err := upload.GetInfo(context.Background())
  64. if err != nil {
  65. return nil, err
  66. }
  67. return &info, nil
  68. }
  69. func (s *server) Start(completedEvent func(user, param string, info tus.FileInfo)) error {
  70. s.completedEvent = completedEvent
  71. composer := tus.NewStoreComposer()
  72. if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
  73. return err
  74. }
  75. s.store = filestore.New(s.config.Dir)
  76. s.store.UseIn(composer)
  77. handler, err := tus.NewHandler(tus.Config{
  78. StoreComposer: composer,
  79. BasePath: s.config.Path,
  80. Logger: zap.NewStdLog(s.logger),
  81. NotifyCompleteUploads: true,
  82. DisableTermination: true,
  83. DisableDownload: s.config.DisableDownload,
  84. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  85. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  86. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  87. jwtParse, err := s.token.JwtParse(authStr)
  88. if err == nil {
  89. _, ok := s.uploading.Load(jwtParse.ID)
  90. if !ok {
  91. s.uploading.Store(jwtParse.ID, time.Now())
  92. return nil
  93. }
  94. }
  95. return errors.New("unauthorized")
  96. },
  97. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  98. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  99. jwtParse, err := s.token.JwtParse(authStr)
  100. if err != nil {
  101. return errors.New("token expired")
  102. }
  103. _, ok := s.uploading.Load(jwtParse.ID)
  104. if ok {
  105. s.uploading.Delete(jwtParse.ID)
  106. }
  107. upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
  108. if err != nil {
  109. return err
  110. }
  111. reader, err := upload.GetReader(context.Background())
  112. if err != nil {
  113. return err
  114. }
  115. all, err := io.ReadAll(reader)
  116. if err != nil {
  117. return err
  118. }
  119. _ = reader.(io.Closer).Close()
  120. hash := sha256.New()
  121. hash.Write(all)
  122. sha256Byte := hash.Sum(nil)
  123. sha256String := fmt.Sprintf("%x", sha256Byte)
  124. if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
  125. info, _ := upload.GetInfo(context.Background())
  126. path, exist := info.Storage["Path"]
  127. if exist {
  128. _ = os.Remove(path)
  129. _ = os.Remove(path + ".info")
  130. }
  131. return errors.New("file check error")
  132. }
  133. return nil
  134. },
  135. })
  136. if err != nil {
  137. return err
  138. }
  139. go func() {
  140. for {
  141. event := <-handler.CompleteUploads
  142. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  143. jwtParse, _ := s.token.JwtParse(authStr)
  144. if s.completedEvent != nil {
  145. go func() {
  146. s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
  147. }()
  148. }
  149. }
  150. }()
  151. //监听服务
  152. addr := s.config.ListenAddr
  153. mux := http.NewServeMux()
  154. mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
  155. s.httpServer = &http.Server{
  156. Addr: addr,
  157. Handler: cors.AllowAll().Handler(mux),
  158. }
  159. go func() {
  160. if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  161. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  162. }
  163. }()
  164. fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
  165. return nil
  166. }
  167. func (s *server) Stop() error {
  168. return s.httpServer.Close()
  169. }