tus.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package upload
  import (
  	"context"
  	"crypto/sha256"
  	"errors"
  	"fmt"
  	"io"
  	"net/http"
  	"os"
  	"strings"
  	"sync"
  	"time"

  	"git.bvbej.com/bvbej/base-golang/pkg/color"
  	"git.bvbej.com/bvbej/base-golang/pkg/token"
  	"github.com/rs/cors"
  	"github.com/tus/tusd/pkg/filestore"
  	tus "github.com/tus/tusd/pkg/handler"
  	"go.uber.org/zap"
  )
  19. var _ Server = (*server)(nil)
  20. type Server interface {
  21. GetUploadToken(string, string, time.Duration) string
  22. GetFileInfo(id string) (*tus.FileInfo, error)
  23. Start(func(string, string, tus.FileInfo)) error
  24. Stop() error
  25. }
  26. type server struct {
  27. headerTokenKey string
  28. uploading sync.Map
  29. config Config
  30. token token.Token
  31. store filestore.FileStore
  32. logger *zap.Logger
  33. httpServer *http.Server
  34. completedEvent func(user, param string, info tus.FileInfo)
  35. }
  // Config carries the settings of the upload server.
  type Config struct {
  	// ListenAddr is the address the HTTP server binds to; forwarded headers
  	// are respected only when it contains "127.0.0.1".
  	// Path is the URL prefix the tus handler is mounted on.
  	// Dir is the directory uploads are stored in (created on Start).
  	// Secret is the key handed to the token signer.
  	ListenAddr, Path, Dir, Secret string
  	// DisableDownload is forwarded to the tus handler configuration.
  	// Debug turns off rejection of uploads whose SHA-256 digest does not
  	// match the token ID.
  	DisableDownload, Debug bool
  }
  44. func New(conf Config, logger *zap.Logger) Server {
  45. return &server{
  46. config: conf,
  47. uploading: sync.Map{},
  48. headerTokenKey: "Authorization",
  49. logger: logger,
  50. token: token.New(conf.Secret),
  51. }
  52. }
  53. func (s *server) GetUploadToken(user, param string, ttl time.Duration) string {
  54. sign, _ := s.token.JwtSign(user, param, ttl)
  55. return sign
  56. }
  57. func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
  58. upload, err := s.store.GetUpload(context.Background(), id)
  59. if err != nil {
  60. return nil, err
  61. }
  62. info, err := upload.GetInfo(context.Background())
  63. if err != nil {
  64. return nil, err
  65. }
  66. return &info, nil
  67. }
  68. func (s *server) Start(completedEvent func(user, param string, info tus.FileInfo)) error {
  69. s.completedEvent = completedEvent
  70. composer := tus.NewStoreComposer()
  71. if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
  72. return err
  73. }
  74. s.store = filestore.New(s.config.Dir)
  75. s.store.UseIn(composer)
  76. handler, err := tus.NewHandler(tus.Config{
  77. StoreComposer: composer,
  78. BasePath: s.config.Path,
  79. Logger: zap.NewStdLog(s.logger),
  80. NotifyCompleteUploads: true,
  81. DisableTermination: true,
  82. DisableDownload: s.config.DisableDownload,
  83. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  84. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  85. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  86. jwtParse, err := s.token.JwtParse(authStr)
  87. if err == nil {
  88. _, ok := s.uploading.Load(jwtParse.ID)
  89. if !ok {
  90. s.uploading.Store(jwtParse.ID, time.Now())
  91. return nil
  92. }
  93. }
  94. return errors.New("unauthorized")
  95. },
  96. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  97. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  98. jwtParse, err := s.token.JwtParse(authStr)
  99. if err != nil {
  100. return errors.New("token expired")
  101. }
  102. _, ok := s.uploading.Load(jwtParse.ID)
  103. if ok {
  104. s.uploading.Delete(jwtParse.ID)
  105. }
  106. upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
  107. if err != nil {
  108. return err
  109. }
  110. reader, err := upload.GetReader(context.Background())
  111. if err != nil {
  112. return err
  113. }
  114. var fileBytes []byte
  115. _, err = reader.Read(fileBytes)
  116. if err != nil {
  117. return err
  118. }
  119. hash := sha256.New()
  120. hash.Write(fileBytes)
  121. sha256Byte := hash.Sum(nil)
  122. sha256String := fmt.Sprintf("%x", sha256Byte)
  123. if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
  124. info, _ := upload.GetInfo(context.Background())
  125. path, exist := info.Storage["Path"]
  126. if exist {
  127. _ = os.Remove(path)
  128. _ = os.Remove(path + ".info")
  129. }
  130. return errors.New("file check error")
  131. }
  132. return nil
  133. },
  134. })
  135. if err != nil {
  136. return err
  137. }
  138. go func() {
  139. for {
  140. event := <-handler.CompleteUploads
  141. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  142. jwtParse, _ := s.token.JwtParse(authStr)
  143. if s.completedEvent != nil {
  144. go func() {
  145. s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
  146. }()
  147. }
  148. }
  149. }()
  150. //监听服务
  151. addr := s.config.ListenAddr
  152. mux := http.NewServeMux()
  153. mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
  154. s.httpServer = &http.Server{
  155. Addr: addr,
  156. Handler: cors.AllowAll().Handler(mux),
  157. }
  158. go func() {
  159. if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  160. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  161. }
  162. }()
  163. fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
  164. return nil
  165. }
  166. func (s *server) Stop() error {
  167. return s.httpServer.Close()
  168. }