| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461 |
- import { Injectable } from '@angular/core';
- import { HttpClient } from '@angular/common/http';
- import { BehaviorSubject, Observable, Subject, of } from 'rxjs';
- import { map, catchError } from 'rxjs/operators';
- import { ChatMessage, StreamChunk, PromptStreamRequest, TextPart } from '../models/conversation.model';
- import { AuthService } from './auth.service';
-
- @Injectable({
- providedIn: 'root'
- })
- export class ConversationService {
- private newMessageSubject = new BehaviorSubject<ChatMessage | null>(null);
- newMessage$ = this.newMessageSubject.asObservable();
-
- private streamUpdateSubject = new Subject<StreamChunk>();
- streamUpdate$ = this.streamUpdateSubject.asObservable();
-
- private messageCache = new Map<string, ChatMessage[]>();
-
- constructor(
- private http: HttpClient,
- private authService: AuthService
- ) {}
-
- // 发送消息(流式响应)
- sendMessage(sessionId: string, message: string): Observable<void> {
- const request: PromptStreamRequest = {
- sessionID: sessionId,
- parts: [
- {
- type: 'text',
- text: message
- }
- ]
- };
-
- return new Observable<void>(observer => {
- // 获取认证token
- const token = this.authService.getToken();
- console.log('🔍 [ConversationService] 发送消息,sessionId:', sessionId);
- console.log('🔍 [ConversationService] 获取的token长度:', token?.length);
- console.log('🔍 [ConversationService] 当前认证状态:', this.authService.isAuthenticated());
-
- if (!token) {
- console.error('🔍 [ConversationService] 错误: 用户未认证,无法建立流式连接');
- observer.error(new Error('用户未认证,无法建立流式连接'));
- return;
- }
-
- console.log('🔍 [ConversationService] 发送POST请求启动流:', request);
-
- // 创建AbortController用于取消请求
- const abortController = new AbortController();
-
- // 发送超时定时器 - 控制发送消息到后端的超时(30秒)
- let sendTimeout: any = null;
-
- // 使用fetch API以便流式读取SSE响应
- fetch('/api/prompt/stream', {
- method: 'POST',
- headers: {
- 'Content-Type': 'application/json',
- 'Authorization': `Bearer ${token}`
- },
- body: JSON.stringify(request),
- signal: abortController.signal // 添加取消信号
- }).then(response => {
- console.log('🔍 [ConversationService] POST响应状态:', response.status, response.statusText);
-
- if (!response.ok) {
- throw new Error(`HTTP ${response.status}: ${response.statusText}`);
- }
-
- if (!response.body) {
- throw new Error('响应体为空');
- }
-
- // 检查Content-Type是否为text/event-stream
- const contentType = response.headers.get('content-type');
- console.log('🔍 [ConversationService] 响应Content-Type:', contentType);
-
- // 消息发送成功,立即通知组件可以恢复发送按钮状态
- observer.next();
-
- // 创建SSE解析器
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = '';
-
- // 发送请求超时(30秒)- 控制发送消息到后端的超时
- const SEND_TIMEOUT_MS = 30 * 1000; // 30秒
- sendTimeout = setTimeout(() => {
- console.error('🔍 [ConversationService] 发送请求超时(30秒未收到初始响应)');
- abortController.abort();
- this.streamUpdateSubject.next({ type: 'error', data: '发送请求超时,请重试' });
- observer.error(new Error('发送请求超时,请重试'));
- }, SEND_TIMEOUT_MS);
-
- // 请求成功后清除发送超时
- clearTimeout(sendTimeout);
-
- const readStream = () => {
- reader.read().then(({ done, value }) => {
- if (done) {
- console.log('🔍 [ConversationService] 流结束');
- // 清除发送超时定时器
- clearTimeout(sendTimeout);
- this.streamUpdateSubject.next({ type: 'done', data: '' });
- observer.complete();
- return;
- }
-
- // 解码数据
- buffer += decoder.decode(value, { stream: true });
-
- // 解析SSE格式:每个事件以"data: "开头,以两个换行符结束
- let eventEnd = buffer.indexOf('\n\n');
- while (eventEnd !== -1) {
- const event = buffer.substring(0, eventEnd);
- buffer = buffer.substring(eventEnd + 2); // 移除已处理的事件和两个换行符
-
- // 收到事件,继续处理
-
- // 检查是否为注释行(以冒号开头)
- if (event.startsWith(':')) {
- console.log('🔍 [ConversationService] 收到SSE注释:', event.substring(0, 50));
- // 心跳注释,继续处理下一个事件
- eventEnd = buffer.indexOf('\n\n');
- continue;
- }
-
- // 查找数据行
- const dataLineStart = event.indexOf('data: ');
- if (dataLineStart !== -1) {
- const data = event.substring(dataLineStart + 6); // 移除"data: "
- console.log('🔍 [ConversationService] 收到SSE数据:', data.substring(0, 100));
-
- if (data === '[DONE]') {
- console.log('🔍 [ConversationService] 收到DONE标记');
- // 清除发送超时定时器
- clearTimeout(sendTimeout);
- this.streamUpdateSubject.next({ type: 'done', data: '' });
- observer.complete();
- return;
- } else {
- try {
- // 解析JSON格式的SSE数据
- const jsonData = JSON.parse(data);
- console.log('🔍 [ConversationService] 解析JSON数据:', jsonData);
-
- // 根据payload类型处理不同事件
- // 支持两种数据格式:直接 {payload: ...} 或 {directory: ..., payload: ...}
- const payload = jsonData.payload || jsonData;
- console.log('🔍 [ConversationService] payload类型:', payload.type);
-
- // 处理消息部分更新事件(包含文本内容)
- if (payload.type === 'message.part.updated' && payload.properties?.part) {
- const part = payload.properties.part;
- // 支持 text、reasoning 和 tool 类型
- if ((part.type === 'text' || part.type === 'reasoning' || part.type === 'tool') && part.text) {
- // 优先使用 delta 字段(增量),如果没有则使用完整文本
- const delta = payload.properties.delta || part.text;
-
- // 映射事件类型到前端消息类型
- let frontendType: 'thinking' | 'tool' | 'reply' | 'error';
- if (part.type === 'reasoning') {
- frontendType = 'thinking';
- } else if (part.type === 'tool') {
- frontendType = 'tool';
- } else {
- frontendType = 'reply'; // text 类型
- }
-
- console.log('🔍 [ConversationService] 收到部分内容 (类型:', part.type, '=>', frontendType, 'delta:', delta, '):', part.text.substring(0, 50));
- this.streamUpdateSubject.next({ type: frontendType, data: delta });
- }
- }
- // 处理消息更新事件(包含完整消息信息)
- else if (payload.type === 'message.updated' && payload.properties?.info) {
- const info = payload.properties.info;
- if (info.role === 'assistant') {
- console.log('🔍 [ConversationService] 收到助理消息更新,角色:', info.role, '消息ID:', info.id);
- // 不发送文本内容,避免与message.part.updated重复
- // 文本内容已通过message.part.updated事件流式传输
- }
- }
- // 处理其他事件类型
- else if (payload.type === 'session.updated') {
- console.log('🔍 [ConversationService] 会话更新事件');
- }
- else if (payload.type === 'server.connected') {
- console.log('🔍 [ConversationService] 服务器连接成功');
- }
- else if (payload.type === 'session.status') {
- console.log('🔍 [ConversationService] 会话状态更新:', payload.properties?.status?.type);
- }
- else if (payload.type === 'session.idle') {
- console.log('🔍 [ConversationService] AI进入空闲状态,保持连接可继续交互');
- }
- } catch (e) {
- console.error('🔍 [ConversationService] 解析SSE JSON数据失败:', e, '原始数据:', data);
- // 如果不是JSON,按纯文本处理
- this.streamUpdateSubject.next({ type: 'reply', data });
- }
- }
- }
-
- eventEnd = buffer.indexOf('\n\n');
- }
-
- // 继续读取
- readStream();
- }).catch(error => {
- // 检查是否是取消错误
- if (error.name === 'AbortError') {
- console.log('🔍 [ConversationService] 流式请求被取消');
- this.streamUpdateSubject.next({ type: 'done', data: '请求已取消' });
- observer.complete();
- } else {
- console.error('🔍 [ConversationService] 读取流错误:', error);
- this.streamUpdateSubject.next({ type: 'error', data: '流读取错误' });
- observer.error(error);
- }
- });
- };
-
- // 开始读取流
- readStream();
- }).catch(error => {
- // 检查是否是取消错误
- if (error.name === 'AbortError') {
- console.log('🔍 [ConversationService] 请求被取消');
- observer.complete();
- } else {
- console.error('🔍 [ConversationService] 启动流式请求失败:', error);
- this.streamUpdateSubject.next({ type: 'error', data: '连接错误' });
- observer.error(error);
- }
- });
-
- // 清理函数
- return () => {
- console.log('🔍 [ConversationService] 清理流式连接,取消请求');
- // 清除发送超时定时器
- clearTimeout(sendTimeout);
- // 取消fetch请求
- abortController.abort();
- };
- });
- }
-
- // 发送消息(非流式,同步)
- sendMessageSync(sessionId: string, message: string): Observable<ChatMessage> {
- const request: PromptStreamRequest = {
- sessionID: sessionId,
- parts: [
- {
- type: 'text',
- text: message
- }
- ]
- };
-
- // 注意:这里使用非流式端点,如果后端支持
- return this.http.post<any>('/api/prompt/sync', request).pipe(
- map(response => {
- console.log('🔍 [ConversationService] 同步响应:', response);
- if (response.success) {
- // 根据后端实际响应格式解析
- let content = '收到空响应';
- let messageId = Date.now().toString();
-
- if (response.data) {
- // 尝试从info.content获取内容
- if (response.data.info?.content) {
- content = response.data.info.content;
- messageId = response.data.info.id || messageId;
- }
- // 尝试从parts中提取文本内容
- else if (response.data.parts && Array.isArray(response.data.parts)) {
- const textParts = response.data.parts.filter((part: any) =>
- part.type === 'text' && part.text
- );
- if (textParts.length > 0) {
- content = textParts.map((part: any) => part.text).join('\n');
- }
- }
- // 如果是直接的content字段
- else if (response.data.content) {
- content = response.data.content;
- }
- }
-
- const aiMessage: ChatMessage = {
- id: messageId,
- role: 'assistant',
- content: content,
- timestamp: new Date(),
- sessionID: sessionId
- };
- this.newMessageSubject.next(aiMessage);
- return aiMessage;
- } else {
- throw new Error(response.message || '发送消息失败');
- }
- }),
- catchError(error => {
- console.error('发送消息失败:', error);
- throw error;
- })
- );
- }
-
- // 添加新消息(用于测试或手动添加)
- addMessage(message: ChatMessage) {
- this.newMessageSubject.next(message);
- }
-
- // 获取历史消息(调用后端API),使用缓存避免重复加载
- getHistory(sessionId: string, limit?: number): Observable<ChatMessage[]> {
- // 检查用户是否已认证
- if (!this.authService.isAuthenticated()) {
- console.warn('用户未认证,无法获取历史消息,返回空数组');
- return of([]);
- }
-
- console.log('🔍 [ConversationService] 获取历史消息,sessionId:', sessionId, 'limit:', limit);
-
- // 临时禁用缓存,强制每次加载(调试独立实例问题)
- console.log('🔍 [ConversationService] 临时禁用缓存,强制重新加载');
- this.messageCache.delete(sessionId); // 清除该会话的缓存
-
- // 直接获取并缓存
- return this.fetchAndCacheMessages(sessionId, limit);
- }
-
- // 获取并缓存消息
- private fetchAndCacheMessages(sessionId: string, limit?: number): Observable<ChatMessage[]> {
- const requestBody: any = { sessionID: sessionId };
- if (limit !== undefined) {
- requestBody.limit = limit;
- }
-
- return this.http.post<any>('/api/session/messages', requestBody).pipe(
- map(response => {
- console.log('🔍 [ConversationService] 获取历史消息响应:', response);
-
- if (response.success && response.data) {
- const messagesData = response.data.messages || [];
- console.log('🔍 [ConversationService] 收到历史消息数量:', messagesData.length);
-
- const messages = this.convertMessages(messagesData, sessionId);
- // 缓存消息
- this.messageCache.set(sessionId, messages);
- console.log('🔍 [ConversationService] 消息已缓存,数量:', messages.length);
- return messages;
- } else {
- console.error('🔍 [ConversationService] 获取历史消息失败:', response.message);
- return [];
- }
- }),
- catchError(error => {
- console.error('🔍 [ConversationService] 获取历史消息请求失败:', error);
- return of([]);
- })
- );
- }
-
- // 后台获取并更新缓存(不返回Observable)
- private fetchAndUpdateCache(sessionId: string, limit?: number): void {
- const requestBody: any = { sessionID: sessionId };
- if (limit !== undefined) {
- requestBody.limit = limit;
- }
-
- this.http.post<any>('/api/session/messages', requestBody).pipe(
- map(response => {
- if (response.success && response.data) {
- const messagesData = response.data.messages || [];
- const messages = this.convertMessages(messagesData, sessionId);
- this.messageCache.set(sessionId, messages);
- console.log('🔍 [ConversationService] 缓存已更新,数量:', messages.length);
- }
- return [];
- }),
- catchError(error => {
- console.error('🔍 [ConversationService] 更新缓存失败:', error);
- return of([]);
- })
- ).subscribe();
- }
-
- // 转换消息格式
- private convertMessages(messagesData: any[], sessionId: string): ChatMessage[] {
- return messagesData.map((msg: any, index: number) => {
- let content = '';
- let role = 'assistant';
- let messageId = '';
- let timestamp = new Date();
-
- // 提取消息ID
- if (msg.info?.id) {
- messageId = msg.info.id;
- } else if (msg.id) {
- messageId = msg.id;
- } else {
- messageId = `msg_${Date.now()}_${index}`;
- }
-
- // 提取角色
- if (msg.info?.role) {
- role = msg.info.role;
- } else if (msg.role) {
- role = msg.role;
- } else if (msg.sender_type === 'user' || msg.sender_type === 'human') {
- role = 'user';
- }
-
- // 提取时间戳
- if (msg.info?.time?.created) {
- timestamp = new Date(msg.info.time.created);
- } else if (msg.created_at) {
- timestamp = new Date(msg.created_at);
- } else if (msg.timestamp) {
- timestamp = new Date(msg.timestamp);
- } else {
- // 如果没有时间戳,使用当前时间减去索引(模拟历史顺序)
- timestamp = new Date(Date.now() - (messagesData.length - index) * 60000);
- }
-
- // 提取消息内容:从parts中查找text类型的内容
- const parts = msg.parts || [];
- if (Array.isArray(parts)) {
- // 从parts中提取文本内容
- const textParts = parts.filter((part: any) =>
- part.type === 'text' && part.text
- );
- if (textParts.length > 0) {
- content = textParts.map((part: any) => part.text).join('\n');
- }
- } else if (typeof msg.content === 'string') {
- content = msg.content;
- } else if (msg.content?.text) {
- content = msg.content.text;
- }
-
- return {
- id: messageId,
- role: role as 'user' | 'assistant',
- content: content || '(无内容)',
- timestamp: timestamp,
- sessionID: sessionId
- };
- });
- }
-
- // 清空消息流
- clearStream() {
- this.streamUpdateSubject.next({ type: 'done', data: '' });
- }
- }
|