123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package consul
- import (
- "fmt"
- "strconv"
- "unsafe"
- "github.com/hashicorp/consul/api"
- )
- var _ ServiceRegistry = (*serviceRegistry)(nil)
- type ServiceRegistry interface {
- Register(serviceInstance DefaultServiceInstance) bool
- Deregister()
- GetInstances(serviceID string, tag string) ([]serviceInstance, error)
- GetServices() ([]string, error)
- GetClient() *api.Client
- }
- type serviceRegistry struct {
- serviceInstances map[string]map[string]DefaultServiceInstance
- client *api.Client
- localServiceInstance DefaultServiceInstance
- }
- func NewServiceRegistry(host string, port int, token string) (ServiceRegistry, error) {
- if len(host) < 3 {
- return nil, fmt.Errorf("check host")
- }
- if port <= 0 || port > 65535 {
- return nil, fmt.Errorf("check port, port should between 1 and 65535")
- }
- config := api.DefaultConfig()
- config.Address = host + ":" + strconv.Itoa(port)
- config.Token = token
- client, err := api.NewClient(config)
- if err != nil {
- return nil, err
- }
- return &serviceRegistry{client: client}, nil
- }
- func (c *serviceRegistry) GetClient() *api.Client {
- return c.client
- }
- func (c *serviceRegistry) GetInstances(serviceID string, tag string) ([]serviceInstance, error) {
- queryOptions := &api.QueryOptions{
- Datacenter: "",
- AllowStale: false,
- RequireConsistent: false,
- UseCache: false,
- MaxAge: 0,
- StaleIfError: 0,
- WaitIndex: 0,
- WaitHash: "",
- WaitTime: 0,
- Token: "",
- Near: "",
- NodeMeta: nil,
- RelayFactor: 0,
- Connect: false,
- Filter: "",
- }
- catalogService, _, _ := c.client.Catalog().Service(serviceID, tag, queryOptions)
- if len(catalogService) > 0 {
- result := make([]serviceInstance, len(catalogService))
- for index, sever := range catalogService {
- s := serviceInstance{
- InstanceID: sever.ServiceID,
- ServiceID: sever.ServiceName,
- Host: sever.Address,
- Port: sever.ServicePort,
- Metadata: sever.ServiceMeta,
- }
- result[index] = s
- }
- return result, nil
- }
- return nil, nil
- }
- func (c *serviceRegistry) GetServices() ([]string, error) {
- queryOptions := &api.QueryOptions{
- Datacenter: "",
- AllowStale: false,
- RequireConsistent: false,
- UseCache: false,
- MaxAge: 0,
- StaleIfError: 0,
- WaitIndex: 0,
- WaitHash: "",
- WaitTime: 0,
- Token: "",
- Near: "",
- NodeMeta: nil,
- RelayFactor: 0,
- Connect: false,
- Filter: "",
- }
- services, _, err := c.client.Catalog().Services(queryOptions)
- if err != nil {
- return nil, err
- }
- result := make([]string, unsafe.Sizeof(services))
- index := 0
- for serviceName, _ := range services {
- result[index] = serviceName
- index++
- }
- return result, nil
- }
- func (c *serviceRegistry) Register(serviceInstance DefaultServiceInstance) bool {
- // 创建注册到consul的服务到
- registration := new(api.AgentServiceRegistration)
- registration.ID = serviceInstance.GetInstanceID()
- registration.Name = serviceInstance.GetServiceID()
- registration.Port = serviceInstance.GetPort()
- var tags []string
- if serviceInstance.IsSecure() {
- tags = append(tags, "secure=true")
- } else {
- tags = append(tags, "secure=false")
- }
- if serviceInstance.GetMetadata() != nil {
- for key, value := range serviceInstance.GetMetadata() {
- tags = append(tags, key+"="+value)
- }
- }
- registration.Tags = tags
- registration.Address = serviceInstance.GetHost()
- // 增加consul健康检查回调函数
- check := new(api.AgentServiceCheck)
- schema := "http"
- if serviceInstance.IsSecure() {
- schema = "https"
- }
- check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port)
- check.Timeout = "5s"
- check.Interval = "5s"
- check.DeregisterCriticalServiceAfter = "20s" // 故障检查失败30s后 consul自动将注册服务删除
- registration.Check = check
- // 注册服务到consul
- err := c.client.Agent().ServiceRegister(registration)
- if err != nil {
- return false
- }
- if c.serviceInstances == nil {
- c.serviceInstances = map[string]map[string]DefaultServiceInstance{}
- }
- services := c.serviceInstances[serviceInstance.GetServiceID()]
- if services == nil {
- services = map[string]DefaultServiceInstance{}
- }
- services[serviceInstance.GetInstanceID()] = serviceInstance
- c.serviceInstances[serviceInstance.GetServiceID()] = services
- c.localServiceInstance = serviceInstance
- return true
- }
- func (c *serviceRegistry) Deregister() {
- if c.serviceInstances == nil {
- return
- }
- services := c.serviceInstances[c.localServiceInstance.GetServiceID()]
- if services == nil {
- return
- }
- delete(services, c.localServiceInstance.GetInstanceID())
- if len(services) == 0 {
- delete(c.serviceInstances, c.localServiceInstance.GetServiceID())
- }
- _ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceID())
- c.localServiceInstance = nil
- }
|