tus.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package upload
  import (
  	baseSha256 "crypto/sha256"
  	"errors"
  	"fmt"
  	"io"
  	"net/http"
  	"os"
  	"path/filepath"
  	"strings"
  	"sync"
  	"time"

  	"git.bvbej.com/bvbej/base-golang/pkg/color"
  	"git.bvbej.com/bvbej/base-golang/pkg/token"
  	"github.com/rs/cors"
  	"github.com/tus/tusd/pkg/filestore"
  	tus "github.com/tus/tusd/pkg/handler"
  	"go.uber.org/zap"
  )
  18. var _ Server = (*server)(nil)
  19. type Server interface {
  20. GetUploadToken(string, time.Duration) (string, error)
  21. Start(chan string) error
  22. Stop() error
  23. }
  24. type server struct {
  25. headerTokenKey string
  26. uploading sync.Map
  27. config Config
  28. token token.Token
  29. logger *zap.Logger
  30. httpServer *http.Server
  31. completedNotify chan string
  32. }
  // Config holds the settings used to build a Server.
  type Config struct {
  	// ListenAddr is the address the HTTP server listens on. When it
  	// contains "127.0.0.1" the tus handler is told to respect forwarded
  	// headers.
  	ListenAddr string

  	// Path is used both as the file store directory and as the URL
  	// prefix ("/<Path>/") under which the tus handler is mounted.
  	Path string

  	// Secret is passed to token.New to create the token signer/parser.
  	Secret string
  }
  38. func New(conf Config, logger *zap.Logger) Server {
  39. return &server{
  40. config: conf,
  41. uploading: sync.Map{},
  42. headerTokenKey: "Authorization",
  43. logger: logger,
  44. token: token.New(conf.Secret),
  45. }
  46. }
  47. func (s *server) GetUploadToken(sha256 string, ttl time.Duration) (string, error) {
  48. sign, err := s.token.JwtSign(sha256, "upload", ttl)
  49. if err != nil {
  50. return "", err
  51. }
  52. return sign, nil
  53. }
  54. func (s *server) Start(completedNotify chan string) error {
  55. s.completedNotify = completedNotify
  56. store := filestore.FileStore{
  57. Path: s.config.Path,
  58. }
  59. composer := tus.NewStoreComposer()
  60. store.UseIn(composer)
  61. handler, err := tus.NewHandler(tus.Config{
  62. StoreComposer: composer,
  63. BasePath: fmt.Sprintf("/%s/", s.config.Path),
  64. Logger: zap.NewStdLog(s.logger),
  65. NotifyCompleteUploads: true,
  66. DisableTermination: true,
  67. DisableDownload: true,
  68. RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
  69. PreUploadCreateCallback: func(hook tus.HookEvent) error {
  70. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  71. jwtParse, err := s.token.JwtParse(authStr)
  72. if err == nil {
  73. _, ok := s.uploading.Load(jwtParse.ID)
  74. if !ok {
  75. s.uploading.Store(jwtParse.ID, time.Now())
  76. hook.Upload.ID = jwtParse.ID
  77. return nil
  78. }
  79. }
  80. return errors.New("unauthorized")
  81. },
  82. PreFinishResponseCallback: func(hook tus.HookEvent) error {
  83. authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
  84. jwtParse, err := s.token.JwtParse(authStr)
  85. if err != nil {
  86. return errors.New("unauthorized")
  87. }
  88. filepath := fmt.Sprintf("%s/%s", s.config.Path, jwtParse.ID)
  89. file, err := os.ReadFile(filepath)
  90. if err != nil {
  91. return err
  92. }
  93. sha256 := baseSha256.New()
  94. sha256.Write(file)
  95. sha256Byte := sha256.Sum(nil)
  96. sha256String := fmt.Sprintf("%x", sha256Byte)
  97. if sha256String != jwtParse.ID {
  98. _ = os.Remove(filepath)
  99. _ = os.Remove(filepath + ".info")
  100. return errors.New("file check error")
  101. }
  102. return nil
  103. },
  104. })
  105. if err != nil {
  106. return err
  107. }
  108. go func() {
  109. for {
  110. event := <-handler.CompleteUploads
  111. authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
  112. jwtParse, _ := s.token.JwtParse(authStr)
  113. _, ok := s.uploading.Load(jwtParse.ID)
  114. if ok {
  115. s.uploading.Delete(jwtParse.ID)
  116. go func() {
  117. s.completedNotify <- jwtParse.ID
  118. }()
  119. }
  120. }
  121. }()
  122. //监听服务
  123. addr := s.config.ListenAddr
  124. mux := http.NewServeMux()
  125. mux.Handle(fmt.Sprintf("/%s/", s.config.Path), http.StripPrefix(fmt.Sprintf("/%s/", s.config.Path), handler))
  126. s.httpServer = &http.Server{
  127. Addr: addr,
  128. Handler: cors.AllowAll().Handler(mux),
  129. }
  130. go func() {
  131. if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
  132. s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
  133. }
  134. }()
  135. fmt.Println(color.Green(fmt.Sprintf("* [register upload listen %s]", addr)))
  136. return nil
  137. }
  138. func (s *server) Stop() error {
  139. return s.httpServer.Close()
  140. }