import { Injectable } from '@angular/core'; import { HttpClient } from '@angular/common/http'; import { BehaviorSubject, Observable, Subject, of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; import { ChatMessage, StreamChunk, PromptStreamRequest, TextPart } from '../models/conversation.model'; import { AuthService } from './auth.service'; @Injectable({ providedIn: 'root' }) export class ConversationService { private newMessageSubject = new BehaviorSubject(null); newMessage$ = this.newMessageSubject.asObservable(); private streamUpdateSubject = new Subject(); streamUpdate$ = this.streamUpdateSubject.asObservable(); private messageCache = new Map(); constructor( private http: HttpClient, private authService: AuthService ) {} // 发送消息(流式响应) sendMessage(sessionId: string, message: string): Observable { const request: PromptStreamRequest = { sessionID: sessionId, parts: [ { type: 'text', text: message } ] }; return new Observable(observer => { // 获取认证token const token = this.authService.getToken(); console.log('🔍 [ConversationService] 发送消息,sessionId:', sessionId); console.log('🔍 [ConversationService] 获取的token长度:', token?.length); console.log('🔍 [ConversationService] 当前认证状态:', this.authService.isAuthenticated()); if (!token) { console.error('🔍 [ConversationService] 错误: 用户未认证,无法建立流式连接'); observer.error(new Error('用户未认证,无法建立流式连接')); return; } console.log('🔍 [ConversationService] 发送POST请求启动流:', request); // 创建AbortController用于取消请求 const abortController = new AbortController(); // 发送超时定时器 - 控制发送消息到后端的超时(30秒) let sendTimeout: any = null; // 使用fetch API以便流式读取SSE响应 fetch('/api/prompt/stream', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` }, body: JSON.stringify(request), signal: abortController.signal // 添加取消信号 }).then(response => { console.log('🔍 [ConversationService] POST响应状态:', response.status, response.statusText); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } if (!response.body) { throw new Error('响应体为空'); } // 检查Content-Type是否为text/event-stream const contentType = response.headers.get('content-type'); console.log('🔍 [ConversationService] 响应Content-Type:', contentType); // 消息发送成功,立即通知组件可以恢复发送按钮状态 observer.next(); // 创建SSE解析器 const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; // 发送请求超时(30秒)- 控制发送消息到后端的超时 const SEND_TIMEOUT_MS = 30 * 1000; // 30秒 sendTimeout = setTimeout(() => { console.error('🔍 [ConversationService] 发送请求超时(30秒未收到初始响应)'); abortController.abort(); this.streamUpdateSubject.next({ type: 'error', data: '发送请求超时,请重试' }); observer.error(new Error('发送请求超时,请重试')); }, SEND_TIMEOUT_MS); // 请求成功后清除发送超时 clearTimeout(sendTimeout); const readStream = () => { reader.read().then(({ done, value }) => { if (done) { console.log('🔍 [ConversationService] 流结束'); // 清除发送超时定时器 clearTimeout(sendTimeout); this.streamUpdateSubject.next({ type: 'done', data: '' }); observer.complete(); return; } // 解码数据 buffer += decoder.decode(value, { stream: true }); // 解析SSE格式:每个事件以"data: "开头,以两个换行符结束 let eventEnd = buffer.indexOf('\n\n'); while (eventEnd !== -1) { const event = buffer.substring(0, eventEnd); buffer = buffer.substring(eventEnd + 2); // 移除已处理的事件和两个换行符 // 收到事件,继续处理 // 检查是否为注释行(以冒号开头) if (event.startsWith(':')) { console.log('🔍 [ConversationService] 收到SSE注释:', event.substring(0, 50)); // 心跳注释,继续处理下一个事件 eventEnd = buffer.indexOf('\n\n'); continue; } // 查找数据行 const dataLineStart = event.indexOf('data: '); if (dataLineStart !== -1) { const data = event.substring(dataLineStart + 6); // 移除"data: " console.log('🔍 [ConversationService] 收到SSE数据:', data.substring(0, 100)); if (data === '[DONE]') { console.log('🔍 [ConversationService] 收到DONE标记'); // 清除发送超时定时器 clearTimeout(sendTimeout); this.streamUpdateSubject.next({ type: 'done', data: '' }); observer.complete(); return; } else { try { // 解析JSON格式的SSE数据 const jsonData = JSON.parse(data); console.log('🔍 [ConversationService] 解析JSON数据:', jsonData); // 根据payload类型处理不同事件 // 支持两种数据格式:直接 {payload: ...} 或 {directory: ..., payload: ...} const payload = jsonData.payload || jsonData; console.log('🔍 [ConversationService] payload类型:', payload.type); // 处理消息部分更新事件(包含文本内容) if (payload.type === 'message.part.updated' && payload.properties?.part) { const part = payload.properties.part; // 支持 text、reasoning 和 tool 类型 if ((part.type === 'text' || part.type === 'reasoning' || part.type === 'tool') && part.text) { // 优先使用 delta 字段(增量),如果没有则使用完整文本 const delta = payload.properties.delta || part.text; // 映射事件类型到前端消息类型 let frontendType: 'thinking' | 'tool' | 'reply' | 'error'; if (part.type === 'reasoning') { frontendType = 'thinking'; } else if (part.type === 'tool') { frontendType = 'tool'; } else { frontendType = 'reply'; // text 类型 } console.log('🔍 [ConversationService] 收到部分内容 (类型:', part.type, '=>', frontendType, 'delta:', delta, '):', part.text.substring(0, 50)); this.streamUpdateSubject.next({ type: frontendType, data: delta }); } } // 处理消息更新事件(包含完整消息信息) else if (payload.type === 'message.updated' && payload.properties?.info) { const info = payload.properties.info; if (info.role === 'assistant') { console.log('🔍 [ConversationService] 收到助理消息更新,角色:', info.role, '消息ID:', info.id); // 不发送文本内容,避免与message.part.updated重复 // 文本内容已通过message.part.updated事件流式传输 } } // 处理其他事件类型 else if (payload.type === 'session.updated') { console.log('🔍 [ConversationService] 会话更新事件'); } else if (payload.type === 'server.connected') { console.log('🔍 [ConversationService] 服务器连接成功'); } else if (payload.type === 'session.status') { console.log('🔍 [ConversationService] 会话状态更新:', payload.properties?.status?.type); } else if (payload.type === 'session.idle') { console.log('🔍 [ConversationService] AI进入空闲状态,保持连接可继续交互'); } } catch (e) { console.error('🔍 [ConversationService] 解析SSE JSON数据失败:', e, '原始数据:', data); // 如果不是JSON,按纯文本处理 this.streamUpdateSubject.next({ type: 'reply', data }); } } } eventEnd = buffer.indexOf('\n\n'); } // 继续读取 readStream(); }).catch(error => { // 检查是否是取消错误 if (error.name === 'AbortError') { console.log('🔍 [ConversationService] 流式请求被取消'); this.streamUpdateSubject.next({ type: 'done', data: '请求已取消' }); observer.complete(); } else { console.error('🔍 [ConversationService] 读取流错误:', error); this.streamUpdateSubject.next({ type: 'error', data: '流读取错误' }); observer.error(error); } }); }; // 开始读取流 readStream(); }).catch(error => { // 检查是否是取消错误 if (error.name === 'AbortError') { console.log('🔍 [ConversationService] 请求被取消'); observer.complete(); } else { console.error('🔍 [ConversationService] 启动流式请求失败:', error); this.streamUpdateSubject.next({ type: 'error', data: '连接错误' }); observer.error(error); } }); // 清理函数 return () => { console.log('🔍 [ConversationService] 清理流式连接,取消请求'); // 清除发送超时定时器 clearTimeout(sendTimeout); // 取消fetch请求 abortController.abort(); }; }); } // 发送消息(非流式,同步) sendMessageSync(sessionId: string, message: string): Observable { const request: PromptStreamRequest = { sessionID: sessionId, parts: [ { type: 'text', text: message } ] }; // 注意:这里使用非流式端点,如果后端支持 return this.http.post('/api/prompt/sync', request).pipe( map(response => { console.log('🔍 [ConversationService] 同步响应:', response); if (response.success) { // 根据后端实际响应格式解析 let content = '收到空响应'; let messageId = Date.now().toString(); if (response.data) { // 尝试从info.content获取内容 if (response.data.info?.content) { content = response.data.info.content; messageId = response.data.info.id || messageId; } // 尝试从parts中提取文本内容 else if (response.data.parts && Array.isArray(response.data.parts)) { const textParts = response.data.parts.filter((part: any) => part.type === 'text' && part.text ); if (textParts.length > 0) { content = textParts.map((part: any) => part.text).join('\n'); } } // 如果是直接的content字段 else if (response.data.content) { content = response.data.content; } } const aiMessage: ChatMessage = { id: messageId, role: 'assistant', content: content, timestamp: new Date(), sessionID: sessionId }; this.newMessageSubject.next(aiMessage); return aiMessage; } else { throw new Error(response.message || '发送消息失败'); } }), catchError(error => { console.error('发送消息失败:', error); throw error; }) ); } // 添加新消息(用于测试或手动添加) addMessage(message: ChatMessage) { this.newMessageSubject.next(message); } // 获取历史消息(调用后端API),使用缓存避免重复加载 getHistory(sessionId: string, limit?: number): Observable { // 检查用户是否已认证 if (!this.authService.isAuthenticated()) { console.warn('用户未认证,无法获取历史消息,返回空数组'); return of([]); } console.log('🔍 [ConversationService] 获取历史消息,sessionId:', sessionId, 'limit:', limit); // 临时禁用缓存,强制每次加载(调试独立实例问题) console.log('🔍 [ConversationService] 临时禁用缓存,强制重新加载'); this.messageCache.delete(sessionId); // 清除该会话的缓存 // 直接获取并缓存 return this.fetchAndCacheMessages(sessionId, limit); } // 获取并缓存消息 private fetchAndCacheMessages(sessionId: string, limit?: number): Observable { const requestBody: any = { sessionID: sessionId }; if (limit !== undefined) { requestBody.limit = limit; } return this.http.post('/api/session/messages', requestBody).pipe( map(response => { console.log('🔍 [ConversationService] 获取历史消息响应:', response); if (response.success && response.data) { const messagesData = response.data.messages || []; console.log('🔍 [ConversationService] 收到历史消息数量:', messagesData.length); const messages = this.convertMessages(messagesData, sessionId); // 缓存消息 this.messageCache.set(sessionId, messages); console.log('🔍 [ConversationService] 消息已缓存,数量:', messages.length); return messages; } else { console.error('🔍 [ConversationService] 获取历史消息失败:', response.message); return []; } }), catchError(error => { console.error('🔍 [ConversationService] 获取历史消息请求失败:', error); return of([]); }) ); } // 后台获取并更新缓存(不返回Observable) private fetchAndUpdateCache(sessionId: string, limit?: number): void { const requestBody: any = { sessionID: sessionId }; if (limit !== undefined) { requestBody.limit = limit; } this.http.post('/api/session/messages', requestBody).pipe( map(response => { if (response.success && response.data) { const messagesData = response.data.messages || []; const messages = this.convertMessages(messagesData, sessionId); this.messageCache.set(sessionId, messages); console.log('🔍 [ConversationService] 缓存已更新,数量:', messages.length); } return []; }), catchError(error => { console.error('🔍 [ConversationService] 更新缓存失败:', error); return of([]); }) ).subscribe(); } // 转换消息格式 private convertMessages(messagesData: any[], sessionId: string): ChatMessage[] { return messagesData.map((msg: any, index: number) => { let content = ''; let role = 'assistant'; let messageId = ''; let timestamp = new Date(); // 提取消息ID if (msg.info?.id) { messageId = msg.info.id; } else if (msg.id) { messageId = msg.id; } else { messageId = `msg_${Date.now()}_${index}`; } // 提取角色 if (msg.info?.role) { role = msg.info.role; } else if (msg.role) { role = msg.role; } else if (msg.sender_type === 'user' || msg.sender_type === 'human') { role = 'user'; } // 提取时间戳 if (msg.info?.time?.created) { timestamp = new Date(msg.info.time.created); } else if (msg.created_at) { timestamp = new Date(msg.created_at); } else if (msg.timestamp) { timestamp = new Date(msg.timestamp); } else { // 如果没有时间戳,使用当前时间减去索引(模拟历史顺序) timestamp = new Date(Date.now() - (messagesData.length - index) * 60000); } // 提取消息内容:从parts中查找text类型的内容 const parts = msg.parts || []; if (Array.isArray(parts)) { // 从parts中提取文本内容 const textParts = parts.filter((part: any) => part.type === 'text' && part.text ); if (textParts.length > 0) { content = textParts.map((part: any) => part.text).join('\n'); } } else if (typeof msg.content === 'string') { content = msg.content; } else if (msg.content?.text) { content = msg.content.text; } return { id: messageId, role: role as 'user' | 'assistant', content: content || '(无内容)', timestamp: timestamp, sessionID: sessionId }; }); } // 清空消息流 clearStream() { this.streamUpdateSubject.next({ type: 'done', data: '' }); } }