// logger/es_logger.go package esv8 import ( "bytes" "context" "fmt" "log" "sync" "time" "git.x2erp.com/qdy/go-base/logger/helpers" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" ) // ====================== 核心结构 ====================== type esWriter struct { serviceName string esClient *elasticsearch.Client mu sync.RWMutex isRunning bool wg sync.WaitGroup stopChan chan struct{} } // ====================== 全局单例 ====================== var ( instance *esWriter once sync.Once initErr error ) // InitESWriter 初始化全局ES写入器 func InitESWriter(serviceName, esURL, username, password string) error { once.Do(func() { instance = &esWriter{ serviceName: serviceName, stopChan: make(chan struct{}), isRunning: false, } // 初始化ES客户端 initErr = instance.initialize(esURL, username, password) }) return initErr } // GetESWriter 获取全局ES写入器 func GetESWriter() *esWriter { if instance == nil { panic("ESWriter not initialized. Call InitESWriter first") } return instance } // ====================== 核心方法 ====================== // Write 实现io.Writer接口 - 异步写入ES func (w *esWriter) Write(p []byte) (int, error) { w.mu.RLock() defer w.mu.RUnlock() if !w.isRunning { return len(p), nil // 已停止,静默丢弃 } // 异步发送:启动一个goroutine来处理写入 go w.asyncSendToES(p) return len(p), nil } // asyncSendToES 异步发送到ES func (w *esWriter) asyncSendToES(data []byte) { // 创建索引名 //indexName := w.serviceName + "-" + time.Now().Format("2006-01-02") indexName := helpers.GetIndexName(w.serviceName) // 复制数据以避免竞争 dataCopy := make([]byte, len(data)) copy(dataCopy, data) // 执行写入 err := w.sendToES(indexName, dataCopy) if err != nil { // 记录错误但继续执行 log.Printf("[ES-ERROR] 异步写入ES失败: %v", err) } } // ====================== 私有方法 ====================== // initialize 初始化ES连接 func (w *esWriter) initialize(esURL, username, password string) error { cfg := elasticsearch.Config{ Addresses: []string{esURL}, RetryOnStatus: []int{502, 503, 504, 429}, MaxRetries: 3, } if username != "" && password != "" { cfg.Username = username cfg.Password = password } client, err := elasticsearch.NewClient(cfg) if err != nil { return fmt.Errorf("创建ES客户端失败: %v", err) } // 测试连接 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := client.Ping(client.Ping.WithContext(ctx)) if err != nil { return fmt.Errorf("ES连接测试失败: %v", err) } defer resp.Body.Close() if resp.IsError() { return fmt.Errorf("ES服务响应错误: %s", resp.Status()) } w.esClient = client w.isRunning = true log.Printf("[ES] ES异步写入器已启动: %s", esURL) return nil } // sendToES 发送日志到ES(同步) func (w *esWriter) sendToES(indexName string, data []byte) error { // 准备文档 body := bytes.NewReader(data) // 创建索引请求 req := esapi.IndexRequest{ Index: indexName, Body: body, Refresh: "false", } // 设置超时 ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 执行请求 resp, err := req.Do(ctx, w.esClient) if err != nil { return fmt.Errorf("发送请求失败: %v", err) } defer resp.Body.Close() // 检查响应 if resp.IsError() { return fmt.Errorf("ES响应错误: %s", resp.Status()) } return nil } // Reconnect 重新连接ES func (w *esWriter) Reconnect(esURL, username, password string) error { w.mu.Lock() defer w.mu.Unlock() // 停止当前 w.isRunning = false // 重新初始化 return w.initialize(esURL, username, password) } // HealthCheck 健康检查 func (w *esWriter) HealthCheck() bool { w.mu.RLock() defer w.mu.RUnlock() if !w.isRunning { return false } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() resp, err := w.esClient.Ping(w.esClient.Ping.WithContext(ctx)) if err != nil { return false } defer resp.Body.Close() return !resp.IsError() } // Stop 停止写入器 func (w *esWriter) stop() { w.mu.Lock() defer w.mu.Unlock() if w.isRunning { w.isRunning = false close(w.stopChan) w.wg.Wait() log.Printf("[ES] ES写入器已停止") } } // 包级别函数 func StopESWriter() { if instance != nil { instance.stop() } }