123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package ants
import (
	"errors"
	"fmt"
	"runtime/debug"
	"time"

	"git.bvbej.com/bvbej/base-golang/pkg/ticker"
	"github.com/panjf2000/ants/v2"
	"go.uber.org/zap"
)
- var _ GoroutinePool = (*goroutinePool)(nil)
- type GoroutinePool interface {
- run()
- Submit(task func())
- Stop()
- Size() int
- Running() int
- Free() int
- }
- type goroutinePool struct {
- pool *ants.Pool
- logger *zap.Logger
- ticker ticker.Ticker
- step int
- }
- type poolLogger struct {
- zap *zap.Logger
- }
- func (l *poolLogger) Printf(format string, args ...any) {
- l.zap.Sugar().Infof(format, args)
- }
- func NewPool(zapLogger *zap.Logger, step int) (GoroutinePool, error) {
- ttl := time.Minute * 5
- options := ants.Options{
- Nonblocking: true,
- ExpiryDuration: ttl,
- PanicHandler: func(err any) {
- zapLogger.Sugar().Error(
- "GoroutinePool panic",
- zap.String("error", fmt.Sprintf("%+v", err)),
- zap.String("stack", string(debug.Stack())),
- )
- },
- Logger: &poolLogger{zap: zapLogger},
- }
- antsPool, err := ants.NewPool(step, ants.WithOptions(options))
- if err != nil {
- return nil, err
- }
- pool := &goroutinePool{
- pool: antsPool,
- logger: zapLogger,
- ticker: ticker.New(ttl),
- step: step,
- }
- pool.run()
- return pool, nil
- }
- func (p *goroutinePool) run() {
- p.ticker.Process(func() {
- if p.Free() > p.step {
- mul := p.Free() / p.step
- p.pool.Tune(p.Size() - p.step*mul)
- }
- })
- }
- func (p *goroutinePool) Submit(task func()) {
- if p.pool.IsClosed() {
- return
- }
- err := p.pool.Submit(task)
- if err == ants.ErrPoolOverload {
- p.pool.Tune(p.Size() + p.step)
- p.Submit(task)
- }
- }
- func (p *goroutinePool) Size() int {
- return p.pool.Cap()
- }
- func (p *goroutinePool) Running() int {
- return p.pool.Running()
- }
- func (p *goroutinePool) Free() int {
- return p.pool.Free()
- }
- func (p *goroutinePool) Stop() {
- p.ticker.Stop()
- p.pool.Release()
- }
|