package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"testing"
	"time"
)

// Time limits used by the log stream helpers. They are variables rather than
// constants so that a fast unit test can shorten them.
var (
	// logsStreamConnectTimeout bounds the wait for the response headers of
	// the stream endpoint.
	logsStreamConnectTimeout = 15 * time.Second
	// logsStreamReadWindow is how long connectToLogsStream keeps reading
	// events once the stream has been opened.
	logsStreamReadWindow = 10 * time.Second
	// logsTestRequestTimeout bounds the plain (non-streaming) JSON requests.
	logsTestRequestTimeout = 10 * time.Second
)

// TestLogsStream exercises the log stream API of a locally running svc-code
// service: it logs in, reads the unfiltered stream, and then reads a stream
// filtered by a freshly created session. The test is skipped when the service
// is not reachable.
func TestLogsStream(t *testing.T) {
	// Address of the svc-code service under test.
	svcCodeURL := "http://localhost:8020"

	// Skip instead of failing when the service is not running.
	if !checkServiceRunningForLogsTest(t, svcCodeURL) {
		t.Skipf("svc-code服务未运行在 %s,跳过测试", svcCodeURL)
	}

	// 1. Log in to obtain a token.
	token, err := loginForLogsTest(t, svcCodeURL)
	if err != nil {
		t.Fatalf("登录失败: %v", err)
	}
	// Only a short prefix of the token is written to the test log.
	t.Logf("获取到Token: %s...", token[:minLogsInt(20, len(token))])

	// 2. Connect to the log stream.
	eventCount, err := connectToLogsStream(t, svcCodeURL, token, "")
	if err != nil {
		t.Fatalf("连接日志流失败: %v", err)
	}

	// 3. Report what was received; an empty stream is not treated as a failure.
	if eventCount == 0 {
		t.Log("警告: 未收到任何日志事件(可能是OpenCode未产生日志)")
	} else {
		t.Logf("成功收到 %d 个日志事件", eventCount)
	}

	// 4. Exercise the stream with a sessionId filter.
	t.Run("WithSessionFilter", func(t *testing.T) {
		testLogsStreamWithSessionFilter(t, svcCodeURL, token)
	})
}

// connectToLogsStream opens GET {svcCodeURL}/api/logs/stream with the given
// bearer token and counts the SSE "data" lines it receives. A non-empty
// sessionId is sent as the URL-escaped "sessionId" query parameter.
//
// Reading stops, with a nil error, when the server closes the stream or when
// logsStreamReadWindow has elapsed. Any other failure is returned as an error
// together with the number of events counted so far.
func connectToLogsStream(t *testing.T, svcCodeURL, token, sessionId string) (int, error) {
	t.Helper()

	endpoint := svcCodeURL + "/api/logs/stream"
	if sessionId != "" {
		// Escape the value so characters such as '&', '=' or spaces cannot
		// corrupt the query string.
		endpoint += "?" + url.Values{"sessionId": {sessionId}}.Encode()
	}

	// The context is the only way to interrupt a read that is blocked on a
	// quiet stream, so every time limit below is implemented by cancelling it.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return 0, fmt.Errorf("创建请求失败: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Accept", "text/event-stream")

	// No http.Client.Timeout here: it would also cap the time spent reading
	// the body and turn a healthy long-lived stream into a read error.
	connectTimer := time.AfterFunc(logsStreamConnectTimeout, cancel)
	resp, err := http.DefaultClient.Do(req)
	connectTimer.Stop()
	if err != nil {
		return 0, fmt.Errorf("HTTP请求失败: %w", err)
	}
	defer resp.Body.Close()

	// resp.Status already contains the numeric code (e.g. "200 OK").
	t.Logf("日志流响应状态: %s", resp.Status)
	t.Logf("Content-Type: %s", resp.Header.Get("Content-Type"))

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return 0, fmt.Errorf("请求失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	// Stop reading after the observation window by cancelling the request.
	readTimer := time.AfterFunc(logsStreamReadWindow, cancel)
	defer readTimer.Stop()

	reader := bufio.NewReader(resp.Body)
	eventCount := 0
	for {
		line, readErr := reader.ReadString('\n')
		if readErr != nil && !errors.Is(readErr, io.EOF) {
			if ctx.Err() != nil {
				// The read was interrupted by our own timer, not by a fault.
				t.Log("日志流读取超时")
				return eventCount, nil
			}
			return eventCount, fmt.Errorf("读取响应失败: %w", readErr)
		}

		// At EOF ReadString may still return a final line without a trailing
		// newline, so the line is inspected before the EOF is acted upon.
		// Blank separators and ":" heartbeat comments are simply not counted.
		if data, ok := sseDataForLogsTest(line); ok {
			eventCount++
			t.Logf("收到日志[%d]: %s", eventCount, data)
		}

		if readErr != nil {
			t.Log("日志流结束")
			return eventCount, nil
		}
	}
}

// sseDataForLogsTest reports whether line is a Server-Sent Events "data"
// field and, if so, returns its payload. Surrounding whitespace is trimmed
// first; the single space that may follow "data:" is optional.
func sseDataForLogsTest(line string) (string, bool) {
	line = strings.TrimSpace(line)
	if !strings.HasPrefix(line, "data:") {
		return "", false
	}
	payload := strings.TrimPrefix(line, "data:")
	return strings.TrimPrefix(payload, " "), true
}

// testLogsStreamWithSessionFilter creates a session and then reads the log
// stream filtered by that session's ID. The subtest is skipped when the
// session cannot be created and fails when the filtered stream cannot be read.
func testLogsStreamWithSessionFilter(t *testing.T, svcCodeURL, token string) {
	// A session is needed first so there is an ID to filter by.
	sessionID, err := createSessionForLogsTest(t, svcCodeURL, token, "日志流测试会话")
	if err != nil {
		// Report the subtest as skipped rather than silently passing.
		t.Skipf("创建会话失败: %v (跳过过滤测试)", err)
	}

	t.Logf("使用sessionId过滤日志: %s", sessionID)

	eventCount, err := connectToLogsStream(t, svcCodeURL, token, sessionID)
	if err != nil {
		t.Errorf("带过滤的日志流连接失败: %v", err)
		return
	}

	t.Logf("过滤日志流收到 %d 个事件", eventCount)
}

// createSessionForLogsTest POSTs {"title": title} to
// {svcCodeURL}/api/session/create with the given bearer token and returns the
// "data.id" field of the JSON response. It returns an error for transport
// failures, non-200 statuses, undecodable bodies, and responses whose
// "success" field is false.
func createSessionForLogsTest(t *testing.T, svcCodeURL, token, title string) (string, error) {
	t.Helper()

	jsonData, err := json.Marshal(map[string]string{"title": title})
	if err != nil {
		return "", fmt.Errorf("创建请求失败: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, svcCodeURL+"/api/session/create", bytes.NewReader(jsonData))
	if err != nil {
		return "", fmt.Errorf("创建请求失败: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: logsTestRequestTimeout}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("HTTP请求失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("创建会话失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	var result struct {
		Success bool `json:"success"`
		Data    struct {
			ID string `json:"id"`
		} `json:"data"`
		Message string `json:"message"`
	}
	// Decoding straight from the body surfaces read errors as well.
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("解析响应失败: %w", err)
	}
	if !result.Success {
		return "", fmt.Errorf("创建会话失败: %s", result.Message)
	}

	return result.Data.ID, nil
}

// loginForLogsTest POSTs the fixed test credentials to
// {svcCodeURL}/api/auth/login and returns the "data" field of the JSON
// response, which the callers use as a bearer token. It returns an error for
// transport failures, non-200 statuses, undecodable bodies, and responses
// whose "success" field is false.
func loginForLogsTest(t *testing.T, svcCodeURL string) (string, error) {
	t.Helper()

	jsonData, err := json.Marshal(map[string]string{
		"user_id":  "test-user-001",
		"password": "password123",
	})
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %w", err)
	}

	// Use a client with a timeout so an unresponsive service cannot hang the test.
	client := &http.Client{Timeout: logsTestRequestTimeout}
	resp, err := client.Post(svcCodeURL+"/api/auth/login", "application/json", bytes.NewReader(jsonData))
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("登录失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	var result struct {
		Success bool   `json:"success"`
		Data    string `json:"data"`
		Message string `json:"message"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return "", fmt.Errorf("解析响应失败: %w", err)
	}
	if !result.Success {
		return "", fmt.Errorf("登录失败: %s", result.Message)
	}

	return result.Data, nil
}

// checkServiceRunningForLogsTest reports whether an HTTP service answers at
// baseURL. It requests baseURL+"/api/health" and, only if that request fails
// at the transport level, retries baseURL itself. A 200 or 404 response counts
// as "running"; any other status, or two failed requests, does not.
func checkServiceRunningForLogsTest(t *testing.T, baseURL string) bool {
	t.Helper()

	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(baseURL + "/api/health")
	if err != nil {
		// Fall back to the root endpoint.
		resp, err = client.Get(baseURL)
		if err != nil {
			return false
		}
	}
	defer resp.Body.Close()

	return resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotFound
}

// minLogsInt returns the smaller of a and b.
func minLogsInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}