tcp.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package proxy
  2. import (
  3. "fmt"
  4. "log"
  5. "net"
  6. "runtime/debug"
  7. "strconv"
  8. "sync"
  9. )
  10. var ServicesMap = new(sync.Map)
  11. var _ TCP = (*tcp)(nil)
  12. type TCPArgs struct {
  13. Local string //监听地址
  14. Parent string //被代理地址
  15. Timeout int //拨号超时(毫秒)
  16. PoolSize int //代理池数
  17. CheckParentInterval int //检查被代理连接间隔(秒)
  18. OutCallback func() bool //回调 //是否允许代理
  19. }
  20. type TCP interface {
  21. Clean()
  22. start() (err error)
  23. callback(inConn net.Conn)
  24. outToTCP(inConn *net.Conn) (err error)
  25. initOutConnPool()
  26. }
  27. type tcp struct {
  28. outPool OutPool
  29. cfg TCPArgs
  30. }
  31. type ServiceItem struct {
  32. S TCP
  33. Name string
  34. }
  35. func Regist(name string, args TCPArgs) *sync.Map {
  36. item := &ServiceItem{
  37. S: &tcp{cfg: args},
  38. Name: name,
  39. }
  40. ServicesMap.LoadOrStore(name, item)
  41. return ServicesMap
  42. }
  43. func Run(name string) (err error) {
  44. item, ok := ServicesMap.Load(name)
  45. service := item.(*ServiceItem)
  46. if ok {
  47. go func() {
  48. defer func() {
  49. recoverErr := recover()
  50. if recoverErr != nil {
  51. log.Fatalf("%s servcie crashed, ERR: %s\ntrace:%s", name, recoverErr, string(debug.Stack()))
  52. }
  53. }()
  54. startErr := service.S.start()
  55. if startErr != nil {
  56. log.Fatalf("%s servcie fail, ERR: %s", name, startErr)
  57. }
  58. }()
  59. }
  60. if !ok {
  61. err = fmt.Errorf("service %s not found", name)
  62. }
  63. return
  64. }
  65. func (s *tcp) start() (err error) {
  66. s.initOutConnPool()
  67. host, port, _ := net.SplitHostPort(s.cfg.Local)
  68. p, _ := strconv.Atoi(port)
  69. sc := NewServerChannel(host, p)
  70. err = sc.ListenTCP(s.callback)
  71. if err != nil {
  72. return
  73. }
  74. return
  75. }
  76. func (s *tcp) Clean() {
  77. if s.outPool.Pool != nil {
  78. s.outPool.Pool.ReleaseAll()
  79. }
  80. }
  81. func (s *tcp) callback(inConn net.Conn) {
  82. defer func() {
  83. if err := recover(); err != nil {
  84. log.Printf("conn handler crashed with err : %s \nstack: %s", err, string(debug.Stack()))
  85. }
  86. }()
  87. if s.cfg.OutCallback != nil {
  88. if !s.cfg.OutCallback() {
  89. CloseConn(&inConn)
  90. return
  91. }
  92. }
  93. err := s.outToTCP(&inConn)
  94. if err != nil {
  95. CloseConn(&inConn)
  96. }
  97. }
  98. func (s *tcp) outToTCP(inConn *net.Conn) (err error) {
  99. var outConn net.Conn
  100. var _outConn interface{}
  101. _outConn, err = s.outPool.Pool.Get()
  102. if err == nil {
  103. outConn = _outConn.(net.Conn)
  104. }
  105. if err != nil {
  106. CloseConn(inConn)
  107. return
  108. }
  109. inAddr := (*inConn).RemoteAddr().String()
  110. inLocalAddr := (*inConn).LocalAddr().String()
  111. outAddr := outConn.RemoteAddr().String()
  112. outLocalAddr := outConn.LocalAddr().String()
  113. IoBind(*inConn, outConn, func(isSrcErr bool, err error) {
  114. log.Printf("conn %s - %s - %s -%s released", inAddr, inLocalAddr, outLocalAddr, outAddr)
  115. CloseConn(inConn)
  116. CloseConn(&outConn)
  117. }, func(n int, d bool) {}, 0)
  118. log.Printf("conn %s - %s - %s -%s connected", inAddr, inLocalAddr, outLocalAddr, outAddr)
  119. return
  120. }
  121. func (s *tcp) initOutConnPool() {
  122. s.outPool = NewOutPool(s.cfg.CheckParentInterval, s.cfg.Parent, s.cfg.Timeout, s.cfg.PoolSize, s.cfg.PoolSize*2)
  123. }