Нема описа
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. // logger/es_logger.go
  2. package http
  3. import (
  4. "bytes"
  5. "encoding/json"
  6. "fmt"
  7. "log"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "time"
  12. "git.x2erp.com/qdy/go-base/logger/helpers"
  13. )
  14. // ====================== 核心结构 ======================
  15. type esWriter struct {
  16. serviceName string
  17. baseURL string
  18. username string
  19. password string
  20. httpClient *http.Client
  21. mu sync.RWMutex
  22. isRunning bool
  23. console bool //配置文件里是否配置了终端显示,配置了后,每次保存es都在终端显示。方便调试。正式环境不能配置终端显示
  24. stopChan chan struct{}
  25. }
  26. // ====================== 全局单例 ======================
  27. var (
  28. instance *esWriter
  29. once sync.Once
  30. initErr error
  31. )
  32. // initESWriter 初始化全局ES写入器
  33. func InitESWriter(serviceName, esURL, username, password string, console bool) error {
  34. once.Do(func() {
  35. instance = &esWriter{
  36. serviceName: serviceName,
  37. stopChan: make(chan struct{}),
  38. isRunning: false,
  39. console: console,
  40. }
  41. // 初始化ES客户端
  42. initErr = instance.initialize(esURL, username, password)
  43. })
  44. return initErr
  45. }
  46. // getESWriter 获取全局ES写入器 - 确保这个函数存在且导出
  47. func GetESWriter() *esWriter {
  48. if instance == nil {
  49. panic("ESWriter not initialized. Call initESWriter first")
  50. }
  51. return instance
  52. }
  53. // ====================== 核心方法 ======================
  54. // Write 实现io.Writer接口
  55. func (w *esWriter) Write(p []byte) (int, error) {
  56. w.mu.RLock()
  57. defer w.mu.RUnlock()
  58. if !w.isRunning {
  59. return len(p), nil
  60. }
  61. // 异步发送
  62. go w.sendToES(p)
  63. return len(p), nil
  64. }
  65. // ====================== 私有方法 ======================
  66. // initialize 初始化ES连接 - 使用HTTP实现
  67. func (w *esWriter) initialize(esURL, username, password string) error {
  68. // 创建HTTP客户端
  69. w.httpClient = &http.Client{
  70. Timeout: 30 * time.Second,
  71. Transport: &http.Transport{
  72. MaxIdleConns: 100,
  73. MaxIdleConnsPerHost: 100,
  74. IdleConnTimeout: 90 * time.Second,
  75. },
  76. }
  77. w.baseURL = strings.TrimSuffix(esURL, "/")
  78. w.username = username
  79. w.password = password
  80. // 测试连接
  81. if err := w.testConnection(); err != nil {
  82. return fmt.Errorf("ES连接测试失败: %v", err)
  83. }
  84. w.isRunning = true
  85. log.Printf("[ES] HTTP ES写入器已启动: %s", esURL)
  86. return nil
  87. }
  88. // testConnection 测试连接
  89. func (w *esWriter) testConnection() error {
  90. req, err := http.NewRequest("GET", w.baseURL, nil)
  91. if err != nil {
  92. return fmt.Errorf("创建请求失败: %v", err)
  93. }
  94. if w.username != "" && w.password != "" {
  95. req.SetBasicAuth(w.username, w.password)
  96. }
  97. resp, err := w.httpClient.Do(req)
  98. if err != nil {
  99. return fmt.Errorf("连接失败: %v", err)
  100. }
  101. defer resp.Body.Close()
  102. if resp.StatusCode != http.StatusOK {
  103. body, _ := json.Marshal(resp.Header)
  104. return fmt.Errorf("ES响应错误 [%d]: %s", resp.StatusCode, string(body))
  105. }
  106. return nil
  107. }
  108. // sendToES 发送日志到ES
  109. func (w *esWriter) sendToES(data []byte) {
  110. // 解析JSON数据
  111. var logEntry map[string]interface{}
  112. if err := json.Unmarshal(data, &logEntry); err != nil {
  113. log.Printf("[ES-ERROR] 解析日志JSON失败: %v", err)
  114. return
  115. }
  116. // 创建索引名(按服务名和日期)
  117. //indexName := strings.ToLower("log-"+w.serviceName) + "-" + time.Now().Format("2006-01-02")
  118. indexName := helpers.GetIndexName(w.serviceName)
  119. url := fmt.Sprintf("%s/%s/_doc", w.baseURL, indexName)
  120. if w.console {
  121. log.Printf("ES索引url: %s", url)
  122. }
  123. // 添加时间戳
  124. if _, exists := logEntry["@timestamp"]; !exists {
  125. logEntry["@timestamp"] = time.Now().Format(time.RFC3339)
  126. }
  127. // 转换为JSON
  128. jsonData, err := json.Marshal(logEntry)
  129. if err != nil {
  130. log.Printf("[ES-ERROR] 编码JSON失败: %v", err)
  131. return
  132. }
  133. // 创建请求
  134. req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
  135. if err != nil {
  136. log.Printf("[ES-ERROR] 创建请求失败: %v", err)
  137. return
  138. }
  139. req.Header.Set("Content-Type", "application/json")
  140. if w.username != "" && w.password != "" {
  141. req.SetBasicAuth(w.username, w.password)
  142. }
  143. // 发送请求
  144. resp, err := w.httpClient.Do(req)
  145. if err != nil {
  146. log.Printf("[ES-ERROR] 发送请求失败: %v", err)
  147. return
  148. }
  149. if w.console {
  150. log.Printf("log save to es success: %s", indexName)
  151. }
  152. defer resp.Body.Close()
  153. if resp.StatusCode >= 400 {
  154. log.Printf("[ES-ERROR] 写入失败 [%d]: %s", resp.StatusCode, resp.Status)
  155. }
  156. }
  157. // ====================== 公共方法 ======================
  158. // HealthCheck 健康检查
  159. func (w *esWriter) HealthCheck() bool {
  160. w.mu.RLock()
  161. defer w.mu.RUnlock()
  162. if !w.isRunning {
  163. return false
  164. }
  165. return w.testConnection() == nil
  166. }
  167. // Stop 停止写入器
  168. func (w *esWriter) stop() {
  169. w.mu.Lock()
  170. defer w.mu.Unlock()
  171. if w.isRunning {
  172. w.isRunning = false
  173. close(w.stopChan)
  174. log.Printf("[ES] ES写入器已停止")
  175. }
  176. }
  177. // ====================== 包级别函数 ======================
  178. // StopESWriter 停止ES写入器
  179. func StopESWriter() {
  180. if instance != nil {
  181. instance.stop()
  182. }
  183. }