pool.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package ants
  2. import (
  3. "fmt"
  4. "git.bvbej.com/bvbej/base-golang/pkg/ticker"
  5. "github.com/panjf2000/ants/v2"
  6. "go.uber.org/zap"
  7. "runtime/debug"
  8. "time"
  9. )
  10. var _ GoroutinePool = (*goroutinePool)(nil)
  11. type GoroutinePool interface {
  12. run()
  13. Submit(task func())
  14. Stop()
  15. Size() int
  16. Running() int
  17. Free() int
  18. }
  19. type goroutinePool struct {
  20. pool *ants.Pool
  21. logger *zap.Logger
  22. ticker ticker.Ticker
  23. step int
  24. }
  25. type poolLogger struct {
  26. zap *zap.Logger
  27. }
  28. func (l *poolLogger) Printf(format string, args ...any) {
  29. l.zap.Sugar().Infof(format, args)
  30. }
  31. func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
  32. ttl := time.Minute * 5
  33. options := ants.Options{
  34. Nonblocking: true,
  35. ExpiryDuration: ttl,
  36. PanicHandler: func(err any) {
  37. zapLogger.Sugar().Error(
  38. "GoroutinePool panic",
  39. zap.String("error", fmt.Sprintf("%+v", err)),
  40. zap.String("stack", string(debug.Stack())),
  41. )
  42. },
  43. Logger: &poolLogger{zap: zapLogger},
  44. }
  45. antsPool, err := ants.NewPool(step, ants.WithOptions(options))
  46. if err != nil {
  47. return nil, err
  48. }
  49. pool := &goroutinePool{
  50. pool: antsPool,
  51. logger: zapLogger,
  52. ticker: ticker.New(ttl),
  53. step: step,
  54. }
  55. pool.run()
  56. return pool, nil
  57. }
  58. func (p *goroutinePool) run() {
  59. p.ticker.Process(func() {
  60. if p.Free() > p.step {
  61. mul := p.Free() / p.step
  62. p.pool.Tune(p.Size() - p.step*mul)
  63. }
  64. p.logger.Sugar().Infof("goroutine pool info: Size[%d] Running[%d] Free[%d]",
  65. p.Size(), p.Running(), p.Free())
  66. })
  67. }
  68. func (p *goroutinePool) Submit(task func()) {
  69. if p.pool.IsClosed() {
  70. return
  71. }
  72. err := p.pool.Submit(task)
  73. if err == ants.ErrPoolOverload {
  74. p.pool.Tune(p.Size() + p.step)
  75. p.Submit(task)
  76. }
  77. }
  78. func (p *goroutinePool) Size() int {
  79. return p.pool.Cap()
  80. }
  81. func (p *goroutinePool) Running() int {
  82. return p.pool.Running()
  83. }
  84. func (p *goroutinePool) Free() int {
  85. return p.pool.Free()
  86. }
  87. func (p *goroutinePool) Stop() {
  88. p.ticker.Stop()
  89. p.pool.Release()
  90. }