Nenhuma descrição
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.

independent-event.service.ts 7.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. import { Injectable, OnDestroy } from '@angular/core';
  2. import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
  3. import { AuthService } from './auth.service';
  4. import { GlobalEvent } from '../models/event.model';
  5. /**
  6. * 独立事件服务 - 为每个标签页提供独立的SSE连接
  7. *
  8. * 特点:
  9. * 1. 非单例:通过组件级providers创建独立实例
  10. * 2. 简单:只处理单个会话的连接,无需多会话、映射等复杂逻辑
  11. * 3. 独立:每个实例管理自己的EventSource连接
  12. * 4. 生命周期对齐:随组件创建/销毁
  13. */
  14. @Injectable()
  15. export class IndependentEventService implements OnDestroy {
  16. private eventSource: EventSource | null = null;
  17. private isConnected = false;
  18. private reconnectTimeout: any = null;
  19. private reconnectAttempts = 0;
  20. private maxReconnectAttempts = 5;
  21. private reconnectDelay = 3000;
  22. private heartbeatTimeout: any = null;
  23. private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时
  24. // 事件主题,用于广播所有事件
  25. private allEventsSubject = new Subject<GlobalEvent>();
  26. readonly events$ = this.allEventsSubject.asObservable();
  27. // 连接状态主题
  28. private connectionStatusSubject = new BehaviorSubject<boolean>(false);
  29. readonly connectionStatus$ = this.connectionStatusSubject.asObservable();
  30. // 当前连接的会话ID
  31. private currentSessionId: string | undefined;
  32. constructor(
  33. private authService: AuthService
  34. ) {}
  35. /**
  36. * 连接到事件流
  37. * @param sessionId 会话ID(可选,用于会话过滤)
  38. */
  39. connect(sessionId?: string): boolean {
  40. console.log('🔍 [IndependentEventService] 连接事件流,会话ID:', sessionId);
  41. // 断开现有连接
  42. this.disconnect();
  43. // 检查是否已登录
  44. if (!this.authService.isAuthenticated()) {
  45. console.warn('未登录状态,不连接事件流');
  46. return false;
  47. }
  48. // 获取认证token
  49. const token = this.authService.getToken();
  50. if (!token) {
  51. console.error('无法获取认证token');
  52. return false;
  53. }
  54. // 构建带认证参数的URL
  55. let url = `/api/logs/stream?token=${encodeURIComponent(token)}`;
  56. if (sessionId) {
  57. url += `&sessionId=${sessionId}`;
  58. this.currentSessionId = sessionId;
  59. }
  60. console.log('🔍 [IndependentEventService] 连接URL:', url);
  61. this.eventSource = new EventSource(url);
  62. this.eventSource.onopen = () => {
  63. console.log('🔍 [IndependentEventService] 事件流连接已建立');
  64. this.isConnected = true;
  65. this.connectionStatusSubject.next(true);
  66. this.reconnectAttempts = 0;
  67. // 启动心跳超时定时器
  68. this.resetHeartbeatTimeout();
  69. // 发送连接成功事件
  70. this.allEventsSubject.next({
  71. payload: {
  72. type: 'connection.established',
  73. properties: { timestamp: new Date().toISOString() }
  74. }
  75. });
  76. };
  77. this.eventSource.onmessage = (event) => {
  78. const eventData = event.data;
  79. // 处理SSE注释格式的心跳
  80. if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
  81. // 重置心跳超时定时器
  82. this.resetHeartbeatTimeout();
  83. return;
  84. }
  85. console.log('🔍 [IndependentEventService] 收到原始事件数据:', eventData.substring(0, 200));
  86. try {
  87. // 解析JSON事件
  88. const globalEvent: GlobalEvent = JSON.parse(eventData);
  89. this.handleEvent(globalEvent);
  90. } catch (error) {
  91. console.log('🔍 [IndependentEventService] 非JSON事件:', eventData.substring(0, 100));
  92. }
  93. };
  94. this.eventSource.onerror = (error) => {
  95. console.error('🔍 [IndependentEventService] 事件流连接错误:', error);
  96. this.isConnected = false;
  97. this.connectionStatusSubject.next(false);
  98. // 清除心跳超时定时器
  99. this.clearHeartbeatTimeout();
  100. // 检查是否仍然登录
  101. if (this.authService.isAuthenticated()) {
  102. console.log('🔍 [IndependentEventService] 连接断开,正在重连...');
  103. this.scheduleReconnect(sessionId);
  104. } else {
  105. console.log('🔍 [IndependentEventService] 连接断开(用户未登录)');
  106. }
  107. };
  108. return true;
  109. }
  110. /**
  111. * 断开事件流连接
  112. */
  113. disconnect(): void {
  114. if (this.eventSource) {
  115. this.eventSource.close();
  116. this.eventSource = null;
  117. this.isConnected = false;
  118. this.connectionStatusSubject.next(false);
  119. this.currentSessionId = undefined;
  120. console.log('🔍 [IndependentEventService] 已断开事件流连接');
  121. }
  122. if (this.reconnectTimeout) {
  123. clearTimeout(this.reconnectTimeout);
  124. this.reconnectTimeout = null;
  125. }
  126. // 清除心跳超时定时器
  127. this.clearHeartbeatTimeout();
  128. this.reconnectAttempts = 0;
  129. }
  130. /**
  131. * 获取当前会话ID
  132. */
  133. getCurrentSessionId(): string | undefined {
  134. return this.currentSessionId;
  135. }
  136. /**
  137. * 检查连接状态
  138. */
  139. isConnectedStatus(): boolean {
  140. return this.isConnected;
  141. }
  142. /**
  143. * 处理事件
  144. */
  145. private handleEvent(event: GlobalEvent): void {
  146. // 任何有效事件都重置心跳超时
  147. this.resetHeartbeatTimeout();
  148. // 广播所有事件
  149. this.allEventsSubject.next(event);
  150. console.log('🔍 [IndependentEventService] 处理事件,类型:', event.payload.type);
  151. console.log('🔍 [IndependentEventService] 事件完整内容:', JSON.stringify(event, null, 2).substring(0, 500));
  152. }
  153. /**
  154. * 重置心跳超时定时器
  155. */
  156. private resetHeartbeatTimeout(): void {
  157. // 清除现有超时
  158. if (this.heartbeatTimeout) {
  159. clearTimeout(this.heartbeatTimeout);
  160. }
  161. // 设置新的超时定时器(5分钟)
  162. this.heartbeatTimeout = setTimeout(() => {
  163. console.warn('🔍 [IndependentEventService] 心跳超时(5分钟未收到心跳),尝试重连');
  164. this.isConnected = false;
  165. this.connectionStatusSubject.next(false);
  166. // 清除现有的重连定时器(如果有)
  167. if (this.reconnectTimeout) {
  168. clearTimeout(this.reconnectTimeout);
  169. }
  170. // 立即尝试重连
  171. if (this.authService.isAuthenticated()) {
  172. this.connect(this.currentSessionId);
  173. }
  174. }, this.heartbeatTimeoutDuration);
  175. }
  176. /**
  177. * 清除心跳超时定时器
  178. */
  179. private clearHeartbeatTimeout(): void {
  180. if (this.heartbeatTimeout) {
  181. clearTimeout(this.heartbeatTimeout);
  182. this.heartbeatTimeout = null;
  183. }
  184. }
  185. /**
  186. * 安排重连
  187. */
  188. private scheduleReconnect(sessionId?: string): void {
  189. if (this.reconnectTimeout) {
  190. clearTimeout(this.reconnectTimeout);
  191. }
  192. if (this.reconnectAttempts >= this.maxReconnectAttempts) {
  193. console.error('🔍 [IndependentEventService] 达到最大重连次数,停止重连');
  194. return;
  195. }
  196. this.reconnectAttempts++;
  197. const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避
  198. console.log(`🔍 [IndependentEventService] ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`);
  199. this.reconnectTimeout = setTimeout(() => {
  200. if (this.authService.isAuthenticated()) {
  201. this.connect(sessionId);
  202. }
  203. }, delay);
  204. }
  205. /**
  206. * 清理资源
  207. */
  208. ngOnDestroy(): void {
  209. this.disconnect();
  210. }
  211. }