// logger/es_logger.go package http import ( "bytes" "encoding/json" "fmt" "log" "net/http" "strings" "sync" "time" "git.x2erp.com/qdy/go-base/logger/helpers" ) // ====================== 核心结构 ====================== type esWriter struct { serviceName string baseURL string username string password string httpClient *http.Client mu sync.RWMutex isRunning bool console bool //配置文件里是否配置了终端显示,配置了后,每次保存es都在终端显示。方便调试。正式环境不能配置终端显示 stopChan chan struct{} } // ====================== 全局单例 ====================== var ( instance *esWriter once sync.Once initErr error ) // initESWriter 初始化全局ES写入器 func InitESWriter(serviceName, esURL, username, password string, console bool) error { once.Do(func() { instance = &esWriter{ serviceName: serviceName, stopChan: make(chan struct{}), isRunning: false, console: console, } // 初始化ES客户端 initErr = instance.initialize(esURL, username, password) }) return initErr } // getESWriter 获取全局ES写入器 - 确保这个函数存在且导出 func GetESWriter() *esWriter { if instance == nil { panic("ESWriter not initialized. Call initESWriter first") } return instance } // ====================== 核心方法 ====================== // Write 实现io.Writer接口 func (w *esWriter) Write(p []byte) (int, error) { w.mu.RLock() defer w.mu.RUnlock() if !w.isRunning { return len(p), nil } // 异步发送 go w.sendToES(p) return len(p), nil } // ====================== 私有方法 ====================== // initialize 初始化ES连接 - 使用HTTP实现 func (w *esWriter) initialize(esURL, username, password string) error { // 创建HTTP客户端 w.httpClient = &http.Client{ Timeout: 30 * time.Second, Transport: &http.Transport{ MaxIdleConns: 100, MaxIdleConnsPerHost: 100, IdleConnTimeout: 90 * time.Second, }, } w.baseURL = strings.TrimSuffix(esURL, "/") w.username = username w.password = password // 测试连接 if err := w.testConnection(); err != nil { return fmt.Errorf("ES连接测试失败: %v", err) } w.isRunning = true log.Printf("[ES] HTTP ES写入器已启动: %s", esURL) return nil } // testConnection 测试连接 func (w *esWriter) testConnection() error { req, err := http.NewRequest("GET", w.baseURL, nil) if err != nil { return fmt.Errorf("创建请求失败: %v", err) } if w.username != "" && w.password != "" { req.SetBasicAuth(w.username, w.password) } resp, err := w.httpClient.Do(req) if err != nil { return fmt.Errorf("连接失败: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := json.Marshal(resp.Header) return fmt.Errorf("ES响应错误 [%d]: %s", resp.StatusCode, string(body)) } return nil } // sendToES 发送日志到ES func (w *esWriter) sendToES(data []byte) { // 解析JSON数据 var logEntry map[string]interface{} if err := json.Unmarshal(data, &logEntry); err != nil { log.Printf("[ES-ERROR] 解析日志JSON失败: %v", err) return } // 创建索引名(按服务名和日期) //indexName := strings.ToLower("log-"+w.serviceName) + "-" + time.Now().Format("2006-01-02") indexName := helpers.GetIndexName(w.serviceName) url := fmt.Sprintf("%s/%s/_doc", w.baseURL, indexName) if w.console { log.Printf("ES索引url: %s", url) } // 添加时间戳 if _, exists := logEntry["@timestamp"]; !exists { logEntry["@timestamp"] = time.Now().Format(time.RFC3339) } // 转换为JSON jsonData, err := json.Marshal(logEntry) if err != nil { log.Printf("[ES-ERROR] 编码JSON失败: %v", err) return } // 创建请求 req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { log.Printf("[ES-ERROR] 创建请求失败: %v", err) return } req.Header.Set("Content-Type", "application/json") if w.username != "" && w.password != "" { req.SetBasicAuth(w.username, w.password) } // 发送请求 resp, err := w.httpClient.Do(req) if err != nil { log.Printf("[ES-ERROR] 发送请求失败: %v", err) return } if w.console { log.Printf("log save to es success: %s", indexName) } defer resp.Body.Close() if resp.StatusCode >= 400 { log.Printf("[ES-ERROR] 写入失败 [%d]: %s", resp.StatusCode, resp.Status) } } // ====================== 公共方法 ====================== // HealthCheck 健康检查 func (w *esWriter) HealthCheck() bool { w.mu.RLock() defer w.mu.RUnlock() if !w.isRunning { return false } return w.testConnection() == nil } // Stop 停止写入器 func (w *esWriter) stop() { w.mu.Lock() defer w.mu.Unlock() if w.isRunning { w.isRunning = false close(w.stopChan) log.Printf("[ES] ES写入器已停止") } } // ====================== 包级别函数 ====================== // StopESWriter 停止ES写入器 func StopESWriter() { if instance != nil { instance.stop() } }