pool.go 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package ants
  2. import (
  3. "go.uber.org/zap"
  4. )
  5. var Pool GoroutinePool
  6. var _ GoroutinePool = (*goroutinePool)(nil)
  // GoroutinePool is the task-pool abstraction exposed by this package.
  type GoroutinePool interface {
  	// Submit hands task to the pool for execution.
  	Submit(task func())

  	// Stop shuts the pool down; the implementation in this file releases the
  	// underlying ants pool.
  	Stop()

  	// Size reports the size recorded for the pool.
  	Size() int
  }
  12. type goroutinePool struct {
  13. pool *ants.Pool
  14. zap *zap.Logger
  15. size int
  16. }
  17. type poolLogger struct {
  18. zap *zap.Logger
  19. }
  20. func (l *poolLogger) Printf(format string, args ...interface{}) {
  21. l.zap.Sugar().Infof(format, args)
  22. }
  23. func Init(zap *zap.Logger, size int) error {
  24. l := &poolLogger{zap: zap}
  25. options := ants.Options{
  26. Nonblocking: true,
  27. PanicHandler: func(i interface{}) {
  28. zap.Sugar().Panic(i)
  29. },
  30. Logger: l,
  31. }
  32. p, err := ants.NewPool(size, ants.WithOptions(options))
  33. if err != nil {
  34. return err
  35. }
  36. Pool = &goroutinePool{
  37. pool: p,
  38. zap: zap,
  39. size: size,
  40. }
  41. return nil
  42. }
  43. func (p *goroutinePool) Submit(task func()) {
  44. if p.pool.IsClosed() {
  45. panic("协程池异常关闭")
  46. }
  47. err := p.pool.Submit(task)
  48. if err == ants.ErrPoolOverload {
  49. p.size = p.size + p.size/10
  50. p.pool.Tune(p.size)
  51. p.Submit(task)
  52. }
  53. }
  54. func (p *goroutinePool) Size() int {
  55. return p.size
  56. }
  57. func (p *goroutinePool) Stop() {
  58. p.pool.Release()
  59. }