Procházet zdrojové kódy

测试通过-日志保存到ES

qdy před 2 měsíci
rodič
revize
ea3618b354

+ 18
- 10
config/subconfigs/log_config.go Zobrazit soubor

@@ -8,16 +8,24 @@ import (
8 8
 // LogConfig 日志配置
9 9
 type LogConfig struct {
10 10
 	BaseConfig
11
-	Level      string `yaml:"level"`
12
-	Output     string `yaml:"output"`
13
-	FilePath   string `yaml:"file_path"`
14
-	MaxSize    int    `yaml:"max_size"`
15
-	MaxBackups int    `yaml:"max_backups"`
16
-	MaxAge     int    `yaml:"max_age"`
17
-	Compress   bool   `yaml:"compress"`
18
-	//ESEnabled  bool   `yaml:"es_enabled"`
19
-	ESPath     string `yaml:"es_path"`
20
-	JSONFormat bool   `yaml:"json_format"`
11
+	Level      string `yaml:"level" json:"level"`             // 日志级别
12
+	Output     string `yaml:"output" json:"output"`           // 输出目标:console,file,es
13
+	JSONFormat bool   `yaml:"json_format" json:"json_format"` // 是否JSON格式
14
+
15
+	// 文件输出配置
16
+	FilePath   string `yaml:"file_path" json:"file_path"`
17
+	MaxSize    int    `yaml:"max_size" json:"max_size"`
18
+	MaxBackups int    `yaml:"max_backups" json:"max_backups"`
19
+	MaxAge     int    `yaml:"max_age" json:"max_age"`
20
+	Compress   bool   `yaml:"compress" json:"compress"`
21
+
22
+	// ES输出配置(当Output包含"es"时生效)
23
+	ESPath     string `yaml:"es_path" json:"es_path"`           // ES地址
24
+	ESUsername string `yaml:"es_username" json:"es_username"`   // 用户名(可选)
25
+	ESPassword string `yaml:"es_password" json:"es_password"`   // 密码(可选)
26
+	ESAPIKey   string `yaml:"es_api_key" json:"es_api_key"`     // API Key(可选)
27
+	ESBuffer   int    `yaml:"es_buffer" json:"es_buffer"`       // 缓冲大小,默认10000
28
+	ESMaxRetry int    `yaml:"es_max_retry" json:"es_max_retry"` // 最大重试,默认3
21 29
 }
22 30
 
23 31
 // NewLogConfig 创建日志配置实例

+ 7
- 0
go.mod Zobrazit soubor

@@ -13,7 +13,10 @@ require (
13 13
 	github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
14 14
 	github.com/cloudflare/circl v1.6.1 // indirect
15 15
 	github.com/cyphar/filepath-securejoin v0.6.1 // indirect
16
+	github.com/elastic/elastic-transport-go/v8 v8.7.0 // indirect
16 17
 	github.com/fatih/color v1.9.0 // indirect
18
+	github.com/go-logr/logr v1.4.2 // indirect
19
+	github.com/go-logr/stdr v1.2.2 // indirect
17 20
 	github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect
18 21
 	github.com/hashicorp/consul/api v1.9.0 // indirect
19 22
 	github.com/hashicorp/errwrap v1.1.0 // indirect
@@ -33,6 +36,9 @@ require (
33 36
 	github.com/pjbgf/sha1cd v0.5.0 // indirect
34 37
 	github.com/skeema/knownhosts v1.3.2 // indirect
35 38
 	github.com/stretchr/objx v0.5.2 // indirect
39
+	go.opentelemetry.io/otel v1.28.0 // indirect
40
+	go.opentelemetry.io/otel/metric v1.28.0 // indirect
41
+	go.opentelemetry.io/otel/trace v1.28.0 // indirect
36 42
 	go.uber.org/multierr v1.10.0 // indirect
37 43
 )
38 44
 
@@ -41,6 +47,7 @@ require (
41 47
 	github.com/ProtonMail/go-crypto v1.3.0 // indirect
42 48
 	github.com/bitly/go-simplejson v0.5.1 // indirect
43 49
 	github.com/cpuguy83/go-md2man/v2 v2.0.7 // indirect
50
+	github.com/elastic/go-elasticsearch/v8 v8.19.0
44 51
 	github.com/emirpasic/gods v1.18.1 // indirect
45 52
 	github.com/fsnotify/fsnotify v1.9.0 // indirect
46 53
 	github.com/go-git/gcfg v1.5.1-0.20230307220236-3a3c6141e376 // indirect

+ 17
- 0
go.sum Zobrazit soubor

@@ -26,6 +26,10 @@ github.com/cyphar/filepath-securejoin v0.6.1/go.mod h1:A8hd4EnAeyujCJRrICiOWqjS1
26 26
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
27 27
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
28 28
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
29
+github.com/elastic/elastic-transport-go/v8 v8.7.0 h1:OgTneVuXP2uip4BA658Xi6Hfw+PeIOod2rY3GVMGoVE=
30
+github.com/elastic/elastic-transport-go/v8 v8.7.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk=
31
+github.com/elastic/go-elasticsearch/v8 v8.19.0 h1:VmfBLNRORY7RZL+9hTxBD97ehl9H8Nxf2QigDh6HuMU=
32
+github.com/elastic/go-elasticsearch/v8 v8.19.0/go.mod h1:F3j9e+BubmKvzvLjNui/1++nJuJxbkhHefbaT0kFKGY=
29 33
 github.com/elazarl/goproxy v1.7.2 h1:Y2o6urb7Eule09PjlhQRGNsqRfPmYI3KKQLFpCAV3+o=
30 34
 github.com/elazarl/goproxy v1.7.2/go.mod h1:82vkLNir0ALaW14Rc399OTTjyNREgmdL2cVoIbS6XaE=
31 35
 github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc=
@@ -46,6 +50,11 @@ github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399 h1:eMj
46 50
 github.com/go-git/go-git-fixtures/v4 v4.3.2-0.20231010084843-55a94097c399/go.mod h1:1OCfN199q1Jm3HZlxleg+Dw/mwps2Wbk9frAWm+4FII=
47 51
 github.com/go-git/go-git/v5 v5.16.4 h1:7ajIEZHZJULcyJebDLo99bGgS0jRrOxzZG4uCk2Yb2Y=
48 52
 github.com/go-git/go-git/v5 v5.16.4/go.mod h1:4Ge4alE/5gPs30F2H1esi2gPd69R0C39lolkucHBOp8=
53
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
54
+github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
55
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
56
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
57
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
49 58
 github.com/go-micro/plugins/v4/registry/consul v1.2.1 h1:3wctYMtstwQLCjoJ1HA6mKGGFF1hcdKDv5MzHakB1jE=
50 59
 github.com/go-micro/plugins/v4/registry/consul v1.2.1/go.mod h1:wTat7/K9XQ+i64VbbcMYFcEwipYfSgJM51HcA/sgsM4=
51 60
 github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 h1:f+oWsMOmNPc8JmEHVZIycC7hBoQxHH9pNKQORJNozsQ=
@@ -186,6 +195,14 @@ github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342 h1:FnBeRrxr7OU4VvAz
186 195
 github.com/xrash/smetrics v0.0.0-20250705151800-55b8f293f342/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
187 196
 go-micro.dev/v4 v4.11.0 h1:DZ2xcr0pnZJDlp6MJiCLhw4tXRxLw9xrJlPT91kubr0=
188 197
 go-micro.dev/v4 v4.11.0/go.mod h1:eE/tD53n3KbVrzrWxKLxdkGw45Fg1qaNLWjpJMvIUF4=
198
+go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
199
+go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
200
+go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
201
+go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
202
+go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
203
+go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
204
+go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
205
+go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
189 206
 go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
190 207
 go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
191 208
 go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=

+ 216
- 0
logger/esv8/es_writer.go Zobrazit soubor

@@ -0,0 +1,216 @@
1
+// logger/es_logger.go
2
+package esv8
3
+
4
+import (
5
+	"bytes"
6
+	"context"
7
+	"fmt"
8
+	"log"
9
+	"sync"
10
+	"time"
11
+
12
+	"git.x2erp.com/qdy/go-base/logger/helpers"
13
+	"github.com/elastic/go-elasticsearch/v8"
14
+	"github.com/elastic/go-elasticsearch/v8/esapi"
15
+)
16
+
17
+// ====================== 核心结构 ======================
18
+
19
+type esWriter struct {
20
+	serviceName string
21
+	esClient    *elasticsearch.Client
22
+	mu          sync.RWMutex
23
+	isRunning   bool
24
+	wg          sync.WaitGroup
25
+	stopChan    chan struct{}
26
+}
27
+
28
+// ====================== 全局单例 ======================
29
+
30
+var (
31
+	instance *esWriter
32
+	once     sync.Once
33
+	initErr  error
34
+)
35
+
36
+// InitESWriter 初始化全局ES写入器
37
+func InitESWriter(serviceName, esURL, username, password string) error {
38
+	once.Do(func() {
39
+		instance = &esWriter{
40
+			serviceName: serviceName,
41
+			stopChan:    make(chan struct{}),
42
+			isRunning:   false,
43
+		}
44
+		// 初始化ES客户端
45
+		initErr = instance.initialize(esURL, username, password)
46
+	})
47
+
48
+	return initErr
49
+}
50
+
51
+// GetESWriter 获取全局ES写入器
52
+func GetESWriter() *esWriter {
53
+	if instance == nil {
54
+		panic("ESWriter not initialized. Call InitESWriter first")
55
+	}
56
+	return instance
57
+}
58
+
59
+// ====================== 核心方法 ======================
60
+
61
+// Write 实现io.Writer接口 - 异步写入ES
62
+func (w *esWriter) Write(p []byte) (int, error) {
63
+	w.mu.RLock()
64
+	defer w.mu.RUnlock()
65
+
66
+	if !w.isRunning {
67
+		return len(p), nil // 已停止,静默丢弃
68
+	}
69
+
70
+	// 异步发送:启动一个goroutine来处理写入
71
+	go w.asyncSendToES(p)
72
+
73
+	return len(p), nil
74
+}
75
+
76
+// asyncSendToES 异步发送到ES
77
+func (w *esWriter) asyncSendToES(data []byte) {
78
+	// 创建索引名
79
+	//indexName := w.serviceName + "-" + time.Now().Format("2006-01-02")
80
+	indexName := helpers.GetIndexName(w.serviceName)
81
+	// 复制数据以避免竞争
82
+	dataCopy := make([]byte, len(data))
83
+	copy(dataCopy, data)
84
+
85
+	// 执行写入
86
+	err := w.sendToES(indexName, dataCopy)
87
+	if err != nil {
88
+		// 记录错误但继续执行
89
+		log.Printf("[ES-ERROR] 异步写入ES失败: %v", err)
90
+	}
91
+}
92
+
93
+// ====================== 私有方法 ======================
94
+
95
+// initialize 初始化ES连接
96
+func (w *esWriter) initialize(esURL, username, password string) error {
97
+	cfg := elasticsearch.Config{
98
+		Addresses:     []string{esURL},
99
+		RetryOnStatus: []int{502, 503, 504, 429},
100
+		MaxRetries:    3,
101
+	}
102
+
103
+	if username != "" && password != "" {
104
+		cfg.Username = username
105
+		cfg.Password = password
106
+	}
107
+
108
+	client, err := elasticsearch.NewClient(cfg)
109
+	if err != nil {
110
+		return fmt.Errorf("创建ES客户端失败: %v", err)
111
+	}
112
+
113
+	// 测试连接
114
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
115
+	defer cancel()
116
+
117
+	resp, err := client.Ping(client.Ping.WithContext(ctx))
118
+	if err != nil {
119
+		return fmt.Errorf("ES连接测试失败: %v", err)
120
+	}
121
+	defer resp.Body.Close()
122
+
123
+	if resp.IsError() {
124
+		return fmt.Errorf("ES服务响应错误: %s", resp.Status())
125
+	}
126
+
127
+	w.esClient = client
128
+	w.isRunning = true
129
+
130
+	log.Printf("[ES] ES异步写入器已启动: %s", esURL)
131
+	return nil
132
+}
133
+
134
+// sendToES 发送日志到ES(同步)
135
+func (w *esWriter) sendToES(indexName string, data []byte) error {
136
+	// 准备文档
137
+	body := bytes.NewReader(data)
138
+
139
+	// 创建索引请求
140
+	req := esapi.IndexRequest{
141
+		Index:   indexName,
142
+		Body:    body,
143
+		Refresh: "false",
144
+	}
145
+
146
+	// 设置超时
147
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
148
+	defer cancel()
149
+
150
+	// 执行请求
151
+	resp, err := req.Do(ctx, w.esClient)
152
+	if err != nil {
153
+		return fmt.Errorf("发送请求失败: %v", err)
154
+	}
155
+	defer resp.Body.Close()
156
+
157
+	// 检查响应
158
+	if resp.IsError() {
159
+		return fmt.Errorf("ES响应错误: %s", resp.Status())
160
+	}
161
+
162
+	return nil
163
+}
164
+
165
+// Reconnect 重新连接ES
166
+func (w *esWriter) Reconnect(esURL, username, password string) error {
167
+	w.mu.Lock()
168
+	defer w.mu.Unlock()
169
+
170
+	// 停止当前
171
+	w.isRunning = false
172
+
173
+	// 重新初始化
174
+	return w.initialize(esURL, username, password)
175
+}
176
+
177
+// HealthCheck 健康检查
178
+func (w *esWriter) HealthCheck() bool {
179
+	w.mu.RLock()
180
+	defer w.mu.RUnlock()
181
+
182
+	if !w.isRunning {
183
+		return false
184
+	}
185
+
186
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
187
+	defer cancel()
188
+
189
+	resp, err := w.esClient.Ping(w.esClient.Ping.WithContext(ctx))
190
+	if err != nil {
191
+		return false
192
+	}
193
+	defer resp.Body.Close()
194
+
195
+	return !resp.IsError()
196
+}
197
+
198
+// Stop 停止写入器
199
+func (w *esWriter) stop() {
200
+	w.mu.Lock()
201
+	defer w.mu.Unlock()
202
+
203
+	if w.isRunning {
204
+		w.isRunning = false
205
+		close(w.stopChan)
206
+		w.wg.Wait()
207
+		log.Printf("[ES] ES写入器已停止")
208
+	}
209
+}
210
+
211
+// 包级别函数
212
+func StopESWriter() {
213
+	if instance != nil {
214
+		instance.stop()
215
+	}
216
+}

+ 11
- 0
logger/helpers/es_index_name.go Zobrazit soubor

@@ -0,0 +1,11 @@
1
+package helpers
2
+
3
+import (
4
+	"strings"
5
+	"time"
6
+)
7
+
8
+// 创建索引名(按服务名和日期)
9
+func GetIndexName(serviceName string) string {
10
+	return strings.ToLower("log-"+serviceName) + "-" + time.Now().Format("2006-01-02")
11
+}

+ 227
- 0
logger/http/es_http_writer.go Zobrazit soubor

@@ -0,0 +1,227 @@
1
+// logger/es_logger.go
2
+package http
3
+
4
+import (
5
+	"bytes"
6
+	"encoding/json"
7
+	"fmt"
8
+	"log"
9
+	"net/http"
10
+	"strings"
11
+	"sync"
12
+	"time"
13
+
14
+	"git.x2erp.com/qdy/go-base/logger/helpers"
15
+)
16
+
17
+// ====================== 核心结构 ======================
18
+
19
+type esWriter struct {
20
+	serviceName string
21
+	baseURL     string
22
+	username    string
23
+	password    string
24
+	httpClient  *http.Client
25
+	mu          sync.RWMutex
26
+	isRunning   bool
27
+	console     bool //配置文件里是否配置了终端显示,配置了后,每次保存es都在终端显示。方便调试。正式环境不能配置终端显示
28
+	stopChan    chan struct{}
29
+}
30
+
31
+// ====================== 全局单例 ======================
32
+
33
+var (
34
+	instance *esWriter
35
+	once     sync.Once
36
+	initErr  error
37
+)
38
+
39
+// initESWriter 初始化全局ES写入器
40
+func InitESWriter(serviceName, esURL, username, password string, console bool) error {
41
+	once.Do(func() {
42
+		instance = &esWriter{
43
+			serviceName: serviceName,
44
+			stopChan:    make(chan struct{}),
45
+			isRunning:   false,
46
+			console:     console,
47
+		}
48
+		// 初始化ES客户端
49
+		initErr = instance.initialize(esURL, username, password)
50
+	})
51
+
52
+	return initErr
53
+}
54
+
55
+// getESWriter 获取全局ES写入器 - 确保这个函数存在且导出
56
+func GetESWriter() *esWriter {
57
+	if instance == nil {
58
+		panic("ESWriter not initialized. Call initESWriter first")
59
+	}
60
+	return instance
61
+}
62
+
63
+// ====================== 核心方法 ======================
64
+
65
+// Write 实现io.Writer接口
66
+func (w *esWriter) Write(p []byte) (int, error) {
67
+	w.mu.RLock()
68
+	defer w.mu.RUnlock()
69
+
70
+	if !w.isRunning {
71
+		return len(p), nil
72
+	}
73
+
74
+	// 异步发送
75
+	go w.sendToES(p)
76
+
77
+	return len(p), nil
78
+}
79
+
80
+// ====================== 私有方法 ======================
81
+
82
+// initialize 初始化ES连接 - 使用HTTP实现
83
+func (w *esWriter) initialize(esURL, username, password string) error {
84
+	// 创建HTTP客户端
85
+	w.httpClient = &http.Client{
86
+		Timeout: 30 * time.Second,
87
+		Transport: &http.Transport{
88
+			MaxIdleConns:        100,
89
+			MaxIdleConnsPerHost: 100,
90
+			IdleConnTimeout:     90 * time.Second,
91
+		},
92
+	}
93
+
94
+	w.baseURL = strings.TrimSuffix(esURL, "/")
95
+	w.username = username
96
+	w.password = password
97
+
98
+	// 测试连接
99
+	if err := w.testConnection(); err != nil {
100
+		return fmt.Errorf("ES连接测试失败: %v", err)
101
+	}
102
+
103
+	w.isRunning = true
104
+
105
+	log.Printf("[ES] HTTP ES写入器已启动: %s", esURL)
106
+	return nil
107
+}
108
+
109
+// testConnection 测试连接
110
+func (w *esWriter) testConnection() error {
111
+	req, err := http.NewRequest("GET", w.baseURL, nil)
112
+	if err != nil {
113
+		return fmt.Errorf("创建请求失败: %v", err)
114
+	}
115
+
116
+	if w.username != "" && w.password != "" {
117
+		req.SetBasicAuth(w.username, w.password)
118
+	}
119
+
120
+	resp, err := w.httpClient.Do(req)
121
+	if err != nil {
122
+		return fmt.Errorf("连接失败: %v", err)
123
+	}
124
+	defer resp.Body.Close()
125
+
126
+	if resp.StatusCode != http.StatusOK {
127
+		body, _ := json.Marshal(resp.Header)
128
+		return fmt.Errorf("ES响应错误 [%d]: %s", resp.StatusCode, string(body))
129
+	}
130
+
131
+	return nil
132
+}
133
+
134
+// sendToES 发送日志到ES
135
+func (w *esWriter) sendToES(data []byte) {
136
+	// 解析JSON数据
137
+	var logEntry map[string]interface{}
138
+	if err := json.Unmarshal(data, &logEntry); err != nil {
139
+		log.Printf("[ES-ERROR] 解析日志JSON失败: %v", err)
140
+		return
141
+	}
142
+
143
+	// 创建索引名(按服务名和日期)
144
+	//indexName := strings.ToLower("log-"+w.serviceName) + "-" + time.Now().Format("2006-01-02")
145
+	indexName := helpers.GetIndexName(w.serviceName)
146
+	url := fmt.Sprintf("%s/%s/_doc", w.baseURL, indexName)
147
+
148
+	if w.console {
149
+		log.Printf("ES索引url: %s", url)
150
+	}
151
+
152
+	// 添加时间戳
153
+	if _, exists := logEntry["@timestamp"]; !exists {
154
+		logEntry["@timestamp"] = time.Now().Format(time.RFC3339)
155
+	}
156
+
157
+	// 转换为JSON
158
+	jsonData, err := json.Marshal(logEntry)
159
+	if err != nil {
160
+		log.Printf("[ES-ERROR] 编码JSON失败: %v", err)
161
+		return
162
+	}
163
+
164
+	// 创建请求
165
+	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
166
+	if err != nil {
167
+		log.Printf("[ES-ERROR] 创建请求失败: %v", err)
168
+		return
169
+	}
170
+
171
+	req.Header.Set("Content-Type", "application/json")
172
+	if w.username != "" && w.password != "" {
173
+		req.SetBasicAuth(w.username, w.password)
174
+	}
175
+
176
+	// 发送请求
177
+	resp, err := w.httpClient.Do(req)
178
+	if err != nil {
179
+		log.Printf("[ES-ERROR] 发送请求失败: %v", err)
180
+		return
181
+	}
182
+
183
+	if w.console {
184
+		log.Printf("log save to es success: %s", indexName)
185
+	}
186
+
187
+	defer resp.Body.Close()
188
+
189
+	if resp.StatusCode >= 400 {
190
+		log.Printf("[ES-ERROR] 写入失败 [%d]: %s", resp.StatusCode, resp.Status)
191
+	}
192
+}
193
+
194
+// ====================== 公共方法 ======================
195
+
196
+// HealthCheck 健康检查
197
+func (w *esWriter) HealthCheck() bool {
198
+	w.mu.RLock()
199
+	defer w.mu.RUnlock()
200
+
201
+	if !w.isRunning {
202
+		return false
203
+	}
204
+
205
+	return w.testConnection() == nil
206
+}
207
+
208
+// Stop 停止写入器
209
+func (w *esWriter) stop() {
210
+	w.mu.Lock()
211
+	defer w.mu.Unlock()
212
+
213
+	if w.isRunning {
214
+		w.isRunning = false
215
+		close(w.stopChan)
216
+		log.Printf("[ES] ES写入器已停止")
217
+	}
218
+}
219
+
220
+// ====================== 包级别函数 ======================
221
+
222
+// StopESWriter 停止ES写入器
223
+func StopESWriter() {
224
+	if instance != nil {
225
+		instance.stop()
226
+	}
227
+}

+ 31
- 67
logger/runtime_logger.go Zobrazit soubor

@@ -2,7 +2,6 @@
2 2
 package logger
3 3
 
4 4
 import (
5
-	"encoding/json"
6 5
 	"log"
7 6
 	"os"
8 7
 	"strings"
@@ -10,6 +9,8 @@ import (
10 9
 	"time"
11 10
 
12 11
 	"git.x2erp.com/qdy/go-base/config/subconfigs"
12
+	"git.x2erp.com/qdy/go-base/logger/http"
13
+
13 14
 	"go.uber.org/zap"
14 15
 	"go.uber.org/zap/zapcore"
15 16
 	"gopkg.in/natefinch/lumberjack.v2"
@@ -30,67 +31,6 @@ type LogContext struct {
30 31
 	InstanceName string
31 32
 }
32 33
 
33
-// ESWriter ES写入器
34
-type ESWriter struct {
35
-	serviceName string
36
-	esURL       string
37
-}
38
-
39
-func NewESWriter(serviceName, esURL string) *ESWriter {
40
-	return &ESWriter{
41
-		serviceName: serviceName,
42
-		esURL:       esURL,
43
-	}
44
-}
45
-
46
-func (w *ESWriter) Write(p []byte) (n int, err error) {
47
-	// 解析日志数据
48
-	var logEntry map[string]interface{}
49
-	if err := json.Unmarshal(p, &logEntry); err != nil {
50
-		// 使用标准log输出到启动日志
51
-		log.Printf("[ES-WRITER] 解析日志JSON失败: %v", err)
52
-		return len(p), nil // 返回成功,不阻塞主流程
53
-	}
54
-
55
-	// 动态生成索引名:service-日期
56
-	indexName := strings.ToLower(w.serviceName) + "-" + time.Now().Format("2006-01-02")
57
-
58
-	// 使用标准log输出到启动日志
59
-	log.Printf("[ES-WRITER] 开始写入ES日志")
60
-	log.Printf("索引名称: %s", indexName)
61
-	log.Printf("数据长度: %d 字节", len(p))
62
-	log.Printf("服务名称: %s", w.serviceName)
63
-	log.Printf("写入时间: %s", time.Now().Format("2006-01-02 15:04:05"))
64
-
65
-	// TODO: 这里是实际写入ES的代码
66
-	// 这里只是示例,实际应该连接ES并写入数据
67
-	// esClient.Index().Index(indexName).BodyJson(logEntry).Do(ctx)
68
-
69
-	// 模拟写入ES(在开发环境可以打印到控制台查看)
70
-	w.simulateESWrite(indexName, logEntry)
71
-
72
-	return len(p), nil
73
-}
74
-
75
-// simulateESWrite 模拟ES写入(仅用于开发调试)
76
-func (w *ESWriter) simulateESWrite(indexName string, data map[string]interface{}) {
77
-	// 在开发环境,可以打印到控制台查看
78
-	if os.Getenv("GO_ENV") == "development" {
79
-		jsonData, _ := json.MarshalIndent(data, "", "  ")
80
-		log.Printf("[ES-SIMULATE] === ES写入模拟开始 ===")
81
-		log.Printf("索引名称: %s", indexName)
82
-		log.Printf("写入时间: %s", time.Now().Format("2006-01-02 15:04:05"))
83
-		log.Printf("jsonData: %s", string(jsonData))
84
-		//log.Printf("[ES-SIMULATE] 索引: %s\n数据:\n%s", indexName, string(jsonData))
85
-	}
86
-}
87
-
88
-func (w *ESWriter) Sync() error {
89
-	// ES客户端通常不需要同步
90
-	log.Printf("[ES-WRITER] 同步ES写入器")
91
-	return nil
92
-}
93
-
94 34
 // InitRuntimeLogger 初始化
95 35
 func InitRuntimeLogger(svcName string, logConfig *subconfigs.LogConfig) {
96 36
 	initOnce.Do(func() {
@@ -202,17 +142,36 @@ func createRuntimeLogger(svcName string, cfg *subconfigs.LogConfig) error {
202 142
 			}
203 143
 
204 144
 		case "es":
205
-			// ES输出
145
+			// 1. 检查必要配置
146
+			if cfg.ESPath == "" {
147
+				log.Printf("[RUNTIME-LOGGER] ES地址未配置,跳过ES输出")
148
+				break
149
+			}
150
+			// 判断是否包含console输出
151
+			containsConsole := strings.Contains(strings.ToLower(cfg.Output), "console")
152
+
153
+			// 2. 初始化全局ES写入器
154
+			err := http.InitESWriter(
155
+				svcName,
156
+				cfg.ESPath,
157
+				cfg.ESUsername, // 假设LogConfig已有这些字段
158
+				cfg.ESPassword,
159
+				containsConsole,
160
+			)
161
+
162
+			if err != nil {
163
+				log.Printf("[RUNTIME-LOGGER] ES写入器初始化失败: %v", err)
164
+				break
165
+			}
206 166
 
207
-			log.Printf("[RUNTIME-LOGGER] 初始化ES日志输出, 路径: %s", cfg.ESPath)
208
-			esWriter := NewESWriter(svcName, cfg.ESPath)
167
+			// 3. 创建zap core
209 168
 			esCore := zapcore.NewCore(
210 169
 				jsonEncoder,
211
-				zapcore.AddSync(esWriter),
170
+				zapcore.AddSync(http.GetESWriter()), // 使用全局实例
212 171
 				level,
213 172
 			)
214 173
 			cores = append(cores, esCore)
215
-
174
+			log.Printf("[RUNTIME-LOGGER] ES日志输出已启用: %s", cfg.ESPath)
216 175
 		}
217 176
 	}
218 177
 
@@ -340,3 +299,8 @@ func Sync() error {
340 299
 	}
341 300
 	return nil
342 301
 }
302
+
303
+// StopESWriter 停止ES写入器
304
+func StopESWriter() {
305
+	http.StopESWriter()
306
+}

Loading…
Zrušit
Uložit