123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- package upload
- import (
- "context"
- "crypto/sha256"
- "errors"
- "fmt"
- "git.bvbej.com/bvbej/base-golang/pkg/color"
- "git.bvbej.com/bvbej/base-golang/pkg/ticker"
- "git.bvbej.com/bvbej/base-golang/pkg/token"
- "github.com/rs/cors"
- "github.com/tus/tusd/pkg/filestore"
- tus "github.com/tus/tusd/pkg/handler"
- "go.uber.org/zap"
- "net/http"
- "os"
- "strings"
- "sync"
- "time"
- )
- var _ Server = (*server)(nil)
- type Server interface {
- GetUploadToken(string, string, time.Duration) string
- GetFileInfo(string) (*tus.FileInfo, error)
- Start(func(string, string, tus.FileInfo)) error
- Stop() error
- }
- type server struct {
- headerTokenKey string
- uploading sync.Map
- config Config
- token token.Token
- store filestore.FileStore
- logger *zap.Logger
- httpServer *http.Server
- ctx context.Context
- done context.CancelFunc
- checker ticker.Ticker
- completedEvent func(sha256, param string, info tus.FileInfo)
- }
- type Config struct {
- ListenAddr string
- Path string
- Dir string
- Secret string
- DisableDownload bool
- Debug bool
- }
- func New(conf Config, logger *zap.Logger) Server {
- ctx, cancelFunc := context.WithCancel(context.Background())
- return &server{
- config: conf,
- uploading: sync.Map{},
- headerTokenKey: "Authorization",
- logger: logger,
- token: token.New(conf.Secret),
- ctx: ctx,
- done: cancelFunc,
- checker: ticker.New(time.Minute),
- }
- }
- func (s *server) GetUploadToken(sha256, param string, ttl time.Duration) string {
- sign, _ := s.token.JwtSign(sha256, param, ttl)
- return sign
- }
- func (s *server) GetFileInfo(id string) (*tus.FileInfo, error) {
- upload, err := s.store.GetUpload(context.Background(), id)
- if err != nil {
- return nil, err
- }
- info, err := upload.GetInfo(context.Background())
- if err != nil {
- return nil, err
- }
- return &info, nil
- }
- func (s *server) Start(completedEvent func(sha256, param string, info tus.FileInfo)) error {
- s.completedEvent = completedEvent
- composer := tus.NewStoreComposer()
- if err := os.MkdirAll(s.config.Dir, os.ModePerm); err != nil {
- return err
- }
- s.store = filestore.New(s.config.Dir)
- s.store.UseIn(composer)
- handler, err := tus.NewHandler(tus.Config{
- StoreComposer: composer,
- BasePath: s.config.Path,
- Logger: zap.NewStdLog(s.logger),
- NotifyCompleteUploads: true,
- NotifyTerminatedUploads: true,
- DisableTermination: true,
- DisableDownload: s.config.DisableDownload,
- RespectForwardedHeaders: strings.Contains(s.config.ListenAddr, "127.0.0.1"),
- PreUploadCreateCallback: func(hook tus.HookEvent) error {
- authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
- jwtClaims, err := s.token.JwtParse(authStr)
- if err == nil {
- _, ok := s.uploading.Load(authStr)
- if !ok {
- s.uploading.Store(authStr, jwtClaims.ExpiresAt.Time)
- return nil
- }
- return errors.New("repeated")
- }
- return errors.New("unauthorized")
- },
- PreFinishResponseCallback: func(hook tus.HookEvent) error {
- authStr := hook.HTTPRequest.Header.Get(s.headerTokenKey)
- jwtParse, err := s.token.JwtParse(authStr)
- if err != nil {
- return errors.New("token expired")
- }
- _, ok := s.uploading.Load(authStr)
- if ok {
- s.uploading.Delete(authStr)
- }
- upload, err := s.store.GetUpload(context.Background(), hook.Upload.ID)
- if err != nil {
- return err
- }
- info, err := upload.GetInfo(context.Background())
- path, exist := info.Storage["Path"]
- if err != nil || !exist {
- return errors.New("file not found")
- }
- content, err := os.ReadFile(path)
- if err != nil {
- return err
- }
- hash := sha256.New()
- hash.Write(content)
- sha256Byte := hash.Sum(nil)
- sha256String := fmt.Sprintf("%x", sha256Byte)
- if !s.config.Debug && sha256String != strings.ToLower(jwtParse.ID) {
- _ = os.Remove(path)
- _ = os.Remove(path + ".info")
- return errors.New("file check error")
- }
- return nil
- },
- })
- if err != nil {
- return err
- }
- go func() {
- for {
- select {
- case event := <-handler.CompleteUploads:
- authStr := event.HTTPRequest.Header.Get(s.headerTokenKey)
- jwtParse, _ := s.token.JwtParse(authStr)
- if s.completedEvent != nil {
- go func() {
- s.completedEvent(jwtParse.ID, jwtParse.Subject, event.Upload)
- }()
- }
- case <-s.ctx.Done():
- return
- }
- }
- }()
- go func() {
- for {
- select {
- case event := <-handler.TerminatedUploads:
- upload, _ := s.store.GetUpload(context.Background(), event.Upload.ID)
- if upload != nil {
- info, _ := upload.GetInfo(context.Background())
- path, exist := info.Storage["Path"]
- if exist {
- _ = os.Remove(path)
- _ = os.Remove(path + ".info")
- }
- }
- case <-s.ctx.Done():
- return
- }
- }
- }()
- s.checker.Process(func() {
- s.uploading.Range(func(key, value any) bool {
- t := value.(time.Time)
- if t.Before(time.Now()) {
- s.uploading.Delete(key)
- }
- return true
- })
- })
- //监听服务
- addr := s.config.ListenAddr
- mux := http.NewServeMux()
- mux.Handle(s.config.Path, http.StripPrefix(s.config.Path, handler))
- s.httpServer = &http.Server{
- Addr: addr,
- Handler: cors.AllowAll().Handler(mux),
- }
- go func() {
- if err = s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
- s.logger.Sugar().Fatal("upload server startup err", zap.Error(err))
- }
- }()
- fmt.Println(color.Green(fmt.Sprintf("* [register tusd listen %s]", addr)))
- return nil
- }
- func (s *server) Stop() error {
- s.done()
- return s.httpServer.Close()
- }
|