package upload import ( baseSha256 "crypto/sha256" "errors" "fmt" "git.bvbej.com/bvbej/base-golang/pkg/color" "git.bvbej.com/bvbej/base-golang/pkg/token" "github.com/rs/cors" "github.com/tus/tusd/pkg/filestore" tus "github.com/tus/tusd/pkg/handler" "go.uber.org/zap" "net/http" "os" "strings" "sync" "time" ) var _ Server = (*server)(nil) type Server interface { GetUploadToken(string, time.Duration) (string, error) Start(chan string) error Stop() error } type server struct { headerTokenKey string uploading sync.Map config Config token token.Token logger *zap.Logger httpServer *http.Server completedNotify chan string } type Config struct { ListenAddr string Path string Secret string } func New(conf Config, logger *zap.Logger) Server { return &server{ config: conf, uploading: sync.Map{}, headerTokenKey: "Authorization", logger: logger, token: token.New(conf.Secret), } } func (s *server) GetUploadToken(sha256 string, ttl time.Duration) (string, error) { sign, err := s.token.JwtSign(sha256, "upload", ttl) if err != nil { return "", err } return sign, nil } func (s *server) Start(completedNotify chan string) error { s.completedNotify = completedNotify store := filestore.FileStore{ Path: s.config.Path, } composer := tus.NewStoreComposer() store.UseIn(composer) handler, err := tus.NewHandler(tus.Config{ StoreComposer: composer, BasePath: fmt.Sprintf("/%s/", s.config.Path), Logger: zap.NewStdLog(s.logger), NotifyCompleteUploads: true, DisableTermination: true, DisableDownload: true, RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"), PreUploadCreateCallback: func(hook tus.HookEvent) error { authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey) jwtParse, err := s.token.JwtParse(authStr) if err == nil { _, ok := s.uploading.Load(jwtParse.ID) if !ok { s.uploading.Store(jwtParse.ID, time.Now()) hook.Upload.ID = jwtParse.ID return nil } } return errors.New("unauthorized") }, PreFinishResponseCallback: func(hook tus.HookEvent) error { authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey) jwtParse, err := s.token.JwtParse(authStr) if err != nil { return errors.New("unauthorized") } filepath := fmt.Sprintf("%s/%s", s.config.Path, jwtParse.ID) file, err := os.ReadFile(filepath) if err != nil { return err } sha256 := baseSha256.New() sha256.Write(file) sha256Byte := sha256.Sum(nil) sha256String := fmt.Sprintf("%x", sha256Byte) if sha256String != jwtParse.ID { _ = os.Remove(filepath) _ = os.Remove(filepath + ".info") return errors.New("file check error") } return nil }, }) if err != nil { return err } go func() { for { event := <-handler.CompleteUploads authStr := event.HTTPRequest.Header.Get(s.headerTokenKey) jwtParse, _ := s.token.JwtParse(authStr) _, ok := s.uploading.Load(jwtParse.ID) if ok { s.uploading.Delete(jwtParse.ID) go func() { s.completedNotify <- jwtParse.ID }() } } }() //监听服务 addr := s.config.ListenAddr mux := http.NewServeMux() mux.Handle(fmt.Sprintf("/%s/", s.config.Path), http.StripPrefix(fmt.Sprintf("/%s/", s.config.Path), handler)) s.httpServer = &http.Server{ Addr: addr, Handler: cors.AllowAll().Handler(mux), } go func() { if err = s.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { s.logger.Sugar().Fatal("upload server startup err", zap.Error(err)) } }() fmt.Println(color.Green(fmt.Sprintf("* [register upload listen %s]", addr))) return nil } func (s *server) Stop() error { return s.httpServer.Close() }