package routes import ( "context" "encoding/json" "fmt" "io" "net/http" "time" "git.x2erp.com/qdy/go-base/authbase" "git.x2erp.com/qdy/go-base/webx" "git.x2erp.com/qdy/go-base/webx/router" "git.x2erp.com/qdy/go-svc-code/internal/opencode" ) // PromptStreamRequest 流式对话请求 type PromptStreamRequest struct { SessionID string `json:"sessionID" binding:"required"` Parts []opencode.TextPart `json:"parts" binding:"required"` Agent string `json:"agent,omitempty"` Model *opencode.ModelInfo `json:"model,omitempty"` } // RegisterPromptStreamRoutes 注册流式对话路由 func RegisterPromptStreamRoutes(ws *router.RouterService, webService *webx.WebService, client opencode.OpenCodeClient) { // 流式对话需要直接处理 HTTP 流式响应,不能使用标准的路由包装 // 我们直接注册到 webService 的底层路由器 webService.GetRouter().Handle("/api/prompt/stream", StreamPromptHandler(client)) } // StreamPromptHandler 流式对话的 HTTP 处理器(已包含TokenAuth认证) func StreamPromptHandler(client opencode.OpenCodeClient) http.HandlerFunc { // 创建内部处理器 handler := func(w http.ResponseWriter, r *http.Request) { fmt.Printf("🔍 [StreamPromptHandler] 收到流式对话请求: %s %s\n", r.Method, r.URL.String()) // 解析请求 var req PromptStreamRequest if err := BindJSON(r, &req); err != nil { fmt.Printf("🔍 [StreamPromptHandler] 解析请求失败: %v\n", err) http.Error(w, fmt.Sprintf("解析请求失败: %v", err), http.StatusBadRequest) return } fmt.Printf("🔍 [StreamPromptHandler] 请求数据: sessionID=%s, agent=%v, parts=%d\n", req.SessionID, req.Agent, len(req.Parts)) if len(req.Parts) > 0 && req.Parts[0].Text != "" { fmt.Printf("🔍 [StreamPromptHandler] 用户消息: %s\n", req.Parts[0].Text) } // 创建 prompt 请求 prompt := &opencode.PromptRequest{ Parts: req.Parts, Agent: req.Agent, Model: req.Model, } // 设置 SSE 头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // 创建带超时的上下文 ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute) defer cancel() fmt.Printf("🔍 [StreamPromptHandler] 调用 SendPromptStream, sessionID=%s\n", req.SessionID) // 获取流式响应通道 ch, err := client.SendPromptStream(ctx, req.SessionID, prompt) if err != nil { fmt.Printf("🔍 [StreamPromptHandler] 发送流式请求失败: %v\n", err) http.Error(w, fmt.Sprintf("发送流式请求失败: %v", err), http.StatusInternalServerError) return } fmt.Printf("🔍 [StreamPromptHandler] 成功获取流式响应通道\n") // 发送流式响应 flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } fmt.Printf("🔍 [StreamPromptHandler] 开始发送流式响应\n") eventCount := 0 for { select { case data, ok := <-ch: if !ok { // 通道关闭,发送结束标记 fmt.Printf("🔍 [StreamPromptHandler] 流式通道关闭,发送DONE标记,共发送 %d 个事件\n", eventCount) fmt.Fprintf(w, "data: [DONE]\n\n") flusher.Flush() return } // 发送 SSE 数据 eventCount++ fmt.Printf("🔍 [StreamPromptHandler] 发送SSE数据[%d]: %s\n", eventCount, data) // 发送 SSE 数据,opencode 数据已包含 payload 字段,不需要额外包装 var wrappedData string if data == "[DONE]" { wrappedData = "[DONE]" } else { // 尝试解析为JSON,检查是否已有payload字段 var jsonData interface{} if err := json.Unmarshal([]byte(data), &jsonData); err == nil { // 检查是否是对象且包含payload字段 if obj, ok := jsonData.(map[string]interface{}); ok && obj["payload"] != nil { // 已有payload字段,直接发送原始数据 wrappedData = data } else { // 没有payload字段,包装在payload对象中 wrapped := map[string]interface{}{ "payload": jsonData, } wrappedBytes, _ := json.Marshal(wrapped) wrappedData = string(wrappedBytes) } } else { // 不是JSON,按原样发送 wrappedData = data } } fmt.Fprintf(w, "data: %s\n\n", wrappedData) flusher.Flush() case <-ctx.Done(): fmt.Printf("🔍 [StreamPromptHandler] 上下文超时\n") return case <-r.Context().Done(): fmt.Printf("🔍 [StreamPromptHandler] 客户端断开连接\n") return } } } // 包装TokenAuth中间件 return authbase.TokenAuth(http.HandlerFunc(handler)).ServeHTTP } // BindJSON 简单的 JSON 绑定函数 func BindJSON(r *http.Request, v interface{}) error { body, err := io.ReadAll(r.Body) if err != nil { return err } defer r.Body.Close() return json.Unmarshal(body, v) }