| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216 |
- // logger/es_logger.go
- package esv8
-
- import (
- "bytes"
- "context"
- "fmt"
- "log"
- "sync"
- "time"
-
- "git.x2erp.com/qdy/go-base/logger/helpers"
- "github.com/elastic/go-elasticsearch/v8"
- "github.com/elastic/go-elasticsearch/v8/esapi"
- )
-
- // ====================== 核心结构 ======================
-
- type esWriter struct {
- serviceName string
- esClient *elasticsearch.Client
- mu sync.RWMutex
- isRunning bool
- wg sync.WaitGroup
- stopChan chan struct{}
- }
-
- // ====================== 全局单例 ======================
-
- var (
- instance *esWriter
- once sync.Once
- initErr error
- )
-
- // InitESWriter 初始化全局ES写入器
- func InitESWriter(serviceName, esURL, username, password string) error {
- once.Do(func() {
- instance = &esWriter{
- serviceName: serviceName,
- stopChan: make(chan struct{}),
- isRunning: false,
- }
- // 初始化ES客户端
- initErr = instance.initialize(esURL, username, password)
- })
-
- return initErr
- }
-
- // GetESWriter 获取全局ES写入器
- func GetESWriter() *esWriter {
- if instance == nil {
- panic("ESWriter not initialized. Call InitESWriter first")
- }
- return instance
- }
-
- // ====================== 核心方法 ======================
-
- // Write 实现io.Writer接口 - 异步写入ES
- func (w *esWriter) Write(p []byte) (int, error) {
- w.mu.RLock()
- defer w.mu.RUnlock()
-
- if !w.isRunning {
- return len(p), nil // 已停止,静默丢弃
- }
-
- // 异步发送:启动一个goroutine来处理写入
- go w.asyncSendToES(p)
-
- return len(p), nil
- }
-
- // asyncSendToES 异步发送到ES
- func (w *esWriter) asyncSendToES(data []byte) {
- // 创建索引名
- //indexName := w.serviceName + "-" + time.Now().Format("2006-01-02")
- indexName := helpers.GetIndexName(w.serviceName)
- // 复制数据以避免竞争
- dataCopy := make([]byte, len(data))
- copy(dataCopy, data)
-
- // 执行写入
- err := w.sendToES(indexName, dataCopy)
- if err != nil {
- // 记录错误但继续执行
- log.Printf("[ES-ERROR] 异步写入ES失败: %v", err)
- }
- }
-
- // ====================== 私有方法 ======================
-
- // initialize 初始化ES连接
- func (w *esWriter) initialize(esURL, username, password string) error {
- cfg := elasticsearch.Config{
- Addresses: []string{esURL},
- RetryOnStatus: []int{502, 503, 504, 429},
- MaxRetries: 3,
- }
-
- if username != "" && password != "" {
- cfg.Username = username
- cfg.Password = password
- }
-
- client, err := elasticsearch.NewClient(cfg)
- if err != nil {
- return fmt.Errorf("创建ES客户端失败: %v", err)
- }
-
- // 测试连接
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- resp, err := client.Ping(client.Ping.WithContext(ctx))
- if err != nil {
- return fmt.Errorf("ES连接测试失败: %v", err)
- }
- defer resp.Body.Close()
-
- if resp.IsError() {
- return fmt.Errorf("ES服务响应错误: %s", resp.Status())
- }
-
- w.esClient = client
- w.isRunning = true
-
- log.Printf("[ES] ES异步写入器已启动: %s", esURL)
- return nil
- }
-
- // sendToES 发送日志到ES(同步)
- func (w *esWriter) sendToES(indexName string, data []byte) error {
- // 准备文档
- body := bytes.NewReader(data)
-
- // 创建索引请求
- req := esapi.IndexRequest{
- Index: indexName,
- Body: body,
- Refresh: "false",
- }
-
- // 设置超时
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
-
- // 执行请求
- resp, err := req.Do(ctx, w.esClient)
- if err != nil {
- return fmt.Errorf("发送请求失败: %v", err)
- }
- defer resp.Body.Close()
-
- // 检查响应
- if resp.IsError() {
- return fmt.Errorf("ES响应错误: %s", resp.Status())
- }
-
- return nil
- }
-
- // Reconnect 重新连接ES
- func (w *esWriter) Reconnect(esURL, username, password string) error {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- // 停止当前
- w.isRunning = false
-
- // 重新初始化
- return w.initialize(esURL, username, password)
- }
-
- // HealthCheck 健康检查
- func (w *esWriter) HealthCheck() bool {
- w.mu.RLock()
- defer w.mu.RUnlock()
-
- if !w.isRunning {
- return false
- }
-
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
-
- resp, err := w.esClient.Ping(w.esClient.Ping.WithContext(ctx))
- if err != nil {
- return false
- }
- defer resp.Body.Close()
-
- return !resp.IsError()
- }
-
- // Stop 停止写入器
- func (w *esWriter) stop() {
- w.mu.Lock()
- defer w.mu.Unlock()
-
- if w.isRunning {
- w.isRunning = false
- close(w.stopChan)
- w.wg.Wait()
- log.Printf("[ES] ES写入器已停止")
- }
- }
-
- // 包级别函数
- func StopESWriter() {
- if instance != nil {
- instance.stop()
- }
- }
|