Browse Source

Release v0.2.2803

qdy 3 weeks ago
commit
30dbbc70ea
2 changed files with 147 additions and 4 deletions
  1. 71
    4
      internal/opencode/direct_client.go
  2. 76
    0
      internal/routes/prompt_stream_routes.go

+ 71
- 4
internal/opencode/direct_client.go View File

@@ -9,6 +9,7 @@ import (
9 9
 	"io"
10 10
 	"net/http"
11 11
 	"os"
12
+	"path/filepath"
12 13
 	"strings"
13 14
 	"time"
14 15
 )
@@ -213,7 +214,12 @@ func (c *DirectClient) SendPromptStream(ctx context.Context, sessionID string, p
213 214
 	fmt.Printf("🔍 [opencode.DirectClient] 请求 URL: %s\n", url)
214 215
 	fmt.Printf("🔍 [opencode.DirectClient] 请求头: %v\n", req.Header)
215 216
 
216
-	resp, err := c.httpClient.Do(req)
217
+	// 为SSE流创建独立的httpClient,不设置超时限制
218
+	sseClient := &http.Client{
219
+		// 不设置Timeout,允许长连接
220
+		// Timeout: 0 表示无超时限制
221
+	}
222
+	resp, err := sseClient.Do(req)
217 223
 	if err != nil {
218 224
 		return nil, fmt.Errorf("发送请求失败: %w", err)
219 225
 	}
@@ -249,7 +255,12 @@ func (c *DirectClient) SendPromptStream(ctx context.Context, sessionID string, p
249 255
 				if err == io.EOF {
250 256
 					fmt.Printf("🔍 [opencode.DirectClient] SSE流结束,共收到 %d 个事件\n", eventCount)
251 257
 				} else {
252
-					fmt.Printf("🔍 [opencode.DirectClient] 读取错误: %v\n", err)
258
+					// 区分正常取消和错误
259
+					if ctx.Err() != nil {
260
+						fmt.Printf("🔍 [opencode.DirectClient] SSE流正常结束(上下文取消)\n")
261
+					} else {
262
+						fmt.Printf("🔍 [opencode.DirectClient] 读取错误: %v\n", err)
263
+					}
253 264
 				}
254 265
 				return
255 266
 			}
@@ -264,6 +275,9 @@ func (c *DirectClient) SendPromptStream(ctx context.Context, sessionID string, p
264 275
 				eventCount++
265 276
 				fmt.Printf("🔍 [opencode.DirectClient] 收到SSE数据[%d]: %s\n", eventCount, data)
266 277
 
278
+				// 写入日志文件用于分析
279
+				writeStreamLog(sessionID, data)
280
+
267 281
 				select {
268 282
 				case ch <- data:
269 283
 				case <-ctx.Done():
@@ -290,7 +304,13 @@ func (c *DirectClient) subscribeGlobalEvents(ctx context.Context) (<-chan string
290 304
 
291 305
 	fmt.Printf("🔍 [opencode.DirectClient] 订阅全局事件流: %s\n", url)
292 306
 
293
-	resp, err := c.httpClient.Do(req)
307
+	// 为SSE连接创建独立的httpClient,不设置超时限制
308
+	sseClient := &http.Client{
309
+		// 不设置Timeout,允许长连接
310
+		// Timeout: 0 表示无超时限制
311
+	}
312
+
313
+	resp, err := sseClient.Do(req)
294 314
 	if err != nil {
295 315
 		return nil, fmt.Errorf("订阅事件流失败: %w", err)
296 316
 	}
@@ -315,6 +335,9 @@ func (c *DirectClient) subscribeGlobalEvents(ctx context.Context) (<-chan string
315 335
 				eventCount++
316 336
 				fmt.Printf("🔍 [opencode.DirectClient] 收到全局事件[%d]: %s\n", eventCount, data)
317 337
 
338
+				// 写入日志文件用于分析
339
+				writeStreamLog("", data)
340
+
318 341
 				select {
319 342
 				case ch <- data:
320 343
 				case <-ctx.Done():
@@ -325,7 +348,12 @@ func (c *DirectClient) subscribeGlobalEvents(ctx context.Context) (<-chan string
325 348
 		}
326 349
 
327 350
 		if err := scanner.Err(); err != nil {
328
-			fmt.Printf("🔍 [opencode.DirectClient] 扫描事件流错误: %v\n", err)
351
+			// 区分正常取消和错误
352
+			if ctx.Err() != nil {
353
+				fmt.Printf("🔍 [opencode.DirectClient] 全局事件流正常结束(上下文取消)\n")
354
+			} else {
355
+				fmt.Printf("🔍 [opencode.DirectClient] 扫描事件流错误: %v\n", err)
356
+			}
329 357
 		}
330 358
 	}()
331 359
 
@@ -397,3 +425,42 @@ func (c *DirectClient) GetBaseURL() string {
397 425
 func (c *DirectClient) GetPort() int {
398 426
 	return c.port
399 427
 }
428
+
429
+// writeStreamLog 将流式数据写入日志文件用于分析
430
+func writeStreamLog(sessionID string, data string) {
431
+	// 创建日志目录
432
+	logDir := "/Users/kenqdy/Documents/v-bdx-workspace/svc-code/logs"
433
+	if err := os.MkdirAll(logDir, 0755); err != nil {
434
+		fmt.Printf("🔍 [opencode-direct-client] 创建日志目录失败: %v\n", err)
435
+		return
436
+	}
437
+
438
+	// 生成日志文件名,按日期和会话ID组织
439
+	dateStr := time.Now().Format("20060102")
440
+	hourStr := time.Now().Format("15") // 小时
441
+	var filename string
442
+
443
+	if sessionID == "" {
444
+		// 全局事件按小时组织
445
+		filename = fmt.Sprintf("stream-global-%s-%s.log", dateStr, hourStr)
446
+	} else {
447
+		// 会话事件按会话ID和日期组织
448
+		filename = fmt.Sprintf("stream-session-%s-%s.log", sessionID, dateStr)
449
+	}
450
+
451
+	filepath := filepath.Join(logDir, filename)
452
+
453
+	// 追加写入数据
454
+	file, err := os.OpenFile(filepath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
455
+	if err != nil {
456
+		fmt.Printf("🔍 [opencode-direct-client] 打开日志文件失败: %v\n", err)
457
+		return
458
+	}
459
+	defer file.Close()
460
+
461
+	// 写入时间戳和数据
462
+	logLine := fmt.Sprintf("[%s] %s\n", time.Now().Format("15:04:05.000"), data)
463
+	if _, err := file.WriteString(logLine); err != nil {
464
+		fmt.Printf("🔍 [opencode-direct-client] 写入日志失败: %v\n", err)
465
+	}
466
+}

+ 76
- 0
internal/routes/prompt_stream_routes.go View File

@@ -108,6 +108,14 @@ func StreamPromptHandler(client opencode.OpenCodeClient) http.HandlerFunc {
108 108
 					// 尝试解析为JSON,检查是否已有payload字段
109 109
 					var jsonData interface{}
110 110
 					if err := json.Unmarshal([]byte(data), &jsonData); err == nil {
111
+						// 去重处理:移除message.updated事件中的重复content,并过滤不必要的事件
112
+						jsonData = removeDuplicateContent(jsonData)
113
+
114
+						// 如果返回nil,跳过此事件
115
+						if jsonData == nil {
116
+							continue
117
+						}
118
+
111 119
 						// 检查是否是对象且包含payload字段
112 120
 						if obj, ok := jsonData.(map[string]interface{}); ok && obj["payload"] != nil {
113 121
 							// 已有payload字段,直接发送原始数据
@@ -152,3 +160,71 @@ func BindJSON(r *http.Request, v interface{}) error {
152 160
 
153 161
 	return json.Unmarshal(body, v)
154 162
 }
163
+
164
+// removeDuplicateContent 移除message.updated事件中的重复content,避免前端重复显示
165
+func removeDuplicateContent(data interface{}) interface{} {
166
+	// 检查是否为map
167
+	obj, ok := data.(map[string]interface{})
168
+	if !ok {
169
+		return data
170
+	}
171
+
172
+	// 递归处理payload字段
173
+	if payload, ok := obj["payload"].(map[string]interface{}); ok {
174
+		obj["payload"] = removeDuplicateContent(payload)
175
+	}
176
+
177
+	// 如果payload是数组(可能嵌套),处理每个元素
178
+	if payloadArr, ok := obj["payload"].([]interface{}); ok {
179
+		for i, item := range payloadArr {
180
+			if itemMap, ok := item.(map[string]interface{}); ok {
181
+				payloadArr[i] = removeDuplicateContent(itemMap)
182
+			}
183
+		}
184
+	}
185
+
186
+	// 检查type字段
187
+	typeVal, hasType := obj["type"]
188
+	if !hasType {
189
+		return obj
190
+	}
191
+
192
+	typeStr, ok := typeVal.(string)
193
+	if !ok {
194
+		return obj
195
+	}
196
+
197
+	// 事件过滤策略:减少发送给前端的事件数量
198
+	switch typeStr {
199
+	case "message.updated":
200
+		// 检查是否有properties字段
201
+		if properties, ok := obj["properties"].(map[string]interface{}); ok {
202
+			if info, ok := properties["info"].(map[string]interface{}); ok {
203
+				// 移除content字段,避免重复
204
+				delete(info, "content")
205
+
206
+				// 检查是否有completed时间,如果没有则过滤掉(只发送最终状态)
207
+				if timeInfo, ok := info["time"].(map[string]interface{}); ok {
208
+					if timeInfo["completed"] == nil {
209
+						// 没有completed时间,这是中间状态,过滤掉
210
+						return nil
211
+					}
212
+				}
213
+			}
214
+		}
215
+	case "session.status":
216
+		// session.status事件很频繁但前端可能不需要,过滤掉
217
+		return nil
218
+	case "session.diff":
219
+		// session.diff事件通常为空,过滤掉
220
+		return nil
221
+	case "server.heartbeat":
222
+		// 心跳事件,过滤掉
223
+		return nil
224
+	case "session.idle":
225
+		// 空闲事件,过滤掉
226
+		return nil
227
+	}
228
+
229
+	return obj
230
+}

Loading…
Cancel
Save