// Package client is a small HTTP client that resolves service names to
// addresses through Consul and spreads requests across the discovered
// instances in round-robin order.
package client

import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"log"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
)

const (
	// refreshInterval is the pause between two Consul polls.
	refreshInterval = 10 * time.Second
	// requestTimeout bounds every outgoing HTTP request so that a hung
	// backend cannot block a caller forever.
	requestTimeout = 30 * time.Second
)

var (
	balancer *ServiceBalancer
	once     sync.Once

	// httpClient is shared by Get and Post so connections are reused and
	// every request carries a timeout (http.DefaultClient has none).
	httpClient = &http.Client{Timeout: requestTimeout}
)

// ServiceBalancer caches the addresses of the services reported by Consul
// and hands them out in round-robin order.
type ServiceBalancer struct {
	mu         sync.RWMutex
	services   map[string][]string // service name -> "host:port" addresses
	indexes    map[string]int      // service name -> next round-robin position
	consulAddr string
}

// Init initializes the package-level client with the given Consul address and
// starts the background goroutine that polls Consul. Only the first call has
// any effect; later calls are ignored.
func Init(consulAddr string) {
	once.Do(func() {
		balancer = &ServiceBalancer{
			services:   make(map[string][]string),
			indexes:    make(map[string]int),
			consulAddr: consulAddr,
		}
		go balancer.watchConsul()
	})
}

// watchConsul polls Consul every refreshInterval and replaces the cached
// address table with the result. It runs for the lifetime of the process.
// If the Consul client cannot be created, the error is logged and the
// goroutine exits, leaving the table empty.
func (s *ServiceBalancer) watchConsul() {
	config := api.DefaultConfig()
	config.Address = s.consulAddr

	client, err := api.NewClient(config)
	if err != nil {
		log.Printf("client: creating consul client for %q: %v", s.consulAddr, err)
		return
	}

	for {
		s.refresh(client)
		time.Sleep(refreshInterval)
	}
}

// refresh performs one Consul query and swaps in the new address table. On a
// query error the previous table is kept so that a transient Consul outage
// does not make every service unavailable.
func (s *ServiceBalancer) refresh(client *api.Client) {
	// NOTE(review): the service name passed here is empty; confirm that the
	// Consul agent in use answers this query with all passing services.
	entries, _, err := client.Health().Service("", "", true, nil)
	if err != nil {
		log.Printf("client: querying consul health: %v", err)
		return
	}

	// Build the table before taking the lock so that readers are never
	// blocked behind the network round trip.
	table := buildServiceTable(entries)

	s.mu.Lock()
	s.services = table
	s.mu.Unlock()
}

// buildServiceTable groups the health entries by service name and returns the
// de-duplicated "host:port" addresses of each service in first-seen order.
func buildServiceTable(entries []*api.ServiceEntry) map[string][]string {
	table := make(map[string][]string)
	seen := make(map[string]struct{}, len(entries))

	for _, entry := range entries {
		if entry == nil || entry.Service == nil {
			continue
		}

		host := entry.Service.Address
		if host == "" && entry.Node != nil {
			// A service registered without its own address is reachable at
			// the address of the node it runs on.
			host = entry.Node.Address
		}

		name := entry.Service.Service
		// JoinHostPort brackets IPv6 literals so the address stays usable
		// inside a URL.
		addr := net.JoinHostPort(host, strconv.Itoa(entry.Service.Port))

		key := name + "\x00" + addr
		if _, dup := seen[key]; dup {
			continue
		}
		seen[key] = struct{}{}
		table[name] = append(table[name], addr)
	}
	return table
}

// GetServiceAddr returns the next "host:port" address of service in
// round-robin order, or "" when no instance of the service is known.
func (s *ServiceBalancer) GetServiceAddr(service string) string {
	// The round-robin position is written below, so the exclusive lock is
	// required; a read lock would allow concurrent map writes.
	s.mu.Lock()
	defer s.mu.Unlock()

	addrs := s.services[service]
	if len(addrs) == 0 {
		return ""
	}

	// The stored position may be stale if the instance list shrank since the
	// last call; the modulo keeps it inside the current list.
	idx := s.indexes[service] % len(addrs)
	s.indexes[service] = (idx + 1) % len(addrs)
	return addrs[idx]
}

// Get issues an HTTP GET for path against the next instance of service and
// decodes the JSON response body into result. A nil result skips decoding.
// It returns an error if Init has not been called, if no instance of the
// service is known, if the request fails, if the response status is not
// 200 OK, or if the body cannot be decoded.
func Get(service, path string, result interface{}) error {
	target, err := resolveURL(service, path)
	if err != nil {
		return err
	}

	resp, err := httpClient.Get(target)
	if err != nil {
		return err
	}
	return decodeResponse(resp, result)
}

// Post JSON-encodes body, sends it with an HTTP POST for path to the next
// instance of service, and decodes the JSON response body into result. A nil
// result skips decoding. It returns an error under the same conditions as
// Get, and additionally when body cannot be encoded.
func Post(service, path string, body, result interface{}) error {
	target, err := resolveURL(service, path)
	if err != nil {
		return err
	}

	payload, err := json.Marshal(body)
	if err != nil {
		return err
	}

	resp, err := httpClient.Post(target, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	return decodeResponse(resp, result)
}

// resolveURL picks the next instance of service and returns the full URL for
// path on that instance.
func resolveURL(service, path string) (string, error) {
	if balancer == nil {
		return "", errors.New("client not initialized, call client.Init() first")
	}

	addr := balancer.GetServiceAddr(service)
	if addr == "" {
		return "", errors.New("service unavailable")
	}
	return "http://" + addr + path, nil
}

// decodeResponse closes resp.Body, rejects any status other than 200 OK, and
// decodes the JSON body into result unless result is nil.
func decodeResponse(resp *http.Response, result interface{}) error {
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errors.New("HTTP error: " + resp.Status)
	}

	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if result == nil {
		return nil
	}
	return json.Unmarshal(data, result)
}