pool.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package proxy
  2. import (
  3. "log"
  4. "sync"
  5. "time"
  6. )
  7. type ConnPool interface {
  8. initAutoFill(async bool) (err error)
  9. Get() (conn interface{}, err error)
  10. Put(conn interface{})
  11. ReleaseAll()
  12. Len() (length int)
  13. }
  14. type netPool struct {
  15. connects chan interface{}
  16. lock *sync.Mutex
  17. config poolConfig
  18. }
  19. type poolConfig struct {
  20. Factory func() (interface{}, error)
  21. IsActive func(interface{}) bool
  22. Release func(interface{})
  23. InitialCap int
  24. MaxCap int
  25. }
  26. func NewConnPool(poolConfig poolConfig) (pool ConnPool, err error) {
  27. p := netPool{
  28. config: poolConfig,
  29. connects: make(chan interface{}, poolConfig.MaxCap),
  30. lock: &sync.Mutex{},
  31. }
  32. if poolConfig.MaxCap > 0 {
  33. err = p.initAutoFill(false)
  34. if err == nil {
  35. _ = p.initAutoFill(true)
  36. }
  37. }
  38. return &p, nil
  39. }
  40. func (p *netPool) initAutoFill(async bool) (err error) {
  41. var worker = func() (err error) {
  42. for {
  43. if p.Len() <= p.config.InitialCap/2 {
  44. p.lock.Lock()
  45. errN := 0
  46. for i := 0; i < p.config.InitialCap; i++ {
  47. c, factoryErr := p.config.Factory()
  48. if factoryErr != nil {
  49. errN++
  50. if async {
  51. continue
  52. } else {
  53. p.lock.Unlock()
  54. return factoryErr
  55. }
  56. }
  57. select {
  58. case p.connects <- c:
  59. default:
  60. p.config.Release(c)
  61. break
  62. }
  63. if p.Len() >= p.config.InitialCap {
  64. break
  65. }
  66. }
  67. if errN > 0 {
  68. log.Printf("fill conn pool fail , ERRN:%d", errN)
  69. }
  70. p.lock.Unlock()
  71. }
  72. if !async {
  73. return
  74. }
  75. time.Sleep(time.Second * 2)
  76. }
  77. }
  78. if async {
  79. go func() {
  80. _ = worker()
  81. }()
  82. } else {
  83. err = worker()
  84. }
  85. return
  86. }
  87. func (p *netPool) Get() (conn interface{}, err error) {
  88. // defer func() {
  89. // log.Printf("pool len : %d", p.Len())
  90. // }()
  91. p.lock.Lock()
  92. defer p.lock.Unlock()
  93. // for {
  94. select {
  95. case conn = <-p.connects:
  96. if p.config.IsActive(conn) {
  97. return
  98. }
  99. p.config.Release(conn)
  100. default:
  101. conn, err = p.config.Factory()
  102. if err != nil {
  103. return nil, err
  104. }
  105. return conn, nil
  106. }
  107. // }
  108. return
  109. }
  110. func (p *netPool) Put(conn interface{}) {
  111. if conn == nil {
  112. return
  113. }
  114. p.lock.Lock()
  115. defer p.lock.Unlock()
  116. if !p.config.IsActive(conn) {
  117. p.config.Release(conn)
  118. }
  119. select {
  120. case p.connects <- conn:
  121. default:
  122. p.config.Release(conn)
  123. }
  124. }
  125. func (p *netPool) ReleaseAll() {
  126. p.lock.Lock()
  127. defer p.lock.Unlock()
  128. close(p.connects)
  129. for c := range p.connects {
  130. p.config.Release(c)
  131. }
  132. p.connects = make(chan interface{}, p.config.InitialCap)
  133. }
  134. func (p *netPool) Len() (length int) {
  135. return len(p.connects)
  136. }