| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- package main
-
import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"testing"
	"time"
)
-
- // TestLogsStream 测试日志流API
- func TestLogsStream(t *testing.T) {
- // 获取svc-code服务地址
- svcCodeURL := "http://localhost:8020"
-
- // 检查服务是否运行
- if !checkServiceRunningForLogsTest(t, svcCodeURL) {
- t.Skipf("svc-code服务未运行在 %s,跳过测试", svcCodeURL)
- }
-
- // 1. 用户登录获取token
- token, err := loginForLogsTest(t, svcCodeURL)
- if err != nil {
- t.Fatalf("登录失败: %v", err)
- }
- t.Logf("获取到Token: %s...", token[:minLogsInt(20, len(token))])
-
- // 2. 连接日志流
- eventCount, err := connectToLogsStream(t, svcCodeURL, token, "")
- if err != nil {
- t.Fatalf("连接日志流失败: %v", err)
- }
-
- // 3. 验证日志流
- if eventCount == 0 {
- t.Log("警告: 未收到任何日志事件(可能是OpenCode未产生日志)")
- } else {
- t.Logf("成功收到 %d 个日志事件", eventCount)
- }
-
- // 4. 测试带sessionId过滤的日志流
- t.Run("WithSessionFilter", func(t *testing.T) {
- testLogsStreamWithSessionFilter(t, svcCodeURL, token)
- })
- }
-
// connectToLogsStream opens the server-sent-events log stream at
// svcCodeURL + "/api/logs/stream", reads it for a bounded time, and returns the
// number of "data" lines received.
//
// token is sent as a bearer token in the Authorization header. When sessionId
// is non-empty it is sent as the "sessionId" query parameter; the value is
// query-escaped, so characters such as '&' or spaces cannot alter the query.
//
// Reaching the end of the stream and reaching the end of the read window are
// both normal outcomes: the count is returned with a nil error. An error is
// returned when the request cannot be built or sent, when the response status
// is not 200, or when reading fails for any other reason (the count read so far
// is still returned in that last case).
func connectToLogsStream(t *testing.T, svcCodeURL, token, sessionId string) (int, error) {
	// A single deadline bounds connecting and reading. It also interrupts a
	// read that is blocked on an idle stream, which a timer checked only
	// between lines cannot do.
	const streamWindow = 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), streamWindow)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, svcCodeURL+"/api/logs/stream", nil)
	if err != nil {
		return 0, fmt.Errorf("创建请求失败: %w", err)
	}
	if sessionId != "" {
		query := req.URL.Query()
		query.Set("sessionId", sessionId)
		req.URL.RawQuery = query.Encode()
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Accept", "text/event-stream")

	// The request context carries the deadline, so no client timeout is needed.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, fmt.Errorf("HTTP请求失败: %w", err)
	}
	defer resp.Body.Close()

	t.Logf("日志流响应状态: %d %s", resp.StatusCode, resp.Status)
	t.Logf("Content-Type: %s", resp.Header.Get("content-type"))

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return 0, fmt.Errorf("请求失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	// Parse the SSE response line by line.
	reader := bufio.NewReader(resp.Body)
	eventCount := 0

	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			switch {
			case err == io.EOF:
				t.Log("日志流结束")
				return eventCount, nil
			case ctx.Err() != nil:
				// The read window elapsed; this is the expected way to stop
				// following a stream that never ends.
				t.Log("日志流读取超时")
				return eventCount, nil
			}
			return eventCount, fmt.Errorf("读取响应失败: %w", err)
		}

		line = strings.TrimSpace(line)
		switch {
		case line == "", strings.HasPrefix(line, ":"):
			// Blank separator or heartbeat comment: nothing to count.
			continue
		case strings.HasPrefix(line, "data:"):
			// The space after the colon is optional in SSE; strip at most one.
			data := strings.TrimPrefix(strings.TrimPrefix(line, "data:"), " ")
			eventCount++
			t.Logf("收到日志[%d]: %s", eventCount, data)
		}
	}
}
-
- // testLogsStreamWithSessionFilter 测试带sessionId过滤的日志流
- func testLogsStreamWithSessionFilter(t *testing.T, svcCodeURL, token string) {
- // 先创建一个会话,获取sessionId
- sessionID, err := createSessionForLogsTest(t, svcCodeURL, token, "日志流测试会话")
- if err != nil {
- t.Logf("创建会话失败: %v (跳过过滤测试)", err)
- return
- }
-
- t.Logf("使用sessionId过滤日志: %s", sessionID)
- eventCount, err := connectToLogsStream(t, svcCodeURL, token, sessionID)
- if err != nil {
- t.Errorf("带过滤的日志流连接失败: %v", err)
- return
- }
-
- t.Logf("过滤日志流收到 %d 个事件", eventCount)
- }
-
// createSessionForLogsTest creates a session through
// POST svcCodeURL + "/api/session/create" and returns its ID.
//
// The request body is the JSON object {"title": title} and token is sent as a
// bearer token. The response is expected to be a JSON object with the fields
// "success", "data.id" and "message".
//
// An error is returned when the request cannot be built or sent, when the
// status is not 200, when the body cannot be read or parsed, when "success" is
// false, or when the response carries no session ID. Underlying errors are
// wrapped so callers can inspect them with errors.Is / errors.As.
func createSessionForLogsTest(t *testing.T, svcCodeURL, token, title string) (string, error) {
	payload, err := json.Marshal(map[string]string{"title": title})
	if err != nil {
		return "", fmt.Errorf("创建请求失败: %w", err)
	}

	req, err := http.NewRequest(http.MethodPost, svcCodeURL+"/api/session/create", bytes.NewReader(payload))
	if err != nil {
		return "", fmt.Errorf("创建请求失败: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return "", fmt.Errorf("HTTP请求失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("创建会话失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("读取响应失败: %w", err)
	}

	var result struct {
		Success bool `json:"success"`
		Data    struct {
			ID string `json:"id"`
		} `json:"data"`
		Message string `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		return "", fmt.Errorf("解析响应失败: %w", err)
	}

	if !result.Success {
		return "", fmt.Errorf("创建会话失败: %s", result.Message)
	}
	// An empty ID would make the caller request an unfiltered stream, so it is
	// reported as a failure here.
	if result.Data.ID == "" {
		return "", fmt.Errorf("创建会话失败: 响应中未包含会话ID")
	}

	return result.Data.ID, nil
}
-
// loginForLogsTest logs in through POST svcCodeURL + "/api/auth/login" with the
// fixed test credentials and returns the token from the response.
//
// The response is expected to be a JSON object with the fields "success",
// "data" (the token string) and "message".
//
// An error is returned when the request cannot be sent, when the status is not
// 200, when the body cannot be read or parsed, or when "success" is false.
// Underlying errors are wrapped so callers can inspect them with errors.Is /
// errors.As.
func loginForLogsTest(t *testing.T, svcCodeURL string) (string, error) {
	payload, err := json.Marshal(map[string]string{
		"user_id":  "test-user-001",
		"password": "password123",
	})
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %w", err)
	}

	// http.Post would use http.DefaultClient, which has no timeout and could
	// hang the test on an unresponsive service.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post(svcCodeURL+"/api/auth/login", "application/json", bytes.NewReader(payload))
	if err != nil {
		return "", fmt.Errorf("登录请求失败: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: the body is only used to enrich the error message.
		bodyBytes, _ := io.ReadAll(resp.Body)
		return "", fmt.Errorf("登录失败 (状态码 %d): %s", resp.StatusCode, string(bodyBytes))
	}

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("读取响应失败: %w", err)
	}

	var result struct {
		Success bool   `json:"success"`
		Data    string `json:"data"`
		Message string `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		return "", fmt.Errorf("解析响应失败: %w", err)
	}

	if !result.Success {
		return "", fmt.Errorf("登录失败: %s", result.Message)
	}

	return result.Data, nil
}
-
// checkServiceRunningForLogsTest reports whether a service answers at url.
//
// It sends GET url + "/api/health" with a 3-second timeout; if that request
// fails at the transport level it retries with GET url. The service counts as
// running when the response that was obtained has status 200 or 404. Any other
// status, or a failure of both requests, yields false.
func checkServiceRunningForLogsTest(t *testing.T, url string) bool {
	probe := &http.Client{Timeout: 3 * time.Second}

	resp, err := probe.Get(url + "/api/health")
	if err != nil {
		// The health endpoint was unreachable; fall back to the base address.
		if resp, err = probe.Get(url); err != nil {
			return false
		}
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK, http.StatusNotFound:
		return true
	default:
		return false
	}
}
-
// minLogsInt returns the smaller of a and b.
func minLogsInt(a, b int) int {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
|