Nav apraksta
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

events_stream.go 5.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package api
import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
	"time"

	"git.x2erp.com/qdy/go-svc-code/internal/opencode/container"
)
  13. // EventsStreamHandler 创建OpenCode事件流代理处理器,包含会话过滤
  14. func EventsStreamHandler(manager *container.InstanceManager) http.HandlerFunc {
  15. return func(w http.ResponseWriter, r *http.Request) {
  16. // 从URL路径中提取项目ID
  17. // 路径格式:/api/opencode/projects/{id}/events
  18. pathParts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
  19. if len(pathParts) < 5 {
  20. http.Error(w, "无效的路径格式", http.StatusBadRequest)
  21. return
  22. }
  23. projectID := pathParts[3]
  24. // 从查询参数获取会话ID
  25. sessionID := r.URL.Query().Get("session_id")
  26. if sessionID == "" {
  27. sessionID = "default"
  28. }
  29. log.Printf("OpenCode事件流请求: 项目ID=%s, 会话ID=%s", projectID, sessionID)
  30. // 获取实例
  31. instance := manager.GetInstance(projectID)
  32. if instance == nil {
  33. http.Error(w, fmt.Sprintf("项目 %s 的OpenCode实例未启动", projectID), http.StatusNotFound)
  34. return
  35. }
  36. if instance.Status != container.StatusRunning {
  37. http.Error(w, fmt.Sprintf("项目 %s 的OpenCode实例不在运行状态", projectID), http.StatusServiceUnavailable)
  38. return
  39. }
  40. // 构建OpenCode事件流URL
  41. eventsURL := fmt.Sprintf("http://localhost:%d/api/sessions/%s/events", instance.Port, sessionID)
  42. // 创建HTTP请求到OpenCode实例
  43. req, err := http.NewRequest("GET", eventsURL, nil)
  44. if err != nil {
  45. http.Error(w, fmt.Sprintf("创建HTTP请求失败: %v", err), http.StatusInternalServerError)
  46. return
  47. }
  48. // 复制原始请求的头
  49. for name, values := range r.Header {
  50. for _, value := range values {
  51. req.Header.Add(name, value)
  52. }
  53. }
  54. // 添加OpenCode认证头(如果需要)
  55. if instance.Token != "" {
  56. req.Header.Set("Authorization", "Bearer "+instance.Token)
  57. }
  58. // 发送请求
  59. client := &http.Client{}
  60. resp, err := client.Do(req)
  61. if err != nil {
  62. http.Error(w, fmt.Sprintf("连接到OpenCode事件流失败: %v", err), http.StatusBadGateway)
  63. return
  64. }
  65. defer resp.Body.Close()
  66. // 检查响应状态
  67. if resp.StatusCode != http.StatusOK {
  68. http.Error(w, fmt.Sprintf("OpenCode事件流返回错误状态码: %d", resp.StatusCode), http.StatusBadGateway)
  69. return
  70. }
  71. // 设置响应头
  72. w.Header().Set("Content-Type", "text/event-stream")
  73. w.Header().Set("Cache-Control", "no-cache")
  74. w.Header().Set("Connection", "keep-alive")
  75. w.Header().Set("Access-Control-Allow-Origin", "*")
  76. flusher, ok := w.(http.Flusher)
  77. if !ok {
  78. http.Error(w, "流式响应不支持", http.StatusInternalServerError)
  79. return
  80. }
  81. // 建立连接事件
  82. connectEvent := map[string]interface{}{
  83. "type": "connected",
  84. "project_id": projectID,
  85. "session_id": sessionID,
  86. "timestamp": timestamp(),
  87. }
  88. sendSSEEvent(w, flusher, connectEvent)
  89. // 使用bufio.Scanner读取SSE事件流
  90. scanner := bufio.NewScanner(resp.Body)
  91. scanner.Buffer(make([]byte, 64*1024), 1024*1024) // 增加缓冲区大小
  92. var eventBuffer bytes.Buffer
  93. var inEvent bool
  94. for scanner.Scan() {
  95. line := scanner.Text()
  96. if line == "" {
  97. // 空行表示事件结束
  98. if inEvent && eventBuffer.Len() > 0 {
  99. data := eventBuffer.String()
  100. eventBuffer.Reset()
  101. inEvent = false
  102. // 处理SSE事件
  103. processedEvent := processSSEEvent(projectID, sessionID, data)
  104. if processedEvent != "" {
  105. // 发送处理后的事件
  106. fmt.Fprintf(w, "%s\n", processedEvent)
  107. flusher.Flush()
  108. }
  109. }
  110. continue
  111. }
  112. if !inEvent {
  113. inEvent = true
  114. }
  115. // 累积事件行
  116. eventBuffer.WriteString(line)
  117. eventBuffer.WriteString("\n")
  118. }
  119. if err := scanner.Err(); err != nil {
  120. log.Printf("读取OpenCode事件流失败: 项目=%s, 会话=%s, 错误=%v", projectID, sessionID, err)
  121. }
  122. }
  123. }
  124. // processSSEEvent 处理SSE事件,添加项目ID和会话ID信息
  125. func processSSEEvent(projectID, sessionID, data string) string {
  126. // 解析SSE事件格式:data: {...}
  127. lines := strings.Split(data, "\n")
  128. for _, line := range lines {
  129. if strings.HasPrefix(line, "data: ") {
  130. eventData := strings.TrimPrefix(line, "data: ")
  131. // 尝试解析JSON
  132. var eventObj map[string]interface{}
  133. if err := json.Unmarshal([]byte(eventData), &eventObj); err != nil {
  134. // 如果不是JSON,直接返回原始数据
  135. return fmt.Sprintf("data: %s\n", eventData)
  136. }
  137. // 添加项目ID和会话ID信息
  138. eventObj["project_id"] = projectID
  139. eventObj["session_id"] = sessionID
  140. eventObj["timestamp"] = timestamp()
  141. // 重新序列化
  142. processedData, err := json.Marshal(eventObj)
  143. if err != nil {
  144. log.Printf("序列化处理后的事件失败: %v", err)
  145. return fmt.Sprintf("data: %s\n", eventData)
  146. }
  147. return fmt.Sprintf("data: %s\n", string(processedData))
  148. }
  149. }
  150. return data
  151. }
  152. // sendSSEEvent 发送SSE事件
  153. func sendSSEEvent(w http.ResponseWriter, flusher http.Flusher, event map[string]interface{}) {
  154. eventJSON, err := json.Marshal(event)
  155. if err != nil {
  156. log.Printf("序列化SSE事件失败: %v", err)
  157. return
  158. }
  159. fmt.Fprintf(w, "data: %s\n\n", string(eventJSON))
  160. flusher.Flush()
  161. }
  162. // timestamp 获取当前时间戳
  163. func timestamp() string {
  164. return fmt.Sprintf("%d", time.Now().UnixNano()/int64(time.Millisecond))
  165. }