Без опису
Ви не можете вибрати більше 25 тем Теми мають розпочинатися з літери або цифри, можуть містити дефіси (-) і не повинні перевищувати 35 символів.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // logger/es_logger.go
  2. package esv8
  3. import (
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "log"
  8. "sync"
  9. "time"
  10. "git.x2erp.com/qdy/go-base/logger/helpers"
  11. "github.com/elastic/go-elasticsearch/v8"
  12. "github.com/elastic/go-elasticsearch/v8/esapi"
  13. )
  14. // ====================== 核心结构 ======================
  15. type esWriter struct {
  16. serviceName string
  17. esClient *elasticsearch.Client
  18. mu sync.RWMutex
  19. isRunning bool
  20. wg sync.WaitGroup
  21. stopChan chan struct{}
  22. }
  23. // ====================== 全局单例 ======================
  24. var (
  25. instance *esWriter
  26. once sync.Once
  27. initErr error
  28. )
  29. // InitESWriter 初始化全局ES写入器
  30. func InitESWriter(serviceName, esURL, username, password string) error {
  31. once.Do(func() {
  32. instance = &esWriter{
  33. serviceName: serviceName,
  34. stopChan: make(chan struct{}),
  35. isRunning: false,
  36. }
  37. // 初始化ES客户端
  38. initErr = instance.initialize(esURL, username, password)
  39. })
  40. return initErr
  41. }
  42. // GetESWriter 获取全局ES写入器
  43. func GetESWriter() *esWriter {
  44. if instance == nil {
  45. panic("ESWriter not initialized. Call InitESWriter first")
  46. }
  47. return instance
  48. }
  49. // ====================== 核心方法 ======================
  50. // Write 实现io.Writer接口 - 异步写入ES
  51. func (w *esWriter) Write(p []byte) (int, error) {
  52. w.mu.RLock()
  53. defer w.mu.RUnlock()
  54. if !w.isRunning {
  55. return len(p), nil // 已停止,静默丢弃
  56. }
  57. // 异步发送:启动一个goroutine来处理写入
  58. go w.asyncSendToES(p)
  59. return len(p), nil
  60. }
  61. // asyncSendToES 异步发送到ES
  62. func (w *esWriter) asyncSendToES(data []byte) {
  63. // 创建索引名
  64. //indexName := w.serviceName + "-" + time.Now().Format("2006-01-02")
  65. indexName := helpers.GetIndexName(w.serviceName)
  66. // 复制数据以避免竞争
  67. dataCopy := make([]byte, len(data))
  68. copy(dataCopy, data)
  69. // 执行写入
  70. err := w.sendToES(indexName, dataCopy)
  71. if err != nil {
  72. // 记录错误但继续执行
  73. log.Printf("[ES-ERROR] 异步写入ES失败: %v", err)
  74. }
  75. }
  76. // ====================== 私有方法 ======================
  77. // initialize 初始化ES连接
  78. func (w *esWriter) initialize(esURL, username, password string) error {
  79. cfg := elasticsearch.Config{
  80. Addresses: []string{esURL},
  81. RetryOnStatus: []int{502, 503, 504, 429},
  82. MaxRetries: 3,
  83. }
  84. if username != "" && password != "" {
  85. cfg.Username = username
  86. cfg.Password = password
  87. }
  88. client, err := elasticsearch.NewClient(cfg)
  89. if err != nil {
  90. return fmt.Errorf("创建ES客户端失败: %v", err)
  91. }
  92. // 测试连接
  93. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  94. defer cancel()
  95. resp, err := client.Ping(client.Ping.WithContext(ctx))
  96. if err != nil {
  97. return fmt.Errorf("ES连接测试失败: %v", err)
  98. }
  99. defer resp.Body.Close()
  100. if resp.IsError() {
  101. return fmt.Errorf("ES服务响应错误: %s", resp.Status())
  102. }
  103. w.esClient = client
  104. w.isRunning = true
  105. log.Printf("[ES] ES异步写入器已启动: %s", esURL)
  106. return nil
  107. }
  108. // sendToES 发送日志到ES(同步)
  109. func (w *esWriter) sendToES(indexName string, data []byte) error {
  110. // 准备文档
  111. body := bytes.NewReader(data)
  112. // 创建索引请求
  113. req := esapi.IndexRequest{
  114. Index: indexName,
  115. Body: body,
  116. Refresh: "false",
  117. }
  118. // 设置超时
  119. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  120. defer cancel()
  121. // 执行请求
  122. resp, err := req.Do(ctx, w.esClient)
  123. if err != nil {
  124. return fmt.Errorf("发送请求失败: %v", err)
  125. }
  126. defer resp.Body.Close()
  127. // 检查响应
  128. if resp.IsError() {
  129. return fmt.Errorf("ES响应错误: %s", resp.Status())
  130. }
  131. return nil
  132. }
  133. // Reconnect 重新连接ES
  134. func (w *esWriter) Reconnect(esURL, username, password string) error {
  135. w.mu.Lock()
  136. defer w.mu.Unlock()
  137. // 停止当前
  138. w.isRunning = false
  139. // 重新初始化
  140. return w.initialize(esURL, username, password)
  141. }
  142. // HealthCheck 健康检查
  143. func (w *esWriter) HealthCheck() bool {
  144. w.mu.RLock()
  145. defer w.mu.RUnlock()
  146. if !w.isRunning {
  147. return false
  148. }
  149. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  150. defer cancel()
  151. resp, err := w.esClient.Ping(w.esClient.Ping.WithContext(ctx))
  152. if err != nil {
  153. return false
  154. }
  155. defer resp.Body.Close()
  156. return !resp.IsError()
  157. }
  158. // Stop 停止写入器
  159. func (w *esWriter) stop() {
  160. w.mu.Lock()
  161. defer w.mu.Unlock()
  162. if w.isRunning {
  163. w.isRunning = false
  164. close(w.stopChan)
  165. w.wg.Wait()
  166. log.Printf("[ES] ES写入器已停止")
  167. }
  168. }
  169. // 包级别函数
  170. func StopESWriter() {
  171. if instance != nil {
  172. instance.stop()
  173. }
  174. }