| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- package client
-
- import (
- "bytes"
- "encoding/json"
- "errors"
- "io"
- "net/http"
- "strconv"
- "sync"
- "time"
-
- "github.com/hashicorp/consul/api"
- )
-
- var (
- balancer *ServiceBalancer
- once sync.Once
- )
-
- type ServiceBalancer struct {
- mu sync.RWMutex
- services map[string][]string
- indexes map[string]int
- consulAddr string
- }
-
- // Init 初始化客户端,传入Consul地址
- func Init(consulAddr string) {
- once.Do(func() {
- balancer = &ServiceBalancer{
- services: make(map[string][]string),
- indexes: make(map[string]int),
- consulAddr: consulAddr,
- }
- go balancer.watchConsul()
- })
- }
-
- func (s *ServiceBalancer) watchConsul() {
- config := api.DefaultConfig()
- config.Address = s.consulAddr
- client, _ := api.NewClient(config)
-
- for {
- s.mu.Lock()
- services, _, _ := client.Health().Service("", "", true, nil)
-
- // 清空旧数据
- s.services = make(map[string][]string)
-
- for _, entry := range services {
- service := entry.Service.Service
- addr := entry.Service.Address + ":" + strconv.Itoa(entry.Service.Port)
-
- // 使用map去重
- if s.services[service] == nil {
- s.services[service] = []string{}
- }
-
- // 检查是否已存在
- exists := false
- for _, existingAddr := range s.services[service] {
- if existingAddr == addr {
- exists = true
- break
- }
- }
- if !exists {
- s.services[service] = append(s.services[service], addr)
- }
- }
- s.mu.Unlock()
-
- time.Sleep(10 * time.Second)
- }
- }
-
- func (s *ServiceBalancer) GetServiceAddr(service string) string {
- s.mu.RLock()
- defer s.mu.RUnlock()
-
- addrs := s.services[service]
- if len(addrs) == 0 {
- return ""
- }
-
- idx := s.indexes[service]
- addr := addrs[idx]
- s.indexes[service] = (idx + 1) % len(addrs)
-
- return addr
- }
-
- func Get(service, path string, result interface{}) error {
- if balancer == nil {
- return errors.New("client not initialized, call client.Init() first")
- }
-
- baseURL := balancer.GetServiceAddr(service)
- if baseURL == "" {
- return errors.New("service unavailable")
- }
-
- resp, err := http.Get("http://" + baseURL + path)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- return errors.New("HTTP error: " + resp.Status)
- }
-
- body, err := io.ReadAll(resp.Body)
- if err != nil {
- return err
- }
-
- return json.Unmarshal(body, result)
- }
-
- func Post(service, path string, body, result interface{}) error {
- if balancer == nil {
- return errors.New("client not initialized, call client.Init() first")
- }
-
- baseURL := balancer.GetServiceAddr(service)
- if baseURL == "" {
- return errors.New("service unavailable")
- }
-
- jsonBytes, err := json.Marshal(body)
- if err != nil {
- return err
- }
-
- resp, err := http.Post(
- "http://"+baseURL+path,
- "application/json",
- bytes.NewBuffer(jsonBytes),
- )
- if err != nil {
- return err
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- return errors.New("HTTP error: " + resp.Status)
- }
-
- respBody, err := io.ReadAll(resp.Body)
- if err != nil {
- return err
- }
-
- if result != nil {
- return json.Unmarshal(respBody, result)
- }
- return nil
- }
|