pool.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. package ants
  2. import (
  3. "errors"
  4. "fmt"
  5. "git.bvbej.com/bvbej/base-golang/pkg/ticker"
  6. "github.com/panjf2000/ants/v2"
  7. "go.uber.org/zap"
  8. "runtime/debug"
  9. "time"
  10. )
  11. var _ GoroutinePool = (*goroutinePool)(nil)
  12. type GoroutinePool interface {
  13. run()
  14. Submit(task func())
  15. Stop()
  16. Size() int
  17. Running() int
  18. Free() int
  19. }
  20. type goroutinePool struct {
  21. pool *ants.Pool
  22. logger *zap.Logger
  23. ticker ticker.Ticker
  24. step int
  25. }
  26. type poolLogger struct {
  27. zap *zap.Logger
  28. }
  29. func (l *poolLogger) Printf(format string, args ...any) {
  30. l.zap.Sugar().Infof(format, args)
  31. }
  32. func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
  33. ttl := time.Minute * 5
  34. options := ants.Options{
  35. Nonblocking: true,
  36. ExpiryDuration: ttl,
  37. PanicHandler: func(err any) {
  38. zapLogger.Sugar().Error(
  39. "GoroutinePool panic",
  40. zap.String("error", fmt.Sprintf("%+v", err)),
  41. zap.String("stack", string(debug.Stack())),
  42. )
  43. },
  44. Logger: &poolLogger{zap: zapLogger},
  45. }
  46. antsPool, err := ants.NewPool(step, ants.WithOptions(options))
  47. if err != nil {
  48. return nil, err
  49. }
  50. pool := &goroutinePool{
  51. pool: antsPool,
  52. logger: zapLogger,
  53. ticker: ticker.New(ttl),
  54. step: step,
  55. }
  56. pool.run()
  57. return pool, nil
  58. }
  59. func (p *goroutinePool) run() {
  60. p.ticker.Process(func() {
  61. if p.Free() > p.step {
  62. mul := p.Free() / p.step
  63. p.pool.Tune(p.Size() - p.step*mul)
  64. }
  65. })
  66. }
  67. func (p *goroutinePool) Submit(task func()) {
  68. if p.pool.IsClosed() {
  69. return
  70. }
  71. err := p.pool.Submit(task)
  72. if errors.Is(err, ants.ErrPoolOverload) {
  73. p.pool.Tune(p.Size() + p.step)
  74. p.Submit(task)
  75. }
  76. }
  77. func (p *goroutinePool) Size() int {
  78. return p.pool.Cap()
  79. }
  80. func (p *goroutinePool) Running() int {
  81. return p.pool.Running()
  82. }
  83. func (p *goroutinePool) Free() int {
  84. return p.pool.Free()
  85. }
  86. func (p *goroutinePool) Stop() {
  87. p.ticker.Stop()
  88. p.pool.Release()
  89. }