Нет описания
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

conversation.service.ts 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. import { Injectable } from '@angular/core';
  2. import { HttpClient } from '@angular/common/http';
  3. import { BehaviorSubject, Observable, Subject, of } from 'rxjs';
  4. import { map, catchError } from 'rxjs/operators';
  5. import { ChatMessage, StreamChunk, PromptStreamRequest, TextPart } from '../models/conversation.model';
  6. import { AuthService } from './auth.service';
  7. @Injectable({
  8. providedIn: 'root'
  9. })
  10. export class ConversationService {
  11. private newMessageSubject = new BehaviorSubject<ChatMessage | null>(null);
  12. newMessage$ = this.newMessageSubject.asObservable();
  13. private streamUpdateSubject = new Subject<StreamChunk>();
  14. streamUpdate$ = this.streamUpdateSubject.asObservable();
  15. constructor(
  16. private http: HttpClient,
  17. private authService: AuthService
  18. ) {}
  19. // 发送消息(流式响应)
  20. sendMessage(sessionId: string, message: string): Observable<void> {
  21. const request: PromptStreamRequest = {
  22. sessionID: sessionId,
  23. parts: [
  24. {
  25. type: 'text',
  26. text: message
  27. }
  28. ]
  29. };
  30. return new Observable<void>(observer => {
  31. // 获取认证token
  32. const token = this.authService.getToken();
  33. console.log('🔍 [ConversationService] 发送消息,sessionId:', sessionId);
  34. console.log('🔍 [ConversationService] 获取的token长度:', token?.length);
  35. console.log('🔍 [ConversationService] 当前认证状态:', this.authService.isAuthenticated());
  36. if (!token) {
  37. console.error('🔍 [ConversationService] 错误: 用户未认证,无法建立流式连接');
  38. observer.error(new Error('用户未认证,无法建立流式连接'));
  39. return;
  40. }
  41. console.log('🔍 [ConversationService] 发送POST请求启动流:', request);
  42. // 创建AbortController用于取消请求
  43. const abortController = new AbortController();
  44. // 发送超时定时器 - 控制发送消息到后端的超时(30秒)
  45. let sendTimeout: any = null;
  46. // 使用fetch API以便流式读取SSE响应
  47. fetch('/api/prompt/stream', {
  48. method: 'POST',
  49. headers: {
  50. 'Content-Type': 'application/json',
  51. 'Authorization': `Bearer ${token}`
  52. },
  53. body: JSON.stringify(request),
  54. signal: abortController.signal // 添加取消信号
  55. }).then(response => {
  56. console.log('🔍 [ConversationService] POST响应状态:', response.status, response.statusText);
  57. if (!response.ok) {
  58. throw new Error(`HTTP ${response.status}: ${response.statusText}`);
  59. }
  60. if (!response.body) {
  61. throw new Error('响应体为空');
  62. }
  63. // 检查Content-Type是否为text/event-stream
  64. const contentType = response.headers.get('content-type');
  65. console.log('🔍 [ConversationService] 响应Content-Type:', contentType);
  66. // 消息发送成功,立即通知组件可以恢复发送按钮状态
  67. observer.next();
  68. // 创建SSE解析器
  69. const reader = response.body.getReader();
  70. const decoder = new TextDecoder();
  71. let buffer = '';
  72. // 发送请求超时(30秒)- 控制发送消息到后端的超时
  73. const SEND_TIMEOUT_MS = 30 * 1000; // 30秒
  74. sendTimeout = setTimeout(() => {
  75. console.error('🔍 [ConversationService] 发送请求超时(30秒未收到初始响应)');
  76. abortController.abort();
  77. this.streamUpdateSubject.next({ type: 'error', data: '发送请求超时,请重试' });
  78. observer.error(new Error('发送请求超时,请重试'));
  79. }, SEND_TIMEOUT_MS);
  80. // 请求成功后清除发送超时
  81. clearTimeout(sendTimeout);
  82. const readStream = () => {
  83. reader.read().then(({ done, value }) => {
  84. if (done) {
  85. console.log('🔍 [ConversationService] 流结束');
  86. // 清除发送超时定时器
  87. clearTimeout(sendTimeout);
  88. this.streamUpdateSubject.next({ type: 'done', data: '' });
  89. observer.complete();
  90. return;
  91. }
  92. // 解码数据
  93. buffer += decoder.decode(value, { stream: true });
  94. // 解析SSE格式:每个事件以"data: "开头,以两个换行符结束
  95. let eventEnd = buffer.indexOf('\n\n');
  96. while (eventEnd !== -1) {
  97. const event = buffer.substring(0, eventEnd);
  98. buffer = buffer.substring(eventEnd + 2); // 移除已处理的事件和两个换行符
  99. // 收到事件,继续处理
  100. // 检查是否为注释行(以冒号开头)
  101. if (event.startsWith(':')) {
  102. console.log('🔍 [ConversationService] 收到SSE注释:', event.substring(0, 50));
  103. // 心跳注释,继续处理下一个事件
  104. eventEnd = buffer.indexOf('\n\n');
  105. continue;
  106. }
  107. // 查找数据行
  108. const dataLineStart = event.indexOf('data: ');
  109. if (dataLineStart !== -1) {
  110. const data = event.substring(dataLineStart + 6); // 移除"data: "
  111. console.log('🔍 [ConversationService] 收到SSE数据:', data.substring(0, 100));
  112. if (data === '[DONE]') {
  113. console.log('🔍 [ConversationService] 收到DONE标记');
  114. // 清除发送超时定时器
  115. clearTimeout(sendTimeout);
  116. this.streamUpdateSubject.next({ type: 'done', data: '' });
  117. observer.complete();
  118. return;
  119. } else {
  120. try {
  121. // 解析JSON格式的SSE数据
  122. const jsonData = JSON.parse(data);
  123. console.log('🔍 [ConversationService] 解析JSON数据:', jsonData);
  124. // 根据payload类型处理不同事件
  125. // 支持两种数据格式:直接 {payload: ...} 或 {directory: ..., payload: ...}
  126. const payload = jsonData.payload || jsonData;
  127. console.log('🔍 [ConversationService] payload类型:', payload.type);
  128. // 处理消息部分更新事件(包含文本内容)
  129. if (payload.type === 'message.part.updated' && payload.properties?.part) {
  130. const part = payload.properties.part;
  131. // 支持 text、reasoning 和 tool 类型
  132. if ((part.type === 'text' || part.type === 'reasoning' || part.type === 'tool') && part.text) {
  133. // 优先使用 delta 字段(增量),如果没有则使用完整文本
  134. const delta = payload.properties.delta || part.text;
  135. // 映射事件类型到前端消息类型
  136. let frontendType: 'thinking' | 'tool' | 'reply' | 'error';
  137. if (part.type === 'reasoning') {
  138. frontendType = 'thinking';
  139. } else if (part.type === 'tool') {
  140. frontendType = 'tool';
  141. } else {
  142. frontendType = 'reply'; // text 类型
  143. }
  144. console.log('🔍 [ConversationService] 收到部分内容 (类型:', part.type, '=>', frontendType, 'delta:', delta, '):', part.text.substring(0, 50));
  145. this.streamUpdateSubject.next({ type: frontendType, data: delta });
  146. }
  147. }
  148. // 处理消息更新事件(包含完整消息信息)
  149. else if (payload.type === 'message.updated' && payload.properties?.info) {
  150. const info = payload.properties.info;
  151. if (info.role === 'assistant') {
  152. console.log('🔍 [ConversationService] 收到助理消息更新,角色:', info.role, '消息ID:', info.id);
  153. // 不发送文本内容,避免与message.part.updated重复
  154. // 文本内容已通过message.part.updated事件流式传输
  155. }
  156. }
  157. // 处理其他事件类型
  158. else if (payload.type === 'session.updated') {
  159. console.log('🔍 [ConversationService] 会话更新事件');
  160. }
  161. else if (payload.type === 'server.connected') {
  162. console.log('🔍 [ConversationService] 服务器连接成功');
  163. }
  164. else if (payload.type === 'session.status') {
  165. console.log('🔍 [ConversationService] 会话状态更新:', payload.properties?.status?.type);
  166. }
  167. else if (payload.type === 'session.idle') {
  168. console.log('🔍 [ConversationService] AI进入空闲状态,保持连接可继续交互');
  169. }
  170. } catch (e) {
  171. console.error('🔍 [ConversationService] 解析SSE JSON数据失败:', e, '原始数据:', data);
  172. // 如果不是JSON,按纯文本处理
  173. this.streamUpdateSubject.next({ type: 'reply', data });
  174. }
  175. }
  176. }
  177. eventEnd = buffer.indexOf('\n\n');
  178. }
  179. // 继续读取
  180. readStream();
  181. }).catch(error => {
  182. // 检查是否是取消错误
  183. if (error.name === 'AbortError') {
  184. console.log('🔍 [ConversationService] 流式请求被取消');
  185. this.streamUpdateSubject.next({ type: 'done', data: '请求已取消' });
  186. observer.complete();
  187. } else {
  188. console.error('🔍 [ConversationService] 读取流错误:', error);
  189. this.streamUpdateSubject.next({ type: 'error', data: '流读取错误' });
  190. observer.error(error);
  191. }
  192. });
  193. };
  194. // 开始读取流
  195. readStream();
  196. }).catch(error => {
  197. // 检查是否是取消错误
  198. if (error.name === 'AbortError') {
  199. console.log('🔍 [ConversationService] 请求被取消');
  200. observer.complete();
  201. } else {
  202. console.error('🔍 [ConversationService] 启动流式请求失败:', error);
  203. this.streamUpdateSubject.next({ type: 'error', data: '连接错误' });
  204. observer.error(error);
  205. }
  206. });
  207. // 清理函数
  208. return () => {
  209. console.log('🔍 [ConversationService] 清理流式连接,取消请求');
  210. // 清除发送超时定时器
  211. clearTimeout(sendTimeout);
  212. // 取消fetch请求
  213. abortController.abort();
  214. };
  215. });
  216. }
  217. // 发送消息(非流式,同步)
  218. sendMessageSync(sessionId: string, message: string): Observable<ChatMessage> {
  219. const request: PromptStreamRequest = {
  220. sessionID: sessionId,
  221. parts: [
  222. {
  223. type: 'text',
  224. text: message
  225. }
  226. ]
  227. };
  228. // 注意:这里使用非流式端点,如果后端支持
  229. return this.http.post<any>('/api/prompt/sync', request).pipe(
  230. map(response => {
  231. console.log('🔍 [ConversationService] 同步响应:', response);
  232. if (response.success) {
  233. // 根据后端实际响应格式解析
  234. let content = '收到空响应';
  235. let messageId = Date.now().toString();
  236. if (response.data) {
  237. // 尝试从info.content获取内容
  238. if (response.data.info?.content) {
  239. content = response.data.info.content;
  240. messageId = response.data.info.id || messageId;
  241. }
  242. // 尝试从parts中提取文本内容
  243. else if (response.data.parts && Array.isArray(response.data.parts)) {
  244. const textParts = response.data.parts.filter((part: any) =>
  245. part.type === 'text' && part.text
  246. );
  247. if (textParts.length > 0) {
  248. content = textParts.map((part: any) => part.text).join('\n');
  249. }
  250. }
  251. // 如果是直接的content字段
  252. else if (response.data.content) {
  253. content = response.data.content;
  254. }
  255. }
  256. const aiMessage: ChatMessage = {
  257. id: messageId,
  258. role: 'assistant',
  259. content: content,
  260. timestamp: new Date(),
  261. sessionID: sessionId
  262. };
  263. this.newMessageSubject.next(aiMessage);
  264. return aiMessage;
  265. } else {
  266. throw new Error(response.message || '发送消息失败');
  267. }
  268. }),
  269. catchError(error => {
  270. console.error('发送消息失败:', error);
  271. throw error;
  272. })
  273. );
  274. }
  275. // 添加新消息(用于测试或手动添加)
  276. addMessage(message: ChatMessage) {
  277. this.newMessageSubject.next(message);
  278. }
  279. // 获取历史消息(如果后端支持)
  280. getHistory(sessionId: string): Observable<ChatMessage[]> {
  281. // 模拟数据
  282. const mockMessages: ChatMessage[] = [
  283. {
  284. id: '1',
  285. role: 'user',
  286. content: '你好,请帮我分析这个代码',
  287. timestamp: new Date(Date.now() - 3600000),
  288. sessionID: sessionId
  289. },
  290. {
  291. id: '2',
  292. role: 'assistant',
  293. content: '你好!我可以帮您分析代码。请把代码发给我。',
  294. timestamp: new Date(Date.now() - 3500000),
  295. sessionID: sessionId
  296. }
  297. ];
  298. return of(mockMessages);
  299. }
  300. // 清空消息流
  301. clearStream() {
  302. this.streamUpdateSubject.next({ type: 'done', data: '' });
  303. }
  304. }