pool.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package ants
  2. import (
  3. "fmt"
  4. "git.bvbej.com/bvbej/base-golang/pkg/ticker"
  5. "github.com/panjf2000/ants/v2"
  6. "go.uber.org/zap"
  7. "runtime/debug"
  8. "time"
  9. )
  10. var _ GoroutinePool = (*goroutinePool)(nil)
  // GoroutinePool is the contract exposed by this package's worker pool.
  //
  // It contains the unexported method run, so only types declared in this
  // package can implement it.
  type GoroutinePool interface {
  	// run installs the pool's periodic maintenance callback.
  	run()

  	// Submit hands task to the pool for execution.
  	Submit(task func())

  	// Stop shuts the pool down.
  	Stop()

  	// Size reports the pool's current capacity.
  	Size() int

  	// Running reports how many workers are currently running.
  	Running() int

  	// Free reports how many worker slots are currently unused.
  	Free() int
  }
  19. type goroutinePool struct {
  20. pool *ants.Pool
  21. logger *zap.Logger
  22. ticker ticker.Ticker
  23. step int
  24. }
  25. type poolLogger struct {
  26. zap *zap.Logger
  27. }
  28. func (l *poolLogger) Printf(format string, args ...any) {
  29. l.zap.Sugar().Infof(format, args)
  30. }
  31. func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
  32. ttl := time.Minute * 5
  33. options := ants.Options{
  34. Nonblocking: true,
  35. ExpiryDuration: ttl,
  36. PanicHandler: func(err any) {
  37. zapLogger.Sugar().Error(
  38. "GoroutinePool panic",
  39. zap.String("error", fmt.Sprintf("%+v", err)),
  40. zap.String("stack", string(debug.Stack())),
  41. )
  42. },
  43. Logger: &poolLogger{zap: zapLogger},
  44. }
  45. antsPool, err := ants.NewPool(step, ants.WithOptions(options))
  46. if err != nil {
  47. return nil, err
  48. }
  49. pool := &goroutinePool{
  50. pool: antsPool,
  51. logger: zapLogger,
  52. ticker: ticker.New(ttl),
  53. step: step,
  54. }
  55. pool.run()
  56. return pool, nil
  57. }
  58. func (p *goroutinePool) run() {
  59. p.ticker.Process(func() {
  60. if p.Free() > p.step {
  61. mul := p.Free() / p.step
  62. p.pool.Tune(p.Size() - p.step*mul)
  63. }
  64. })
  65. }
  66. func (p *goroutinePool) Submit(task func()) {
  67. if p.pool.IsClosed() {
  68. return
  69. }
  70. err := p.pool.Submit(task)
  71. if err == ants.ErrPoolOverload {
  72. p.pool.Tune(p.Size() + p.step)
  73. p.Submit(task)
  74. }
  75. }
  76. func (p *goroutinePool) Size() int {
  77. return p.pool.Cap()
  78. }
  79. func (p *goroutinePool) Running() int {
  80. return p.pool.Running()
  81. }
  82. func (p *goroutinePool) Free() int {
  83. return p.pool.Free()
  84. }
  85. func (p *goroutinePool) Stop() {
  86. p.ticker.Stop()
  87. p.pool.Release()
  88. }