pool.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package ants
  2. import (
  3. "git.bvbej.com/bvbej/base-golang/pkg/ticker"
  4. "github.com/panjf2000/ants/v2"
  5. "go.uber.org/zap"
  6. "time"
  7. )
  8. var _ GoroutinePool = (*goroutinePool)(nil)
  9. type GoroutinePool interface {
  10. run()
  11. Submit(task func())
  12. Stop()
  13. Size() int
  14. Running() int
  15. Free() int
  16. }
  17. type goroutinePool struct {
  18. pool *ants.Pool
  19. logger *zap.Logger
  20. ticker ticker.Ticker
  21. step int
  22. }
  23. type poolLogger struct {
  24. zap *zap.Logger
  25. }
  26. func (l *poolLogger) Printf(format string, args ...any) {
  27. l.zap.Sugar().Infof(format, args)
  28. }
  29. func NewPool(zap *zap.Logger, step int) (GoroutinePool, error) {
  30. ttl := time.Minute * 5
  31. options := ants.Options{
  32. Nonblocking: true,
  33. ExpiryDuration: ttl,
  34. PanicHandler: func(err any) {
  35. zap.Sugar().Panic(err)
  36. },
  37. Logger: &poolLogger{zap: zap},
  38. }
  39. antsPool, err := ants.NewPool(step, ants.WithOptions(options))
  40. if err != nil {
  41. return nil, err
  42. }
  43. pool := &goroutinePool{
  44. pool: antsPool,
  45. logger: zap,
  46. ticker: ticker.New(ttl),
  47. step: step,
  48. }
  49. pool.run()
  50. return pool, nil
  51. }
  52. func (p *goroutinePool) run() {
  53. p.ticker.Process(func() {
  54. if p.Free() > p.step {
  55. mul := p.Free() / p.step
  56. p.pool.Tune(p.Size() - p.step*mul)
  57. }
  58. p.logger.Sugar().Infof("goroutine pool info: Size[%d] Running[%d] Free[%d]",
  59. p.Size(), p.Running(), p.Free())
  60. })
  61. }
  62. func (p *goroutinePool) Submit(task func()) {
  63. if p.pool.IsClosed() {
  64. return
  65. }
  66. err := p.pool.Submit(task)
  67. if err == ants.ErrPoolOverload {
  68. p.pool.Tune(p.Size() + p.step)
  69. p.Submit(task)
  70. }
  71. }
  72. func (p *goroutinePool) Size() int {
  73. return p.pool.Cap()
  74. }
  75. func (p *goroutinePool) Running() int {
  76. return p.pool.Running()
  77. }
  78. func (p *goroutinePool) Free() int {
  79. return p.pool.Free()
  80. }
  81. func (p *goroutinePool) Stop() {
  82. p.ticker.Stop()
  83. p.pool.Release()
  84. }