暂无描述
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

direct_client.go 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. package opencode
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "net/http"
  10. "os"
  11. "path/filepath"
  12. "strings"
  13. "time"
  14. )
  15. // DefaultOpenCodePort is the default port of the opencode service (used for testing).
  16. const DefaultOpenCodePort = 8787
  17. // DirectClient is an opencode API client that issues plain HTTP calls instead of going through an SDK.
  18. type DirectClient struct {
  19. baseURL string // root URL that every endpoint path is appended to
  20. port int // port number the client was created with
  21. httpClient *http.Client // HTTP client used by the request/response (non-streaming) methods
  22. }
  23. // Compile-time assertion that *DirectClient implements the OpenCodeClient interface.
  24. var _ OpenCodeClient = (*DirectClient)(nil)
  25. // NewDirectClient 创建新的直接 HTTP opencode 客户端
  26. func NewDirectClient(port int) (*DirectClient, error) {
  27. baseURL := fmt.Sprintf("http://127.0.0.1:%d", port)
  28. // 测试连接
  29. if err := testDirectConnection(baseURL); err != nil {
  30. return nil, fmt.Errorf("无法连接到 opencode 服务: %w", err)
  31. }
  32. return &DirectClient{
  33. baseURL: baseURL,
  34. port: port,
  35. httpClient: &http.Client{
  36. Timeout: 30 * time.Second,
  37. },
  38. }, nil
  39. }
  // testDirectConnection checks that the service at baseURL is reachable by
  // issuing GET {baseURL}/global/health with a 5-second timeout.
  //
  // It returns the transport error when the request itself fails, an error
  // carrying the status code when the response status is not 200, and nil
  // otherwise.
  func testDirectConnection(baseURL string) error {
  	probe := &http.Client{Timeout: 5 * time.Second}

  	resp, err := probe.Get(baseURL + "/global/health")
  	if err != nil {
  		return err
  	}
  	defer resp.Body.Close()

  	if resp.StatusCode == http.StatusOK {
  		return nil
  	}
  	return fmt.Errorf("服务不可用,状态码: %d", resp.StatusCode)
  }
  53. // CreateSession 创建新会话(直接 HTTP 调用)
  54. func (c *DirectClient) CreateSession(ctx context.Context, title string) (*Session, error) {
  55. // 构造请求体
  56. reqBody := map[string]interface{}{
  57. "title": title,
  58. }
  59. jsonBody, err := json.Marshal(reqBody)
  60. if err != nil {
  61. return nil, fmt.Errorf("编码请求失败: %w", err)
  62. }
  63. // 发送 HTTP 请求
  64. req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/session", bytes.NewBuffer(jsonBody))
  65. if err != nil {
  66. return nil, fmt.Errorf("创建请求失败: %w", err)
  67. }
  68. req.Header.Set("Content-Type", "application/json")
  69. resp, err := c.httpClient.Do(req)
  70. if err != nil {
  71. return nil, fmt.Errorf("HTTP请求失败: %w", err)
  72. }
  73. defer resp.Body.Close()
  74. if resp.StatusCode != http.StatusOK {
  75. body, _ := io.ReadAll(resp.Body)
  76. return nil, fmt.Errorf("创建会话失败,状态码: %d, 响应体: %s", resp.StatusCode, string(body))
  77. }
  78. // 解析响应
  79. var session Session
  80. body, _ := io.ReadAll(resp.Body)
  81. if err := json.Unmarshal(body, &session); err != nil {
  82. return nil, fmt.Errorf("解析会话响应失败: %w", err)
  83. }
  84. fmt.Fprintf(os.Stderr, "[opencode-direct-client] 创建会话成功: %s\n", session.ID)
  85. return &session, nil
  86. }
  87. // SendPrompt 发送消息(同步,直接 HTTP 调用)
  88. func (c *DirectClient) SendPrompt(ctx context.Context, sessionID string, prompt *PromptRequest) (*PromptResponse, error) {
  89. // 序列化请求体
  90. reqBody, err := json.Marshal(prompt)
  91. if err != nil {
  92. return nil, fmt.Errorf("编码请求失败: %w", err)
  93. }
  94. // 发送 HTTP 请求到 /session/{id}/message 端点(基于 svc-worker 测试)
  95. url := fmt.Sprintf("%s/session/%s/message", c.baseURL, sessionID)
  96. req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
  97. if err != nil {
  98. return nil, fmt.Errorf("创建请求失败: %w", err)
  99. }
  100. req.Header.Set("Content-Type", "application/json")
  101. resp, err := c.httpClient.Do(req)
  102. if err != nil {
  103. return nil, fmt.Errorf("发送消息失败: %w", err)
  104. }
  105. defer resp.Body.Close()
  106. if resp.StatusCode != http.StatusOK {
  107. body, _ := io.ReadAll(resp.Body)
  108. return nil, fmt.Errorf("请求失败,状态码: %d, 响应体: %s", resp.StatusCode, string(body))
  109. }
  110. // 解析响应
  111. body, _ := io.ReadAll(resp.Body)
  112. // 首先尝试解析为完整的 PromptResponse
  113. var response PromptResponse
  114. if err := json.Unmarshal(body, &response); err == nil && response.Info.ID != "" {
  115. fmt.Fprintf(os.Stderr, "[opencode-direct-client] 发送消息成功,消息ID: %s\n", response.Info.ID)
  116. return &response, nil
  117. }
  118. // 如果失败,尝试解析为直接的消息响应
  119. var directResponse struct {
  120. ID string `json:"id"`
  121. Role string `json:"role"`
  122. Content string `json:"content"`
  123. Parts []struct {
  124. Type string `json:"type"`
  125. Text string `json:"text"`
  126. } `json:"parts"`
  127. Model struct {
  128. ID string `json:"id"`
  129. ProviderID string `json:"providerID"`
  130. } `json:"model"`
  131. }
  132. if err := json.Unmarshal(body, &directResponse); err != nil {
  133. return nil, fmt.Errorf("解析响应失败: %w", err)
  134. }
  135. // 构造标准的 PromptResponse
  136. response = PromptResponse{
  137. Info: AssistantMessage{
  138. ID: directResponse.ID,
  139. Role: directResponse.Role,
  140. SessionID: sessionID,
  141. Content: directResponse.Content,
  142. Agent: "opencode",
  143. ModelID: directResponse.Model.ID,
  144. ProviderID: directResponse.Model.ProviderID,
  145. Tokens: TokenInfo{
  146. Input: 0,
  147. Output: 0,
  148. },
  149. Time: map[string]interface{}{
  150. "created": time.Now().Unix(),
  151. },
  152. },
  153. }
  154. // 转换 parts
  155. if len(directResponse.Parts) > 0 {
  156. for _, part := range directResponse.Parts {
  157. response.Parts = append(response.Parts, map[string]string{
  158. "type": part.Type,
  159. "text": part.Text,
  160. })
  161. }
  162. }
  163. fmt.Fprintf(os.Stderr, "[opencode-direct-client] 发送消息成功,消息ID: %s\n", response.Info.ID)
  164. return &response, nil
  165. }
  166. // SendPromptStream 发送消息(流式,直接 HTTP 调用)
  167. func (c *DirectClient) SendPromptStream(ctx context.Context, sessionID string, prompt *PromptRequest) (<-chan string, error) {
  168. reqBody, err := json.Marshal(prompt)
  169. if err != nil {
  170. return nil, fmt.Errorf("编码请求失败: %w", err)
  171. }
  172. fmt.Printf("🔍 [opencode.DirectClient] 发送流式请求到 session: %s\n", sessionID)
  173. fmt.Printf("🔍 [opencode.DirectClient] 请求体: %s\n", string(reqBody))
  174. fmt.Printf("🔍 [opencode.DirectClient] 端口: %d\n", c.port)
  175. // 测试异步端点
  176. url := fmt.Sprintf("%s/session/%s/prompt_async", c.baseURL, sessionID)
  177. req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(reqBody))
  178. if err != nil {
  179. return nil, fmt.Errorf("创建请求失败: %w", err)
  180. }
  181. req.Header.Set("Content-Type", "application/json")
  182. req.Header.Set("Accept", "text/event-stream")
  183. fmt.Printf("🔍 [opencode.DirectClient] 请求 URL: %s\n", url)
  184. fmt.Printf("🔍 [opencode.DirectClient] 请求头: %v\n", req.Header)
  185. // 为SSE流创建独立的httpClient,不设置超时限制
  186. sseClient := &http.Client{
  187. // 不设置Timeout,允许长连接
  188. // Timeout: 0 表示无超时限制
  189. }
  190. resp, err := sseClient.Do(req)
  191. if err != nil {
  192. return nil, fmt.Errorf("发送请求失败: %w", err)
  193. }
  194. fmt.Printf("🔍 [opencode.DirectClient] 响应状态: %d\n", resp.StatusCode)
  195. fmt.Printf("🔍 [opencode.DirectClient] 响应头: %v\n", resp.Header)
  196. if resp.StatusCode != http.StatusOK {
  197. body, _ := io.ReadAll(resp.Body)
  198. resp.Body.Close()
  199. fmt.Printf("🔍 [opencode.DirectClient] 错误响应体: %s\n", string(body))
  200. // 如果返回 204,尝试其他流式端点
  201. if resp.StatusCode == http.StatusNoContent {
  202. fmt.Printf("🔍 [opencode.DirectClient] 异步端点返回 204,尝试 /global/event SSE 流\n")
  203. return c.subscribeGlobalEvents(ctx)
  204. }
  205. return nil, fmt.Errorf("请求失败,状态码: %d", resp.StatusCode)
  206. }
  207. ch := make(chan string, 100)
  208. go func() {
  209. defer resp.Body.Close()
  210. defer close(ch)
  211. reader := bufio.NewReader(resp.Body)
  212. eventCount := 0
  213. for {
  214. line, err := reader.ReadString('\n')
  215. if err != nil {
  216. if err == io.EOF {
  217. fmt.Printf("🔍 [opencode.DirectClient] SSE流结束,共收到 %d 个事件\n", eventCount)
  218. } else {
  219. // 区分正常取消和错误
  220. if ctx.Err() != nil {
  221. fmt.Printf("🔍 [opencode.DirectClient] SSE流正常结束(上下文取消)\n")
  222. } else {
  223. fmt.Printf("🔍 [opencode.DirectClient] 读取错误: %v\n", err)
  224. }
  225. }
  226. return
  227. }
  228. line = strings.TrimSpace(line)
  229. if line == "" {
  230. continue
  231. }
  232. if strings.HasPrefix(line, "data: ") {
  233. data := strings.TrimPrefix(line, "data: ")
  234. eventCount++
  235. fmt.Printf("🔍 [opencode.DirectClient] 收到SSE数据[%d]: %s\n", eventCount, data)
  236. // 写入日志文件用于分析
  237. writeStreamLog(sessionID, data)
  238. select {
  239. case ch <- data:
  240. case <-ctx.Done():
  241. fmt.Printf("🔍 [opencode.DirectClient] 上下文取消\n")
  242. return
  243. }
  244. } else {
  245. fmt.Printf("🔍 [opencode.DirectClient] 忽略非数据行: %s\n", line)
  246. }
  247. }
  248. }()
  249. return ch, nil
  250. }
  251. // subscribeGlobalEvents 订阅全局事件流
  252. func (c *DirectClient) subscribeGlobalEvents(ctx context.Context) (<-chan string, error) {
  253. url := c.baseURL + "/global/event"
  254. req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
  255. if err != nil {
  256. return nil, fmt.Errorf("创建事件订阅请求失败: %w", err)
  257. }
  258. req.Header.Set("Accept", "text/event-stream")
  259. fmt.Printf("🔍 [opencode.DirectClient] 订阅全局事件流: %s\n", url)
  260. // 为SSE连接创建独立的httpClient,不设置超时限制
  261. sseClient := &http.Client{
  262. // 不设置Timeout,允许长连接
  263. // Timeout: 0 表示无超时限制
  264. }
  265. resp, err := sseClient.Do(req)
  266. if err != nil {
  267. return nil, fmt.Errorf("订阅事件流失败: %w", err)
  268. }
  269. if resp.StatusCode != http.StatusOK {
  270. resp.Body.Close()
  271. return nil, fmt.Errorf("事件流订阅失败,状态码: %d", resp.StatusCode)
  272. }
  273. ch := make(chan string, 100)
  274. go func() {
  275. defer resp.Body.Close()
  276. defer close(ch)
  277. scanner := bufio.NewScanner(resp.Body)
  278. eventCount := 0
  279. for scanner.Scan() {
  280. line := scanner.Text()
  281. if strings.HasPrefix(line, "data: ") {
  282. data := strings.TrimPrefix(line, "data: ")
  283. eventCount++
  284. fmt.Printf("🔍 [opencode.DirectClient] 收到全局事件[%d]: %s\n", eventCount, data)
  285. // 写入日志文件用于分析
  286. writeStreamLog("", data)
  287. select {
  288. case ch <- data:
  289. case <-ctx.Done():
  290. fmt.Printf("🔍 [opencode.DirectClient] 全局事件上下文取消\n")
  291. return
  292. }
  293. }
  294. }
  295. if err := scanner.Err(); err != nil {
  296. // 区分正常取消和错误
  297. if ctx.Err() != nil {
  298. fmt.Printf("🔍 [opencode.DirectClient] 全局事件流正常结束(上下文取消)\n")
  299. } else {
  300. fmt.Printf("🔍 [opencode.DirectClient] 扫描事件流错误: %v\n", err)
  301. }
  302. }
  303. }()
  304. return ch, nil
  305. }
  306. // GetSession 获取会话信息
  307. func (c *DirectClient) GetSession(ctx context.Context, sessionID string) (*Session, error) {
  308. url := fmt.Sprintf("%s/session/%s", c.baseURL, sessionID)
  309. req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
  310. if err != nil {
  311. return nil, fmt.Errorf("创建请求失败: %w", err)
  312. }
  313. resp, err := c.httpClient.Do(req)
  314. if err != nil {
  315. return nil, fmt.Errorf("获取会话失败: %w", err)
  316. }
  317. defer resp.Body.Close()
  318. if resp.StatusCode != http.StatusOK {
  319. body, _ := io.ReadAll(resp.Body)
  320. return nil, fmt.Errorf("获取会话失败,状态码: %d, 响应体: %s", resp.StatusCode, string(body))
  321. }
  322. var session Session
  323. if err := json.NewDecoder(resp.Body).Decode(&session); err != nil {
  324. return nil, fmt.Errorf("解析会话响应失败: %w", err)
  325. }
  326. return &session, nil
  327. }
  328. // ListSessions 获取会话列表
  329. func (c *DirectClient) ListSessions(ctx context.Context) ([]Session, error) {
  330. url := c.baseURL + "/session"
  331. req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
  332. if err != nil {
  333. return nil, fmt.Errorf("创建请求失败: %w", err)
  334. }
  335. resp, err := c.httpClient.Do(req)
  336. if err != nil {
  337. return nil, fmt.Errorf("获取会话列表失败: %w", err)
  338. }
  339. defer resp.Body.Close()
  340. if resp.StatusCode != http.StatusOK {
  341. body, _ := io.ReadAll(resp.Body)
  342. return nil, fmt.Errorf("获取会话列表失败,状态码: %d, 响应体: %s", resp.StatusCode, string(body))
  343. }
  344. var sessions []Session
  345. if err := json.NewDecoder(resp.Body).Decode(&sessions); err != nil {
  346. return nil, fmt.Errorf("解析会话列表失败: %w", err)
  347. }
  348. return sessions, nil
  349. }
  350. // GetBaseURL 获取基础URL
  351. func (c *DirectClient) GetBaseURL() string {
  352. return c.baseURL
  353. }
  354. // GetPort 获取端口
  355. func (c *DirectClient) GetPort() int {
  356. return c.port
  357. }
  // writeStreamLog appends data, prefixed with a millisecond-resolution
  // timestamp, to a stream log file for later analysis.
  //
  // File naming inside the log directory:
  //   - empty sessionID: stream-global-<YYYYMMDD>-<HH>.log (one file per hour)
  //   - otherwise:       stream-session-<sessionID>-<YYYYMMDD>.log
  //
  // Path separators in sessionID are replaced with "_" so the file always
  // stays inside the log directory. The directory is created if missing.
  // Logging is best effort: failures are printed to stdout and otherwise
  // ignored.
  func writeStreamLog(sessionID string, data string) {
  	// NOTE(review): this is a hard-coded absolute path on one developer
  	// machine; consider making the log directory configurable.
  	const logDir = "/Users/kenqdy/Documents/v-bdx-workspace/svc-code/logs"

  	if err := os.MkdirAll(logDir, 0755); err != nil {
  		fmt.Printf("🔍 [opencode-direct-client] 创建日志目录失败: %v\n", err)
  		return
  	}

  	// Sample the clock once so the file name and the line timestamp cannot
  	// disagree across an hour or day boundary.
  	now := time.Now()
  	dateStr := now.Format("20060102")

  	var filename string
  	if sessionID == "" {
  		// Global events are grouped by hour.
  		filename = fmt.Sprintf("stream-global-%s-%s.log", dateStr, now.Format("15"))
  	} else {
  		// Session events are grouped by session ID and date.
  		safeID := strings.NewReplacer("/", "_", "\\", "_").Replace(sessionID)
  		filename = fmt.Sprintf("stream-session-%s-%s.log", safeID, dateStr)
  	}

  	// Named logPath (not filepath) so the path/filepath package is not shadowed.
  	logPath := filepath.Join(logDir, filename)

  	file, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
  	if err != nil {
  		fmt.Printf("🔍 [opencode-direct-client] 打开日志文件失败: %v\n", err)
  		return
  	}
  	defer file.Close()

  	logLine := fmt.Sprintf("[%s] %s\n", now.Format("15:04:05.000"), data)
  	if _, err := file.WriteString(logLine); err != nil {
  		fmt.Printf("🔍 [opencode-direct-client] 写入日志失败: %v\n", err)
  	}
  }