service.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package consul
  2. import (
  3. "fmt"
  4. "strconv"
  5. "unsafe"
  6. "github.com/hashicorp/consul/api"
  7. )
  8. var _ ServiceRegistry = (*serviceRegistry)(nil)
  9. type ServiceRegistry interface {
  10. Register(serviceInstance DefaultServiceInstance) bool
  11. Deregister()
  12. GetInstances(serviceID string, tag string) ([]serviceInstance, error)
  13. GetServices() ([]string, error)
  14. GetClient() *api.Client
  15. }
  16. type serviceRegistry struct {
  17. serviceInstances map[string]map[string]DefaultServiceInstance
  18. client *api.Client
  19. localServiceInstance DefaultServiceInstance
  20. }
  21. func NewServiceRegistry(host string, port int, token string) (ServiceRegistry, error) {
  22. if len(host) < 3 {
  23. return nil, fmt.Errorf("check host")
  24. }
  25. if port <= 0 || port > 65535 {
  26. return nil, fmt.Errorf("check port, port should between 1 and 65535")
  27. }
  28. config := api.DefaultConfig()
  29. config.Address = host + ":" + strconv.Itoa(port)
  30. config.Token = token
  31. client, err := api.NewClient(config)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return &serviceRegistry{client: client}, nil
  36. }
  37. func (c *serviceRegistry) GetClient() *api.Client {
  38. return c.client
  39. }
  40. func (c *serviceRegistry) GetInstances(serviceID string, tag string) ([]serviceInstance, error) {
  41. queryOptions := &api.QueryOptions{
  42. Datacenter: "",
  43. AllowStale: false,
  44. RequireConsistent: false,
  45. UseCache: false,
  46. MaxAge: 0,
  47. StaleIfError: 0,
  48. WaitIndex: 0,
  49. WaitHash: "",
  50. WaitTime: 0,
  51. Token: "",
  52. Near: "",
  53. NodeMeta: nil,
  54. RelayFactor: 0,
  55. Connect: false,
  56. Filter: "",
  57. }
  58. catalogService, _, _ := c.client.Catalog().Service(serviceID, tag, queryOptions)
  59. if len(catalogService) > 0 {
  60. result := make([]serviceInstance, len(catalogService))
  61. for index, sever := range catalogService {
  62. s := serviceInstance{
  63. InstanceID: sever.ServiceID,
  64. ServiceID: sever.ServiceName,
  65. Host: sever.Address,
  66. Port: sever.ServicePort,
  67. Metadata: sever.ServiceMeta,
  68. }
  69. result[index] = s
  70. }
  71. return result, nil
  72. }
  73. return nil, nil
  74. }
  75. func (c *serviceRegistry) GetServices() ([]string, error) {
  76. queryOptions := &api.QueryOptions{
  77. Datacenter: "",
  78. AllowStale: false,
  79. RequireConsistent: false,
  80. UseCache: false,
  81. MaxAge: 0,
  82. StaleIfError: 0,
  83. WaitIndex: 0,
  84. WaitHash: "",
  85. WaitTime: 0,
  86. Token: "",
  87. Near: "",
  88. NodeMeta: nil,
  89. RelayFactor: 0,
  90. Connect: false,
  91. Filter: "",
  92. }
  93. services, _, err := c.client.Catalog().Services(queryOptions)
  94. if err != nil {
  95. return nil, err
  96. }
  97. result := make([]string, unsafe.Sizeof(services))
  98. index := 0
  99. for serviceName, _ := range services {
  100. result[index] = serviceName
  101. index++
  102. }
  103. return result, nil
  104. }
  105. func (c *serviceRegistry) Register(serviceInstance DefaultServiceInstance) bool {
  106. // 创建注册到consul的服务到
  107. registration := new(api.AgentServiceRegistration)
  108. registration.ID = serviceInstance.GetInstanceID()
  109. registration.Name = serviceInstance.GetServiceID()
  110. registration.Port = serviceInstance.GetPort()
  111. var tags []string
  112. if serviceInstance.IsSecure() {
  113. tags = append(tags, "secure=true")
  114. } else {
  115. tags = append(tags, "secure=false")
  116. }
  117. if serviceInstance.GetMetadata() != nil {
  118. for key, value := range serviceInstance.GetMetadata() {
  119. tags = append(tags, key+"="+value)
  120. }
  121. }
  122. registration.Tags = tags
  123. registration.Address = serviceInstance.GetHost()
  124. // 增加consul健康检查回调函数
  125. check := new(api.AgentServiceCheck)
  126. schema := "http"
  127. if serviceInstance.IsSecure() {
  128. schema = "https"
  129. }
  130. check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port)
  131. check.Timeout = "5s"
  132. check.Interval = "5s"
  133. check.DeregisterCriticalServiceAfter = "20s" // 故障检查失败30s后 consul自动将注册服务删除
  134. registration.Check = check
  135. // 注册服务到consul
  136. err := c.client.Agent().ServiceRegister(registration)
  137. if err != nil {
  138. return false
  139. }
  140. if c.serviceInstances == nil {
  141. c.serviceInstances = map[string]map[string]DefaultServiceInstance{}
  142. }
  143. services := c.serviceInstances[serviceInstance.GetServiceID()]
  144. if services == nil {
  145. services = map[string]DefaultServiceInstance{}
  146. }
  147. services[serviceInstance.GetInstanceID()] = serviceInstance
  148. c.serviceInstances[serviceInstance.GetServiceID()] = services
  149. c.localServiceInstance = serviceInstance
  150. return true
  151. }
  152. func (c *serviceRegistry) Deregister() {
  153. if c.serviceInstances == nil {
  154. return
  155. }
  156. services := c.serviceInstances[c.localServiceInstance.GetServiceID()]
  157. if services == nil {
  158. return
  159. }
  160. delete(services, c.localServiceInstance.GetInstanceID())
  161. if len(services) == 0 {
  162. delete(c.serviceInstances, c.localServiceInstance.GetServiceID())
  163. }
  164. _ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceID())
  165. c.localServiceInstance = nil
  166. }