import { Injectable, OnDestroy, inject, Optional } from '@angular/core'; import { BehaviorSubject, Observable, Subject, Subscription, filter, map } from 'rxjs'; import { AuthService } from './auth.service'; import { GlobalEvent, EventPayload, EventType, SessionUpdatedEvent, SessionDiffEvent, ServerHeartbeatEvent, MessageUpdatedEvent, MessagePartUpdatedEvent, SessionStatusEvent, ServerConnectedEvent } from '../models/event.model'; @Injectable({ providedIn: 'root' }) export class EventService implements OnDestroy { private eventSource: EventSource | null = null; private isConnected = false; private subscriptions: Subscription = new Subscription(); private reconnectTimeout: any = null; private heartbeatTimeout: any = null; private reconnectAttempts = 0; private maxReconnectAttempts = 5; private reconnectDelay = 3000; // 3秒 private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时(根据用户要求) // 事件主题,用于广播所有事件 private allEventsSubject = new Subject(); allEvents$ = this.allEventsSubject.asObservable(); // 特定事件类型主题 private sessionUpdatedSubject = new Subject(); sessionUpdated$ = this.sessionUpdatedSubject.asObservable(); private sessionDiffSubject = new Subject(); sessionDiff$ = this.sessionDiffSubject.asObservable(); private messageUpdatedSubject = new Subject(); messageUpdated$ = this.messageUpdatedSubject.asObservable(); private messagePartUpdatedSubject = new Subject(); messagePartUpdated$ = this.messagePartUpdatedSubject.asObservable(); // 按会话ID过滤的事件主题 private sessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>(); sessionEvents$ = this.sessionEventsSubject.asObservable(); // 连接状态主题 private connectionStatusSubject = new BehaviorSubject(false); connectionStatus$ = this.connectionStatusSubject.asObservable(); // 会话ID到实例ID的注册映射(一对多,支持同一会话多个实例) // 已移除:每个浏览器标签页独立SSE连接,不再需要映射 // 实例事件主题 // 已移除:每个浏览器标签页独立SSE连接,不再需要实例事件路由 // 多会话事件源映射(用于文档列表等场景) private multiSessionEventSources = new Map(); // 多会话事件主题 private multiSessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>(); multiSessionEvents$ = this.multiSessionEventsSubject.asObservable(); constructor( private authService: AuthService ) { this.initializeAuthSubscription(); } // 初始化认证状态订阅 private initializeAuthSubscription() { this.subscriptions.add( this.authService.authState$.subscribe(authState => { console.log('EventService: 认证状态变化', authState.isAuthenticated); if (authState.isAuthenticated) { // 用户已登录,连接事件流 this.connectToEventStream(); } else { // 用户登出,断开事件流 this.disconnect(); } }) ); } // 连接到事件流 connectToEventStream(sessionId?: string) { if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } // 检查是否已登录 if (!this.authService.isAuthenticated()) { console.warn('未登录状态,不连接事件流'); return; } // 获取认证token const token = this.authService.getToken(); if (!token) { console.error('无法获取认证token'); return; } // 构建带认证参数的URL - 使用日志流端点,因为它发送全局事件 let url = `/api/logs/stream?token=${encodeURIComponent(token)}`; if (sessionId) { url += `&sessionId=${sessionId}`; } console.log('EventService: 连接事件流URL:', url); this.eventSource = new EventSource(url); this.eventSource.onopen = () => { console.log('EventService: 事件流连接已建立'); this.isConnected = true; this.connectionStatusSubject.next(true); this.reconnectAttempts = 0; // 重置重连计数 // 启动心跳超时定时器(5分钟) this.resetHeartbeatTimeout(); // 发送连接成功事件 this.allEventsSubject.next({ payload: { type: 'connection.established', properties: { timestamp: new Date().toISOString() } } }); }; this.eventSource.onmessage = (event) => { const eventData = event.data; console.log('EventService: 收到原始事件数据:', eventData.substring(0, 200)); // 处理SSE注释格式的心跳(保持向后兼容) if (eventData === ': heartbeat' || eventData.startsWith(': ')) { // 重置心跳超时定时器(表示连接活跃) this.resetHeartbeatTimeout(); // 发送内部心跳事件,供其他服务订阅 this.allEventsSubject.next({ payload: { type: 'server.heartbeat.comment', properties: { timestamp: new Date().toISOString(), rawData: eventData } } }); return; } try { // 解析JSON事件 const globalEvent: GlobalEvent = JSON.parse(eventData); this.handleGlobalEvent(globalEvent); } catch (error) { // 如果不是JSON,可能是纯文本日志 console.log('EventService: 非JSON事件,可能是纯文本日志:', eventData.substring(0, 100)); // 如果是纯文本日志,可以转发给日志服务或忽略 // 这里只处理JSON事件 } }; this.eventSource.onerror = (error) => { console.error('EventService: 事件流连接错误:', error); this.isConnected = false; this.connectionStatusSubject.next(false); // 清除心跳超时定时器 this.clearHeartbeatTimeout(); // 检查是否仍然登录 if (this.authService.isAuthenticated()) { console.log('EventService: 连接断开,正在重连...'); this.scheduleReconnect(sessionId); } else { console.log('EventService: 连接断开(用户未登录)'); } }; } /** * 连接到多会话事件流(用于文档列表等场景) * @param sessionIds 会话ID数组 */ connectToMultipleSessions(sessionIds: string[]) { if (!sessionIds || sessionIds.length === 0) { return; } // 检查是否已登录 if (!this.authService.isAuthenticated()) { console.warn('未登录状态,不连接多会话事件流'); return; } // 获取认证token const token = this.authService.getToken(); if (!token) { console.error('无法获取认证token'); return; } // 为每个会话ID创建独立的EventSource连接 sessionIds.forEach(sessionId => { // 如果已经存在连接,先关闭 if (this.multiSessionEventSources.has(sessionId)) { const existingSource = this.multiSessionEventSources.get(sessionId); if (existingSource) { existingSource.close(); } this.multiSessionEventSources.delete(sessionId); } // 构建带认证参数的URL const url = `/api/logs/stream?token=${encodeURIComponent(token)}&sessionId=${sessionId}`; console.log(`EventService: 连接多会话事件流,会话ID: ${sessionId}, URL: ${url}`); const eventSource = new EventSource(url); this.multiSessionEventSources.set(sessionId, eventSource); eventSource.onopen = () => { console.log(`EventService: 多会话事件流连接已建立,会话ID: ${sessionId}`); }; eventSource.onmessage = (event) => { const eventData = event.data; // 跳过心跳消息 if (eventData === ': heartbeat' || eventData.startsWith(': ')) { return; } try { // 解析JSON事件 const globalEvent: GlobalEvent = JSON.parse(eventData); console.log(`EventService: 收到多会话事件,会话ID: ${sessionId}, 类型: ${globalEvent.payload.type}`); // 分发到多会话事件主题 this.multiSessionEventsSubject.next({ sessionId, event: globalEvent }); // 同时分发到全局事件流(保持兼容性) this.distributeEventBySession(globalEvent); } catch (error) { console.error(`EventService: 解析多会话事件失败,会话ID: ${sessionId}, 数据:`, eventData.substring(0, 100)); } }; eventSource.onerror = (error) => { console.error(`EventService: 多会话事件流连接错误,会话ID: ${sessionId}:`, error); // 多会话连接错误不触发全局重连,只关闭该连接 eventSource.close(); this.multiSessionEventSources.delete(sessionId); // 可选:重新连接单个会话(延迟重试) setTimeout(() => { if (this.authService.isAuthenticated()) { this.connectToMultipleSessions([sessionId]); } }, 5000); }; }); } /** * 断开指定会话的多会话事件流 * @param sessionIds 会话ID数组(如果为空则断开所有) */ disconnectMultipleSessions(sessionIds?: string[]) { if (!sessionIds) { // 断开所有多会话连接 this.multiSessionEventSources.forEach((eventSource, sessionId) => { eventSource.close(); console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`); }); this.multiSessionEventSources.clear(); } else { // 断开指定会话的连接 sessionIds.forEach(sessionId => { const eventSource = this.multiSessionEventSources.get(sessionId); if (eventSource) { eventSource.close(); this.multiSessionEventSources.delete(sessionId); console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`); } }); } } /** * 获取指定会话的事件流(按会话ID过滤) * @param sessionId 会话ID * @returns 过滤后的事件Observable */ getSessionEvents(sessionId: string): Observable { return this.sessionEvents$.pipe( filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId), map(({ event }) => event) ); } /** * 获取多个会话的事件流(按会话ID数组过滤) * @param sessionIds 会话ID数组 * @returns 过滤后的事件Observable */ getMultipleSessionEvents(sessionIds: string[]): Observable<{sessionId: string, event: GlobalEvent}> { return this.multiSessionEvents$.pipe( filter(({ sessionId }) => sessionIds.includes(sessionId)) ); } // 处理全局事件 private handleGlobalEvent(event: GlobalEvent) { console.log('EventService: 处理全局事件,类型:', event.payload.type); // 任何有效事件都重置心跳超时(表明连接活跃) this.resetHeartbeatTimeout(); // 广播所有事件 this.allEventsSubject.next(event); // 根据会话分发事件 this.distributeEventBySession(event); // 根据事件类型分发到特定主题 const payload = event.payload; switch (payload.type) { case 'session.updated': { const sessionEvent = payload as SessionUpdatedEvent; this.sessionUpdatedSubject.next(sessionEvent); console.log('EventService: 分发 session.updated 事件', sessionEvent.properties.info?.title); break; } case 'session.diff': this.sessionDiffSubject.next(payload as SessionDiffEvent); console.log('EventService: 分发 session.diff 事件'); break; case 'server.heartbeat': this.allEventsSubject.next(event); // 已经广播过 this.resetHeartbeatTimeout(); break; case 'message.updated': this.messageUpdatedSubject.next(payload as MessageUpdatedEvent); console.log('EventService: 分发 message.updated 事件'); break; case 'message.part.updated': this.messagePartUpdatedSubject.next(payload as MessagePartUpdatedEvent); console.log('EventService: 分发 message.part.updated 事件'); break; case 'session.status': this.allEventsSubject.next(event); console.log('EventService: 分发 session.status 事件'); break; case 'server.connected': this.allEventsSubject.next(event); console.log('EventService: 分发 server.connected 事件'); break; default: console.log('EventService: 未知事件类型:', payload.type); // 未知事件类型仍然通过 allEvents$ 广播 } } // 重置心跳超时定时器 private resetHeartbeatTimeout() { // 清除现有超时 if (this.heartbeatTimeout) { clearTimeout(this.heartbeatTimeout); } // 设置新的超时定时器(5分钟) this.heartbeatTimeout = setTimeout(() => { console.warn('EventService: 心跳超时(5分钟未收到心跳),尝试重连'); this.isConnected = false; this.connectionStatusSubject.next(false); // 清除现有的重连定时器(如果有) if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); } // 立即尝试重连 if (this.authService.isAuthenticated()) { const activeSession = this.getCurrentSessionId(); // 需要获取当前会话ID this.connectToEventStream(activeSession); } }, this.heartbeatTimeoutDuration); console.log('EventService: 心跳超时定时器已重置(5分钟)'); } // 清除心跳超时定时器 private clearHeartbeatTimeout() { if (this.heartbeatTimeout) { clearTimeout(this.heartbeatTimeout); this.heartbeatTimeout = null; } } // 获取当前会话ID(用于重连时保持会话过滤) private getCurrentSessionId(): string | undefined { // 这里可以从URL或状态管理获取当前会话ID // 暂时返回undefined,表示不进行会话过滤 return undefined; } // 安排重连 private scheduleReconnect(sessionId?: string) { if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); } if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('EventService: 达到最大重连次数,停止重连'); return; } this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避 console.log(`EventService: ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`); this.reconnectTimeout = setTimeout(() => { if (this.authService.isAuthenticated()) { this.connectToEventStream(sessionId); } }, delay); } // 断开事件流连接 disconnect() { if (this.eventSource) { this.eventSource.close(); this.eventSource = null; this.isConnected = false; this.connectionStatusSubject.next(false); console.log('EventService: 已断开事件流连接'); } if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } // 清除心跳超时定时器 this.clearHeartbeatTimeout(); this.reconnectAttempts = 0; } // 订阅特定事件类型 subscribeToEvent(eventType: EventType): Observable { return new Observable(observer => { const subscription = this.allEvents$.subscribe(event => { if (event.payload.type === eventType) { observer.next(event.payload as T); } }); return () => subscription.unsubscribe(); }); } // 手动发送事件(用于测试) emitTestEvent(event: GlobalEvent) { this.handleGlobalEvent(event); } // 检查连接状态 isStreamConnected(): boolean { return this.isConnected; } // 切换会话过滤 switchSession(sessionId?: string) { if (this.authService.isAuthenticated()) { this.connectToEventStream(sessionId); } else { console.warn('未登录状态,无法切换会话过滤'); } } // 订阅特定会话的事件 subscribeToSessionEvents(sessionId: string): Observable { return this.sessionEvents$.pipe( filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId), map(({ event }) => event) ); } // 获取事件中的会话ID private getSessionIdFromEvent(event: GlobalEvent): string | undefined { const payload = event.payload; // 根据后端事件分发器的extractSessionIDFromEvent逻辑实现 // 支持多种sessionID字段名和嵌套路径 // 1. 检查payload.properties.sessionID(session.status事件等) if (payload.properties && payload.properties['sessionID']) { return payload.properties['sessionID']; } // 2. 根据事件类型处理 switch (payload.type) { case 'session.updated': // session.updated事件中,会话ID在properties.info.id中 const sessionEvent = payload as SessionUpdatedEvent; return sessionEvent.properties?.info?.id; case 'session.diff': // session.diff事件中,会话ID在properties.sessionID中 const diffEvent = payload as SessionDiffEvent; return diffEvent.properties?.sessionID; case 'message.updated': // message.updated事件中,会话ID可能在多个位置 const messageEvent = payload as MessageUpdatedEvent; // 尝试从properties.info.sessionID获取 if (messageEvent.properties?.info && typeof messageEvent.properties.info === 'object') { const info = messageEvent.properties.info as any; if (info.sessionID) return info.sessionID; if (info.sessionId) return info.sessionId; if (info.session_id) return info.session_id; } // 尝试从properties.sessionID获取 if (messageEvent.properties && (messageEvent.properties as any).sessionID) { return (messageEvent.properties as any).sessionID; } break; case 'message.part.updated': // message.part.updated事件中,会话ID可能在properties.part.sessionID中 const partEvent = payload as MessagePartUpdatedEvent; if (partEvent.properties?.part && typeof partEvent.properties.part === 'object') { const part = partEvent.properties.part as any; if (part.sessionID) return part.sessionID; if (part.sessionId) return part.sessionId; if (part.session_id) return part.session_id; } break; case 'session.status': // session.status事件中,会话ID可能在properties.status.sessionID中 const statusEvent = payload as any; if (statusEvent.properties?.status?.sessionID) { return statusEvent.properties.status.sessionID; } break; } // 3. 递归查找sessionID字段(仿照后端实现) const findSessionIDRecursive = (obj: any): string | undefined => { if (!obj || typeof obj !== 'object') return undefined; // 检查常见的sessionID字段名 const sessionIDKeys = ['sessionID', 'sessionId', 'session_id', 'session']; for (const key of sessionIDKeys) { if (obj[key] && typeof obj[key] === 'string') { return obj[key]; } } // 递归查找 if (Array.isArray(obj)) { for (const item of obj) { const result = findSessionIDRecursive(item); if (result) return result; } } else { for (const key in obj) { if (Object.prototype.hasOwnProperty.call(obj, key)) { const result = findSessionIDRecursive(obj[key]); if (result) return result; } } } return undefined; }; // 在整个事件对象中递归查找 return findSessionIDRecursive(event); } // 分发事件到会话/实例 private distributeEventBySession(event: GlobalEvent): void { const sessionId = this.getSessionIdFromEvent(event); if (sessionId) { console.log(`分发事件到会话: ${sessionId}, 类型: ${event.payload.type}`); // 发送到会话事件流(供该会话的组件订阅) this.sessionEventsSubject.next({ sessionId, event }); // 注:每个浏览器标签页独立SSE连接,不再需要实例映射 // 后端已按sessionId过滤事件,每个标签页只收到自己会话的事件 } } ngOnDestroy() { this.subscriptions.unsubscribe(); this.disconnect(); } }