暫無描述
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

service_client.go 3.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. package client
import (
	"bytes"
	"encoding/json"
	"errors"
	"io"
	"log"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/hashicorp/consul/api"
)
  13. var (
  14. balancer *ServiceBalancer
  15. once sync.Once
  16. )
  17. type ServiceBalancer struct {
  18. mu sync.RWMutex
  19. services map[string][]string
  20. indexes map[string]int
  21. consulAddr string
  22. }
  23. // Init 初始化客户端,传入Consul地址
  24. func Init(consulAddr string) {
  25. once.Do(func() {
  26. balancer = &ServiceBalancer{
  27. services: make(map[string][]string),
  28. indexes: make(map[string]int),
  29. consulAddr: consulAddr,
  30. }
  31. go balancer.watchConsul()
  32. })
  33. }
  34. func (s *ServiceBalancer) watchConsul() {
  35. config := api.DefaultConfig()
  36. config.Address = s.consulAddr
  37. client, _ := api.NewClient(config)
  38. for {
  39. s.mu.Lock()
  40. services, _, _ := client.Health().Service("", "", true, nil)
  41. // 清空旧数据
  42. s.services = make(map[string][]string)
  43. for _, entry := range services {
  44. service := entry.Service.Service
  45. addr := entry.Service.Address + ":" + strconv.Itoa(entry.Service.Port)
  46. // 使用map去重
  47. if s.services[service] == nil {
  48. s.services[service] = []string{}
  49. }
  50. // 检查是否已存在
  51. exists := false
  52. for _, existingAddr := range s.services[service] {
  53. if existingAddr == addr {
  54. exists = true
  55. break
  56. }
  57. }
  58. if !exists {
  59. s.services[service] = append(s.services[service], addr)
  60. }
  61. }
  62. s.mu.Unlock()
  63. time.Sleep(10 * time.Second)
  64. }
  65. }
  66. func (s *ServiceBalancer) GetServiceAddr(service string) string {
  67. s.mu.RLock()
  68. defer s.mu.RUnlock()
  69. addrs := s.services[service]
  70. if len(addrs) == 0 {
  71. return ""
  72. }
  73. idx := s.indexes[service]
  74. addr := addrs[idx]
  75. s.indexes[service] = (idx + 1) % len(addrs)
  76. return addr
  77. }
  78. func Get(service, path string, result interface{}) error {
  79. if balancer == nil {
  80. return errors.New("client not initialized, call client.Init() first")
  81. }
  82. baseURL := balancer.GetServiceAddr(service)
  83. if baseURL == "" {
  84. return errors.New("service unavailable")
  85. }
  86. resp, err := http.Get("http://" + baseURL + path)
  87. if err != nil {
  88. return err
  89. }
  90. defer resp.Body.Close()
  91. if resp.StatusCode != http.StatusOK {
  92. return errors.New("HTTP error: " + resp.Status)
  93. }
  94. body, err := io.ReadAll(resp.Body)
  95. if err != nil {
  96. return err
  97. }
  98. return json.Unmarshal(body, result)
  99. }
  100. func Post(service, path string, body, result interface{}) error {
  101. if balancer == nil {
  102. return errors.New("client not initialized, call client.Init() first")
  103. }
  104. baseURL := balancer.GetServiceAddr(service)
  105. if baseURL == "" {
  106. return errors.New("service unavailable")
  107. }
  108. jsonBytes, err := json.Marshal(body)
  109. if err != nil {
  110. return err
  111. }
  112. resp, err := http.Post(
  113. "http://"+baseURL+path,
  114. "application/json",
  115. bytes.NewBuffer(jsonBytes),
  116. )
  117. if err != nil {
  118. return err
  119. }
  120. defer resp.Body.Close()
  121. if resp.StatusCode != http.StatusOK {
  122. return errors.New("HTTP error: " + resp.Status)
  123. }
  124. respBody, err := io.ReadAll(resp.Body)
  125. if err != nil {
  126. return err
  127. }
  128. if result != nil {
  129. return json.Unmarshal(respBody, result)
  130. }
  131. return nil
  132. }