pool.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package ants
  2. import (
  3. "github.com/panjf2000/ants/v2"
  4. "go.uber.org/zap"
  5. )
  6. var Pool GoroutinePool
  7. var _ GoroutinePool = (*goroutinePool)(nil)
  8. type GoroutinePool interface {
  9. Submit(task func())
  10. Stop()
  11. Size() int
  12. }
  13. type goroutinePool struct {
  14. pool *ants.Pool
  15. zap *zap.Logger
  16. size int
  17. }
  18. type poolLogger struct {
  19. zap *zap.Logger
  20. }
  21. func (l *poolLogger) Printf(format string, args ...interface{}) {
  22. l.zap.Sugar().Infof(format, args)
  23. }
  24. func Init(zap *zap.Logger, size int) error {
  25. l := &poolLogger{zap: zap}
  26. options := ants.Options{
  27. Nonblocking: true,
  28. PanicHandler: func(i interface{}) {
  29. zap.Sugar().Panic(i)
  30. },
  31. Logger: l,
  32. }
  33. p, err := ants.NewPool(size, ants.WithOptions(options))
  34. if err != nil {
  35. return err
  36. }
  37. Pool = &goroutinePool{
  38. pool: p,
  39. zap: zap,
  40. size: size,
  41. }
  42. return nil
  43. }
  44. func (p *goroutinePool) Submit(task func()) {
  45. if p.pool.IsClosed() {
  46. panic("协程池异常关闭")
  47. }
  48. err := p.pool.Submit(task)
  49. if err == ants.ErrPoolOverload {
  50. p.size = p.size + p.size/10
  51. p.pool.Tune(p.size)
  52. p.Submit(task)
  53. }
  54. }
  55. func (p *goroutinePool) Size() int {
  56. return p.size
  57. }
  58. func (p *goroutinePool) Stop() {
  59. p.pool.Release()
  60. }