package consul import ( "fmt" "strconv" "unsafe" "github.com/hashicorp/consul/api" ) var _ ServiceRegistry = (*serviceRegistry)(nil) type ServiceRegistry interface { Register(serviceInstance DefaultServiceInstance) bool Deregister() GetInstances(serviceID string, tag string) ([]serviceInstance, error) GetServices() ([]string, error) GetClient() *api.Client } type serviceRegistry struct { serviceInstances map[string]map[string]DefaultServiceInstance client *api.Client localServiceInstance DefaultServiceInstance } func NewServiceRegistry(host string, port int, token string) (ServiceRegistry, error) { if len(host) < 3 { return nil, fmt.Errorf("check host") } if port <= 0 || port > 65535 { return nil, fmt.Errorf("check port, port should between 1 and 65535") } config := api.DefaultConfig() config.Address = host + ":" + strconv.Itoa(port) config.Token = token client, err := api.NewClient(config) if err != nil { return nil, err } return &serviceRegistry{client: client}, nil } func (c *serviceRegistry) GetClient() *api.Client { return c.client } func (c *serviceRegistry) GetInstances(serviceID string, tag string) ([]serviceInstance, error) { queryOptions := &api.QueryOptions{ Datacenter: "", AllowStale: false, RequireConsistent: false, UseCache: false, MaxAge: 0, StaleIfError: 0, WaitIndex: 0, WaitHash: "", WaitTime: 0, Token: "", Near: "", NodeMeta: nil, RelayFactor: 0, Connect: false, Filter: "", } catalogService, _, _ := c.client.Catalog().Service(serviceID, tag, queryOptions) if len(catalogService) > 0 { result := make([]serviceInstance, len(catalogService)) for index, sever := range catalogService { s := serviceInstance{ InstanceID: sever.ServiceID, ServiceID: sever.ServiceName, Host: sever.Address, Port: sever.ServicePort, Metadata: sever.ServiceMeta, } result[index] = s } return result, nil } return nil, nil } func (c *serviceRegistry) GetServices() ([]string, error) { queryOptions := &api.QueryOptions{ Datacenter: "", AllowStale: false, RequireConsistent: false, UseCache: false, MaxAge: 0, StaleIfError: 0, WaitIndex: 0, WaitHash: "", WaitTime: 0, Token: "", Near: "", NodeMeta: nil, RelayFactor: 0, Connect: false, Filter: "", } services, _, err := c.client.Catalog().Services(queryOptions) if err != nil { return nil, err } result := make([]string, unsafe.Sizeof(services)) index := 0 for serviceName, _ := range services { result[index] = serviceName index++ } return result, nil } func (c *serviceRegistry) Register(serviceInstance DefaultServiceInstance) bool { // 创建注册到consul的服务到 registration := new(api.AgentServiceRegistration) registration.ID = serviceInstance.GetInstanceID() registration.Name = serviceInstance.GetServiceID() registration.Port = serviceInstance.GetPort() var tags []string if serviceInstance.IsSecure() { tags = append(tags, "secure=true") } else { tags = append(tags, "secure=false") } if serviceInstance.GetMetadata() != nil { for key, value := range serviceInstance.GetMetadata() { tags = append(tags, key+"="+value) } } registration.Tags = tags registration.Address = serviceInstance.GetHost() // 增加consul健康检查回调函数 check := new(api.AgentServiceCheck) schema := "http" if serviceInstance.IsSecure() { schema = "https" } check.HTTP = fmt.Sprintf("%s://%s:%d/actuator/health", schema, registration.Address, registration.Port) check.Timeout = "5s" check.Interval = "5s" check.DeregisterCriticalServiceAfter = "20s" // 故障检查失败30s后 consul自动将注册服务删除 registration.Check = check // 注册服务到consul err := c.client.Agent().ServiceRegister(registration) if err != nil { return false } if c.serviceInstances == nil { c.serviceInstances = map[string]map[string]DefaultServiceInstance{} } services := c.serviceInstances[serviceInstance.GetServiceID()] if services == nil { services = map[string]DefaultServiceInstance{} } services[serviceInstance.GetInstanceID()] = serviceInstance c.serviceInstances[serviceInstance.GetServiceID()] = services c.localServiceInstance = serviceInstance return true } func (c *serviceRegistry) Deregister() { if c.serviceInstances == nil { return } services := c.serviceInstances[c.localServiceInstance.GetServiceID()] if services == nil { return } delete(services, c.localServiceInstance.GetInstanceID()) if len(services) == 0 { delete(c.serviceInstances, c.localServiceInstance.GetServiceID()) } _ = c.client.Agent().ServiceDeregister(c.localServiceInstance.GetInstanceID()) c.localServiceInstance = nil }