// Package ants wraps github.com/panjf2000/ants/v2 in a goroutine pool whose
// capacity grows in fixed steps on overload and is periodically shrunk again.
package ants

import (
	"errors"
	"time"

	"git.bvbej.com/bvbej/base-golang/pkg/ticker"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)

var _ GoroutinePool = (*goroutinePool)(nil)

// GoroutinePool is the pool API exposed by this package. The unexported run
// method means only types declared in this package can satisfy it.
type GoroutinePool interface {
	run()

	// Submit hands task to the pool, growing the pool when it is overloaded.
	Submit(task func())
	// Stop stops the maintenance ticker and releases the underlying pool.
	Stop()
	// Size reports the current capacity of the pool.
	Size() int
	// Running reports the number of currently running workers.
	Running() int
	// Free reports the number of available workers.
	Free() int
}

// goroutinePool is the ants-backed GoroutinePool implementation.
type goroutinePool struct {
	pool   *ants.Pool
	logger *zap.Logger
	ticker ticker.Ticker
	step   int // capacity increment/decrement unit; also the initial capacity
}

// poolLogger adapts a *zap.Logger to the Printf-style logger ants expects.
type poolLogger struct {
	zap *zap.Logger
}

// Printf logs the formatted message at info level.
func (l *poolLogger) Printf(format string, args ...any) {
	// args must be spread; passing the slice itself would hand Infof a single
	// []any operand and garble every message.
	l.zap.Sugar().Infof(format, args...)
}

// NewPool creates a non-blocking ants pool with an initial capacity of step,
// wires its logging to zap, and starts the periodic maintenance callback.
// It returns the error from ants.NewPool unchanged if pool creation fails.
func NewPool(zap *zap.Logger, step int) (GoroutinePool, error) {
	ttl := time.Minute * 5
	options := ants.Options{
		Nonblocking:    true,
		ExpiryDuration: ttl,
		// NOTE(review): zap's Panic logs and then panics again, so a task
		// panic is re-raised from inside the handler — confirm this is intended.
		PanicHandler: func(err any) {
			zap.Sugar().Panic(err)
		},
		Logger: &poolLogger{zap: zap},
	}

	antsPool, err := ants.NewPool(step, ants.WithOptions(options))
	if err != nil {
		return nil, err
	}

	pool := &goroutinePool{
		pool:   antsPool,
		logger: zap,
		ticker: ticker.New(ttl),
		step:   step,
	}
	pool.run()

	return pool, nil
}

// run registers the maintenance callback on the ticker: it shrinks surplus
// capacity and then logs the pool statistics.
func (p *goroutinePool) run() {
	p.ticker.Process(func() {
		p.shrink()
		p.logger.Sugar().Infof("goroutine pool info: Size[%d] Running[%d] Free[%d]", p.Size(), p.Running(), p.Free())
	})
}

// shrink releases whole steps of unused capacity while always keeping at
// least one step.
func (p *goroutinePool) shrink() {
	// Snapshot Free once so the comparison and the division see the same value.
	free := p.Free()
	if p.step <= 0 || free <= p.step {
		return
	}

	target := p.Size() - p.step*(free/p.step)
	// Tune ignores sizes <= 0, so a fully idle pool whose capacity is a
	// multiple of step would otherwise never shrink; clamp to one step.
	if target < p.step {
		target = p.step
	}
	p.pool.Tune(target)
}

// Submit hands task to the pool. When the pool reports overload, its capacity
// is raised by one step and the submission is retried. Tasks submitted to a
// closed pool are dropped silently; any other submit error is logged and the
// task is dropped.
func (p *goroutinePool) Submit(task func()) {
	// Loop instead of recursing so sustained overload cannot grow the stack.
	for !p.pool.IsClosed() {
		err := p.pool.Submit(task)
		if err == nil {
			return
		}
		if !errors.Is(err, ants.ErrPoolOverload) {
			// A pool closed between the check and the submit is treated like
			// the closed-pool guard above: dropped without noise.
			if !errors.Is(err, ants.ErrPoolClosed) {
				p.logger.Sugar().Errorf("goroutine pool submit failed: %v", err)
			}
			return
		}
		p.pool.Tune(p.Size() + p.step)
	}
}

// Size reports the current capacity of the underlying pool.
func (p *goroutinePool) Size() int {
	return p.pool.Cap()
}

// Running reports the number of currently running workers.
func (p *goroutinePool) Running() int {
	return p.pool.Running()
}

// Free reports the number of available workers.
func (p *goroutinePool) Free() int {
	return p.pool.Free()
}

// Stop stops the maintenance ticker and then releases the underlying pool.
func (p *goroutinePool) Stop() {
	p.ticker.Stop()
	p.pool.Release()
}