import { Injectable, OnDestroy } from '@angular/core'; import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs'; import { AuthService } from './auth.service'; import { GlobalEvent } from '../models/event.model'; /** * 独立事件服务 - 为每个标签页提供独立的SSE连接 * * 特点: * 1. 非单例:通过组件级providers创建独立实例 * 2. 简单:只处理单个会话的连接,无需多会话、映射等复杂逻辑 * 3. 独立:每个实例管理自己的EventSource连接 * 4. 生命周期对齐:随组件创建/销毁 */ @Injectable() export class IndependentEventService implements OnDestroy { private eventSource: EventSource | null = null; private isConnected = false; private reconnectTimeout: any = null; private reconnectAttempts = 0; private maxReconnectAttempts = 5; private reconnectDelay = 3000; private heartbeatTimeout: any = null; private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时 // 事件主题,用于广播所有事件 private allEventsSubject = new Subject(); readonly events$ = this.allEventsSubject.asObservable(); // 连接状态主题 private connectionStatusSubject = new BehaviorSubject(false); readonly connectionStatus$ = this.connectionStatusSubject.asObservable(); // 当前连接的会话ID private currentSessionId: string | undefined; constructor( private authService: AuthService ) {} /** * 连接到事件流 * @param sessionId 会话ID(可选,用于会话过滤) */ connect(sessionId?: string): boolean { console.log('🔍 [IndependentEventService] 连接事件流,会话ID:', sessionId); // 断开现有连接 this.disconnect(); // 检查是否已登录 if (!this.authService.isAuthenticated()) { console.warn('未登录状态,不连接事件流'); return false; } // 获取认证token const token = this.authService.getToken(); if (!token) { console.error('无法获取认证token'); return false; } // 构建带认证参数的URL let url = `/api/logs/stream?token=${encodeURIComponent(token)}`; if (sessionId) { url += `&sessionId=${sessionId}`; this.currentSessionId = sessionId; } console.log('🔍 [IndependentEventService] 连接URL:', url); this.eventSource = new EventSource(url); this.eventSource.onopen = () => { console.log('🔍 [IndependentEventService] 事件流连接已建立'); this.isConnected = true; this.connectionStatusSubject.next(true); this.reconnectAttempts = 0; // 启动心跳超时定时器 this.resetHeartbeatTimeout(); // 发送连接成功事件 this.allEventsSubject.next({ payload: { type: 'connection.established', properties: { timestamp: new Date().toISOString() } } }); }; this.eventSource.onmessage = (event) => { const eventData = event.data; // 处理SSE注释格式的心跳 if (eventData === ': heartbeat' || eventData.startsWith(': ')) { // 重置心跳超时定时器 this.resetHeartbeatTimeout(); return; } console.log('🔍 [IndependentEventService] 收到原始事件数据:', eventData.substring(0, 200)); try { // 解析JSON事件 const globalEvent: GlobalEvent = JSON.parse(eventData); this.handleEvent(globalEvent); } catch (error) { console.log('🔍 [IndependentEventService] 非JSON事件:', eventData.substring(0, 100)); } }; this.eventSource.onerror = (error) => { console.error('🔍 [IndependentEventService] 事件流连接错误:', error); this.isConnected = false; this.connectionStatusSubject.next(false); // 清除心跳超时定时器 this.clearHeartbeatTimeout(); // 检查是否仍然登录 if (this.authService.isAuthenticated()) { console.log('🔍 [IndependentEventService] 连接断开,正在重连...'); this.scheduleReconnect(sessionId); } else { console.log('🔍 [IndependentEventService] 连接断开(用户未登录)'); } }; return true; } /** * 断开事件流连接 */ disconnect(): void { if (this.eventSource) { this.eventSource.close(); this.eventSource = null; this.isConnected = false; this.connectionStatusSubject.next(false); this.currentSessionId = undefined; console.log('🔍 [IndependentEventService] 已断开事件流连接'); } if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } // 清除心跳超时定时器 this.clearHeartbeatTimeout(); this.reconnectAttempts = 0; } /** * 获取当前会话ID */ getCurrentSessionId(): string | undefined { return this.currentSessionId; } /** * 检查连接状态 */ isConnectedStatus(): boolean { return this.isConnected; } /** * 处理事件 */ private handleEvent(event: GlobalEvent): void { // 任何有效事件都重置心跳超时 this.resetHeartbeatTimeout(); // 广播所有事件 this.allEventsSubject.next(event); console.log('🔍 [IndependentEventService] 处理事件,类型:', event.payload.type); console.log('🔍 [IndependentEventService] 事件完整内容:', JSON.stringify(event, null, 2).substring(0, 500)); } /** * 重置心跳超时定时器 */ private resetHeartbeatTimeout(): void { // 清除现有超时 if (this.heartbeatTimeout) { clearTimeout(this.heartbeatTimeout); } // 设置新的超时定时器(5分钟) this.heartbeatTimeout = setTimeout(() => { console.warn('🔍 [IndependentEventService] 心跳超时(5分钟未收到心跳),尝试重连'); this.isConnected = false; this.connectionStatusSubject.next(false); // 清除现有的重连定时器(如果有) if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); } // 立即尝试重连 if (this.authService.isAuthenticated()) { this.connect(this.currentSessionId); } }, this.heartbeatTimeoutDuration); } /** * 清除心跳超时定时器 */ private clearHeartbeatTimeout(): void { if (this.heartbeatTimeout) { clearTimeout(this.heartbeatTimeout); this.heartbeatTimeout = null; } } /** * 安排重连 */ private scheduleReconnect(sessionId?: string): void { if (this.reconnectTimeout) { clearTimeout(this.reconnectTimeout); } if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.error('🔍 [IndependentEventService] 达到最大重连次数,停止重连'); return; } this.reconnectAttempts++; const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避 console.log(`🔍 [IndependentEventService] ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`); this.reconnectTimeout = setTimeout(() => { if (this.authService.isAuthenticated()) { this.connect(sessionId); } }, delay); } /** * 清理资源 */ ngOnDestroy(): void { this.disconnect(); } }