| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227 |
- // logger/es_logger.go
- package http
-
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"git.x2erp.com/qdy/go-base/logger/helpers"
)
-
- // ====================== 核心结构 ======================
-
// esWriter forwards log entries to Elasticsearch over HTTP. Its Write method
// lets it be used wherever an io.Writer is accepted.
type esWriter struct {
	// Connection settings, filled in by initialize.
	baseURL    string       // Elasticsearch base URL with a trailing "/" trimmed
	username   string       // basic-auth user; auth is sent only when username and password are both non-empty
	password   string       // basic-auth password
	httpClient *http.Client // client used for every request to Elasticsearch

	// serviceName is passed to helpers.GetIndexName to choose the target index.
	serviceName string

	// console comes from the configuration file. When true, every send to
	// Elasticsearch is also echoed through the standard logger, which is handy
	// for debugging. It must not be enabled in production.
	console bool

	// Lifecycle state.
	mu        sync.RWMutex  // held by Write, HealthCheck and stop around isRunning access
	isRunning bool          // set to true by a successful initialize and to false by stop
	stopChan  chan struct{} // closed by stop
}
-
- // ====================== 全局单例 ======================
-
- var (
- instance *esWriter
- once sync.Once
- initErr error
- )
-
- // initESWriter 初始化全局ES写入器
- func InitESWriter(serviceName, esURL, username, password string, console bool) error {
- once.Do(func() {
- instance = &esWriter{
- serviceName: serviceName,
- stopChan: make(chan struct{}),
- isRunning: false,
- console: console,
- }
- // 初始化ES客户端
- initErr = instance.initialize(esURL, username, password)
- })
-
- return initErr
- }
-
- // getESWriter 获取全局ES写入器 - 确保这个函数存在且导出
- func GetESWriter() *esWriter {
- if instance == nil {
- panic("ESWriter not initialized. Call initESWriter first")
- }
- return instance
- }
-
- // ====================== 核心方法 ======================
-
- // Write 实现io.Writer接口
- func (w *esWriter) Write(p []byte) (int, error) {
- w.mu.RLock()
- defer w.mu.RUnlock()
-
- if !w.isRunning {
- return len(p), nil
- }
-
- // 异步发送
- go w.sendToES(p)
-
- return len(p), nil
- }
-
- // ====================== 私有方法 ======================
-
- // initialize 初始化ES连接 - 使用HTTP实现
- func (w *esWriter) initialize(esURL, username, password string) error {
- // 创建HTTP客户端
- w.httpClient = &http.Client{
- Timeout: 30 * time.Second,
- Transport: &http.Transport{
- MaxIdleConns: 100,
- MaxIdleConnsPerHost: 100,
- IdleConnTimeout: 90 * time.Second,
- },
- }
-
- w.baseURL = strings.TrimSuffix(esURL, "/")
- w.username = username
- w.password = password
-
- // 测试连接
- if err := w.testConnection(); err != nil {
- return fmt.Errorf("ES连接测试失败: %v", err)
- }
-
- w.isRunning = true
-
- log.Printf("[ES] HTTP ES写入器已启动: %s", esURL)
- return nil
- }
-
- // testConnection 测试连接
- func (w *esWriter) testConnection() error {
- req, err := http.NewRequest("GET", w.baseURL, nil)
- if err != nil {
- return fmt.Errorf("创建请求失败: %v", err)
- }
-
- if w.username != "" && w.password != "" {
- req.SetBasicAuth(w.username, w.password)
- }
-
- resp, err := w.httpClient.Do(req)
- if err != nil {
- return fmt.Errorf("连接失败: %v", err)
- }
- defer resp.Body.Close()
-
- if resp.StatusCode != http.StatusOK {
- body, _ := json.Marshal(resp.Header)
- return fmt.Errorf("ES响应错误 [%d]: %s", resp.StatusCode, string(body))
- }
-
- return nil
- }
-
- // sendToES 发送日志到ES
- func (w *esWriter) sendToES(data []byte) {
- // 解析JSON数据
- var logEntry map[string]interface{}
- if err := json.Unmarshal(data, &logEntry); err != nil {
- log.Printf("[ES-ERROR] 解析日志JSON失败: %v", err)
- return
- }
-
- // 创建索引名(按服务名和日期)
- //indexName := strings.ToLower("log-"+w.serviceName) + "-" + time.Now().Format("2006-01-02")
- indexName := helpers.GetIndexName(w.serviceName)
- url := fmt.Sprintf("%s/%s/_doc", w.baseURL, indexName)
-
- if w.console {
- log.Printf("ES索引url: %s", url)
- }
-
- // 添加时间戳
- if _, exists := logEntry["@timestamp"]; !exists {
- logEntry["@timestamp"] = time.Now().Format(time.RFC3339)
- }
-
- // 转换为JSON
- jsonData, err := json.Marshal(logEntry)
- if err != nil {
- log.Printf("[ES-ERROR] 编码JSON失败: %v", err)
- return
- }
-
- // 创建请求
- req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
- if err != nil {
- log.Printf("[ES-ERROR] 创建请求失败: %v", err)
- return
- }
-
- req.Header.Set("Content-Type", "application/json")
- if w.username != "" && w.password != "" {
- req.SetBasicAuth(w.username, w.password)
- }
-
- // 发送请求
- resp, err := w.httpClient.Do(req)
- if err != nil {
- log.Printf("[ES-ERROR] 发送请求失败: %v", err)
- return
- }
-
- if w.console {
- log.Printf("log save to es success: %s", indexName)
- }
-
- defer resp.Body.Close()
-
- if resp.StatusCode >= 400 {
- log.Printf("[ES-ERROR] 写入失败 [%d]: %s", resp.StatusCode, resp.Status)
- }
- }
-
- // ====================== 公共方法 ======================
-
- // HealthCheck 健康检查
- func (w *esWriter) HealthCheck() bool {
- w.mu.RLock()
- defer w.mu.RUnlock()
-
- if !w.isRunning {
- return false
- }
-
- return w.testConnection() == nil
- }
-
- // Stop 停止写入器
- func (w *esWriter) stop() {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- if w.isRunning {
- w.isRunning = false
- close(w.stopChan)
- log.Printf("[ES] ES写入器已停止")
- }
- }
-
- // ====================== 包级别函数 ======================
-
- // StopESWriter 停止ES写入器
- func StopESWriter() {
- if instance != nil {
- instance.stop()
- }
- }
|