pool.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package ants
  2. import (
  3. "git.bvbej.com/bvbej/base-golang/pkg/ticker"
  4. "github.com/panjf2000/ants/v2"
  5. "go.uber.org/zap"
  6. "time"
  7. )
  8. var _ GoroutinePool = (*goroutinePool)(nil)
  9. type GoroutinePool interface {
  10. run()
  11. Submit(task func())
  12. Stop()
  13. Size() int
  14. Running() int
  15. Waiting() int
  16. Free() int
  17. }
  18. type goroutinePool struct {
  19. pool *ants.Pool
  20. logger *zap.Logger
  21. ticker ticker.Ticker
  22. step int
  23. }
  24. type poolLogger struct {
  25. zap *zap.Logger
  26. }
  27. func (l *poolLogger) Printf(format string, args ...any) {
  28. l.zap.Sugar().Infof(format, args)
  29. }
  30. func NewPool(zap *zap.Logger, step int) (GoroutinePool, error) {
  31. ttl := time.Minute
  32. options := ants.Options{
  33. Nonblocking: true,
  34. ExpiryDuration: ttl,
  35. PanicHandler: func(err any) {
  36. zap.Sugar().Panic(err)
  37. },
  38. Logger: &poolLogger{zap: zap},
  39. }
  40. antsPool, err := ants.NewPool(step, ants.WithOptions(options))
  41. if err != nil {
  42. return nil, err
  43. }
  44. pool := &goroutinePool{
  45. pool: antsPool,
  46. logger: zap,
  47. ticker: ticker.New(ttl),
  48. step: step,
  49. }
  50. pool.run()
  51. return pool, nil
  52. }
  53. func (p *goroutinePool) run() {
  54. p.ticker.Process(func() {
  55. if p.Free() > p.step {
  56. p.pool.Tune(p.step)
  57. }
  58. p.logger.Sugar().Warnf("goroutine pool info: Size[%d] Waiting[%d] Running[%d] Free[%d]",
  59. p.Size(), p.Waiting(), p.Running(), p.Free())
  60. })
  61. }
  62. func (p *goroutinePool) Submit(task func()) {
  63. if p.pool.IsClosed() {
  64. return
  65. }
  66. err := p.pool.Submit(task)
  67. if err == ants.ErrPoolOverload {
  68. p.pool.Tune(p.Size() + p.step)
  69. p.Submit(task)
  70. }
  71. }
  72. func (p *goroutinePool) Size() int {
  73. return p.pool.Cap()
  74. }
  75. func (p *goroutinePool) Running() int {
  76. return p.pool.Running()
  77. }
  78. func (p *goroutinePool) Waiting() int {
  79. return p.pool.Waiting()
  80. }
  81. func (p *goroutinePool) Free() int {
  82. return p.pool.Free()
  83. }
  84. func (p *goroutinePool) Stop() {
  85. p.ticker.Stop()
  86. p.pool.Release()
  87. }