package routes import ( "encoding/json" "fmt" "net/http" "strings" "time" "git.x2erp.com/qdy/go-base/authbase" "git.x2erp.com/qdy/go-base/webx" "git.x2erp.com/qdy/go-base/webx/router" "git.x2erp.com/qdy/go-svc-code/internal/opencode" ) // RegisterLogStreamRoutes 注册日志流路由 func RegisterLogStreamRoutes(ws *router.RouterService, webService *webx.WebService, process *opencode.Process, opencodePort int) { // 日志流需要直接处理 HTTP 流式响应,不能使用标准的路由包装 // 我们直接注册到 webService 的底层路由器 webService.GetRouter().Handle("/api/logs/stream", LogStreamHandler(process, opencodePort)) } // LogStreamHandler 日志流的 HTTP 处理器(已包含TokenAuth认证) func LogStreamHandler(process *opencode.Process, opencodePort int) http.HandlerFunc { // 创建内部处理器 handler := func(w http.ResponseWriter, r *http.Request) { fmt.Printf("🔍 [LogStreamHandler] 新日志流连接: %s %s\n", r.Method, r.URL.String()) fmt.Printf("🔍 [LogStreamHandler] 查询参数: %v\n", r.URL.Query()) // 设置 SSE 头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } // 获取sessionId过滤参数 sessionId := r.URL.Query().Get("sessionId") if sessionId != "" { // 记录日志过滤设置 fmt.Fprintf(w, "data: 过滤日志,仅显示会话 %s 的日志\n\n", sessionId) flusher.Flush() } // 检查是否有可用的opencode进程 if process == nil { // 外部opencode模式,提供模拟日志流 fmt.Printf("🔍 [LogStreamHandler] 使用模拟日志流(外部opencode模式)\n") // 发送初始消息 fmt.Fprintf(w, "data: 外部 opencode 模式,模拟日志流已连接\n\n") flusher.Flush() fmt.Fprintf(w, "data: opencode 服务运行在端口 %d\n\n", opencodePort) flusher.Flush() fmt.Fprintf(w, "data: 当前时间: %s\n\n", time.Now().Format("2006-01-02 15:04:05")) flusher.Flush() // 监听客户端断开连接 for { select { case <-r.Context().Done(): // 客户端断开连接 return case <-time.After(30 * time.Second): // 发送JSON格式的心跳事件保持连接 heartbeatEvent := map[string]interface{}{ "payload": map[string]interface{}{ "type": "server.heartbeat", "properties": map[string]interface{}{ "timestamp": time.Now().Format(time.RFC3339), }, }, } jsonData, _ := json.Marshal(heartbeatEvent) fmt.Fprintf(w, "data: %s\n\n", string(jsonData)) flusher.Flush() } } } // 获取日志通道 logChan := process.GetLogs() if logChan == nil { http.Error(w, "日志通道不可用", http.StatusInternalServerError) return } // 发送初始消息 fmt.Fprintf(w, "data: 连接到 opencode 日志流\n\n") flusher.Flush() // 监听日志通道 for { select { case log, ok := <-logChan: if !ok { // 通道关闭 fmt.Fprintf(w, "data: 日志流已结束\n\n") flusher.Flush() return } // 根据sessionId过滤日志 if sessionId != "" && !strings.Contains(log, sessionId) { // 日志不包含sessionId,跳过 if strings.Contains(log, "session") { fmt.Printf("🔍 [LogStreamHandler] 跳过不匹配的日志 (sessionId=%s): %s\n", sessionId, log) } continue } // 发送日志 fmt.Printf("🔍 [LogStreamHandler] 发送日志: %s\n", log) fmt.Fprintf(w, "data: %s\n\n", log) flusher.Flush() case <-r.Context().Done(): // 客户端断开连接 return case <-time.After(30 * time.Second): // 发送JSON格式的心跳事件保持连接 heartbeatEvent := map[string]interface{}{ "payload": map[string]interface{}{ "type": "server.heartbeat", "properties": map[string]interface{}{ "timestamp": time.Now().Format(time.RFC3339), }, }, } jsonData, _ := json.Marshal(heartbeatEvent) fmt.Fprintf(w, "data: %s\n\n", string(jsonData)) flusher.Flush() } } } // 包装TokenAuth中间件 return authbase.TokenAuth(http.HandlerFunc(handler)).ServeHTTP }