Açıklama Yok
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

event.service.ts 21KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. import { Injectable, OnDestroy, inject, Optional } from '@angular/core';
  2. import { BehaviorSubject, Observable, Subject, Subscription, filter, map } from 'rxjs';
  3. import { AuthService } from './auth.service';
  4. import { GlobalEvent, EventPayload, EventType, SessionUpdatedEvent, SessionDiffEvent, ServerHeartbeatEvent, MessageUpdatedEvent, MessagePartUpdatedEvent, SessionStatusEvent, ServerConnectedEvent } from '../models/event.model';
  5. @Injectable({
  6. providedIn: 'root'
  7. })
  8. export class EventService implements OnDestroy {
  9. private eventSource: EventSource | null = null;
  10. private isConnected = false;
  11. private subscriptions: Subscription = new Subscription();
  12. private reconnectTimeout: any = null;
  13. private heartbeatTimeout: any = null;
  14. private reconnectAttempts = 0;
  15. private maxReconnectAttempts = 5;
  16. private reconnectDelay = 3000; // 3秒
  17. private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时(根据用户要求)
  18. // 事件主题,用于广播所有事件
  19. private allEventsSubject = new Subject<GlobalEvent>();
  20. allEvents$ = this.allEventsSubject.asObservable();
  21. // 特定事件类型主题
  22. private sessionUpdatedSubject = new Subject<SessionUpdatedEvent>();
  23. sessionUpdated$ = this.sessionUpdatedSubject.asObservable();
  24. private sessionDiffSubject = new Subject<SessionDiffEvent>();
  25. sessionDiff$ = this.sessionDiffSubject.asObservable();
  26. private messageUpdatedSubject = new Subject<MessageUpdatedEvent>();
  27. messageUpdated$ = this.messageUpdatedSubject.asObservable();
  28. private messagePartUpdatedSubject = new Subject<MessagePartUpdatedEvent>();
  29. messagePartUpdated$ = this.messagePartUpdatedSubject.asObservable();
  30. // 按会话ID过滤的事件主题
  31. private sessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>();
  32. sessionEvents$ = this.sessionEventsSubject.asObservable();
  33. // 连接状态主题
  34. private connectionStatusSubject = new BehaviorSubject<boolean>(false);
  35. connectionStatus$ = this.connectionStatusSubject.asObservable();
  36. // 会话ID到实例ID的注册映射(一对多,支持同一会话多个实例)
  37. // 已移除:每个浏览器标签页独立SSE连接,不再需要映射
  38. // 实例事件主题
  39. // 已移除:每个浏览器标签页独立SSE连接,不再需要实例事件路由
  40. // 多会话事件源映射(用于文档列表等场景)
  41. private multiSessionEventSources = new Map<string, EventSource>();
  42. // 多会话事件主题
  43. private multiSessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>();
  44. multiSessionEvents$ = this.multiSessionEventsSubject.asObservable();
  45. constructor(
  46. private authService: AuthService
  47. ) {
  48. this.initializeAuthSubscription();
  49. }
  50. // 初始化认证状态订阅
  51. private initializeAuthSubscription() {
  52. this.subscriptions.add(
  53. this.authService.authState$.subscribe(authState => {
  54. console.log('EventService: 认证状态变化', authState.isAuthenticated);
  55. if (authState.isAuthenticated) {
  56. // 用户已登录,连接事件流
  57. this.connectToEventStream();
  58. } else {
  59. // 用户登出,断开事件流
  60. this.disconnect();
  61. }
  62. })
  63. );
  64. }
  65. // 连接到事件流
  66. connectToEventStream(sessionId?: string) {
  67. if (this.eventSource) {
  68. this.eventSource.close();
  69. this.eventSource = null;
  70. }
  71. // 检查是否已登录
  72. if (!this.authService.isAuthenticated()) {
  73. console.warn('未登录状态,不连接事件流');
  74. return;
  75. }
  76. // 获取认证token
  77. const token = this.authService.getToken();
  78. if (!token) {
  79. console.error('无法获取认证token');
  80. return;
  81. }
  82. // 构建带认证参数的URL - 使用日志流端点,因为它发送全局事件
  83. let url = `/api/logs/stream?token=${encodeURIComponent(token)}`;
  84. if (sessionId) {
  85. url += `&sessionId=${sessionId}`;
  86. }
  87. console.log('EventService: 连接事件流URL:', url);
  88. this.eventSource = new EventSource(url);
  89. this.eventSource.onopen = () => {
  90. console.log('EventService: 事件流连接已建立');
  91. this.isConnected = true;
  92. this.connectionStatusSubject.next(true);
  93. this.reconnectAttempts = 0; // 重置重连计数
  94. // 启动心跳超时定时器(5分钟)
  95. this.resetHeartbeatTimeout();
  96. // 发送连接成功事件
  97. this.allEventsSubject.next({
  98. payload: {
  99. type: 'connection.established',
  100. properties: { timestamp: new Date().toISOString() }
  101. }
  102. });
  103. };
  104. this.eventSource.onmessage = (event) => {
  105. const eventData = event.data;
  106. console.log('EventService: 收到原始事件数据:', eventData.substring(0, 200));
  107. // 处理SSE注释格式的心跳(保持向后兼容)
  108. if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
  109. // 重置心跳超时定时器(表示连接活跃)
  110. this.resetHeartbeatTimeout();
  111. // 发送内部心跳事件,供其他服务订阅
  112. this.allEventsSubject.next({
  113. payload: {
  114. type: 'server.heartbeat.comment',
  115. properties: {
  116. timestamp: new Date().toISOString(),
  117. rawData: eventData
  118. }
  119. }
  120. });
  121. return;
  122. }
  123. try {
  124. // 解析JSON事件
  125. const globalEvent: GlobalEvent = JSON.parse(eventData);
  126. this.handleGlobalEvent(globalEvent);
  127. } catch (error) {
  128. // 如果不是JSON,可能是纯文本日志
  129. console.log('EventService: 非JSON事件,可能是纯文本日志:', eventData.substring(0, 100));
  130. // 如果是纯文本日志,可以转发给日志服务或忽略
  131. // 这里只处理JSON事件
  132. }
  133. };
  134. this.eventSource.onerror = (error) => {
  135. console.error('EventService: 事件流连接错误:', error);
  136. this.isConnected = false;
  137. this.connectionStatusSubject.next(false);
  138. // 清除心跳超时定时器
  139. this.clearHeartbeatTimeout();
  140. // 检查是否仍然登录
  141. if (this.authService.isAuthenticated()) {
  142. console.log('EventService: 连接断开,正在重连...');
  143. this.scheduleReconnect(sessionId);
  144. } else {
  145. console.log('EventService: 连接断开(用户未登录)');
  146. }
  147. };
  148. }
  149. /**
  150. * 连接到多会话事件流(用于文档列表等场景)
  151. * @param sessionIds 会话ID数组
  152. */
  153. connectToMultipleSessions(sessionIds: string[]) {
  154. if (!sessionIds || sessionIds.length === 0) {
  155. return;
  156. }
  157. // 检查是否已登录
  158. if (!this.authService.isAuthenticated()) {
  159. console.warn('未登录状态,不连接多会话事件流');
  160. return;
  161. }
  162. // 获取认证token
  163. const token = this.authService.getToken();
  164. if (!token) {
  165. console.error('无法获取认证token');
  166. return;
  167. }
  168. // 为每个会话ID创建独立的EventSource连接
  169. sessionIds.forEach(sessionId => {
  170. // 如果已经存在连接,先关闭
  171. if (this.multiSessionEventSources.has(sessionId)) {
  172. const existingSource = this.multiSessionEventSources.get(sessionId);
  173. if (existingSource) {
  174. existingSource.close();
  175. }
  176. this.multiSessionEventSources.delete(sessionId);
  177. }
  178. // 构建带认证参数的URL
  179. const url = `/api/logs/stream?token=${encodeURIComponent(token)}&sessionId=${sessionId}`;
  180. console.log(`EventService: 连接多会话事件流,会话ID: ${sessionId}, URL: ${url}`);
  181. const eventSource = new EventSource(url);
  182. this.multiSessionEventSources.set(sessionId, eventSource);
  183. eventSource.onopen = () => {
  184. console.log(`EventService: 多会话事件流连接已建立,会话ID: ${sessionId}`);
  185. };
  186. eventSource.onmessage = (event) => {
  187. const eventData = event.data;
  188. // 跳过心跳消息
  189. if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
  190. return;
  191. }
  192. try {
  193. // 解析JSON事件
  194. const globalEvent: GlobalEvent = JSON.parse(eventData);
  195. console.log(`EventService: 收到多会话事件,会话ID: ${sessionId}, 类型: ${globalEvent.payload.type}`);
  196. // 分发到多会话事件主题
  197. this.multiSessionEventsSubject.next({ sessionId, event: globalEvent });
  198. // 同时分发到全局事件流(保持兼容性)
  199. this.distributeEventBySession(globalEvent);
  200. } catch (error) {
  201. console.error(`EventService: 解析多会话事件失败,会话ID: ${sessionId}, 数据:`, eventData.substring(0, 100));
  202. }
  203. };
  204. eventSource.onerror = (error) => {
  205. console.error(`EventService: 多会话事件流连接错误,会话ID: ${sessionId}:`, error);
  206. // 多会话连接错误不触发全局重连,只关闭该连接
  207. eventSource.close();
  208. this.multiSessionEventSources.delete(sessionId);
  209. // 可选:重新连接单个会话(延迟重试)
  210. setTimeout(() => {
  211. if (this.authService.isAuthenticated()) {
  212. this.connectToMultipleSessions([sessionId]);
  213. }
  214. }, 5000);
  215. };
  216. });
  217. }
  218. /**
  219. * 断开指定会话的多会话事件流
  220. * @param sessionIds 会话ID数组(如果为空则断开所有)
  221. */
  222. disconnectMultipleSessions(sessionIds?: string[]) {
  223. if (!sessionIds) {
  224. // 断开所有多会话连接
  225. this.multiSessionEventSources.forEach((eventSource, sessionId) => {
  226. eventSource.close();
  227. console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`);
  228. });
  229. this.multiSessionEventSources.clear();
  230. } else {
  231. // 断开指定会话的连接
  232. sessionIds.forEach(sessionId => {
  233. const eventSource = this.multiSessionEventSources.get(sessionId);
  234. if (eventSource) {
  235. eventSource.close();
  236. this.multiSessionEventSources.delete(sessionId);
  237. console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`);
  238. }
  239. });
  240. }
  241. }
  242. /**
  243. * 获取指定会话的事件流(按会话ID过滤)
  244. * @param sessionId 会话ID
  245. * @returns 过滤后的事件Observable
  246. */
  247. getSessionEvents(sessionId: string): Observable<GlobalEvent> {
  248. return this.sessionEvents$.pipe(
  249. filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId),
  250. map(({ event }) => event)
  251. );
  252. }
  253. /**
  254. * 获取多个会话的事件流(按会话ID数组过滤)
  255. * @param sessionIds 会话ID数组
  256. * @returns 过滤后的事件Observable
  257. */
  258. getMultipleSessionEvents(sessionIds: string[]): Observable<{sessionId: string, event: GlobalEvent}> {
  259. return this.multiSessionEvents$.pipe(
  260. filter(({ sessionId }) => sessionIds.includes(sessionId))
  261. );
  262. }
  263. // 处理全局事件
  264. private handleGlobalEvent(event: GlobalEvent) {
  265. console.log('EventService: 处理全局事件,类型:', event.payload.type);
  266. // 任何有效事件都重置心跳超时(表明连接活跃)
  267. this.resetHeartbeatTimeout();
  268. // 广播所有事件
  269. this.allEventsSubject.next(event);
  270. // 根据会话分发事件
  271. this.distributeEventBySession(event);
  272. // 根据事件类型分发到特定主题
  273. const payload = event.payload;
  274. switch (payload.type) {
  275. case 'session.updated': {
  276. const sessionEvent = payload as SessionUpdatedEvent;
  277. this.sessionUpdatedSubject.next(sessionEvent);
  278. console.log('EventService: 分发 session.updated 事件', sessionEvent.properties.info?.title);
  279. break;
  280. }
  281. case 'session.diff':
  282. this.sessionDiffSubject.next(payload as SessionDiffEvent);
  283. console.log('EventService: 分发 session.diff 事件');
  284. break;
  285. case 'server.heartbeat':
  286. this.allEventsSubject.next(event); // 已经广播过
  287. this.resetHeartbeatTimeout();
  288. break;
  289. case 'message.updated':
  290. this.messageUpdatedSubject.next(payload as MessageUpdatedEvent);
  291. console.log('EventService: 分发 message.updated 事件');
  292. break;
  293. case 'message.part.updated':
  294. this.messagePartUpdatedSubject.next(payload as MessagePartUpdatedEvent);
  295. console.log('EventService: 分发 message.part.updated 事件');
  296. break;
  297. case 'session.status':
  298. this.allEventsSubject.next(event);
  299. console.log('EventService: 分发 session.status 事件');
  300. break;
  301. case 'server.connected':
  302. this.allEventsSubject.next(event);
  303. console.log('EventService: 分发 server.connected 事件');
  304. break;
  305. default:
  306. console.log('EventService: 未知事件类型:', payload.type);
  307. // 未知事件类型仍然通过 allEvents$ 广播
  308. }
  309. }
  310. // 重置心跳超时定时器
  311. private resetHeartbeatTimeout() {
  312. // 清除现有超时
  313. if (this.heartbeatTimeout) {
  314. clearTimeout(this.heartbeatTimeout);
  315. }
  316. // 设置新的超时定时器(5分钟)
  317. this.heartbeatTimeout = setTimeout(() => {
  318. console.warn('EventService: 心跳超时(5分钟未收到心跳),尝试重连');
  319. this.isConnected = false;
  320. this.connectionStatusSubject.next(false);
  321. // 清除现有的重连定时器(如果有)
  322. if (this.reconnectTimeout) {
  323. clearTimeout(this.reconnectTimeout);
  324. }
  325. // 立即尝试重连
  326. if (this.authService.isAuthenticated()) {
  327. const activeSession = this.getCurrentSessionId(); // 需要获取当前会话ID
  328. this.connectToEventStream(activeSession);
  329. }
  330. }, this.heartbeatTimeoutDuration);
  331. console.log('EventService: 心跳超时定时器已重置(5分钟)');
  332. }
  333. // 清除心跳超时定时器
  334. private clearHeartbeatTimeout() {
  335. if (this.heartbeatTimeout) {
  336. clearTimeout(this.heartbeatTimeout);
  337. this.heartbeatTimeout = null;
  338. }
  339. }
  340. // 获取当前会话ID(用于重连时保持会话过滤)
  341. private getCurrentSessionId(): string | undefined {
  342. // 这里可以从URL或状态管理获取当前会话ID
  343. // 暂时返回undefined,表示不进行会话过滤
  344. return undefined;
  345. }
  346. // 安排重连
  347. private scheduleReconnect(sessionId?: string) {
  348. if (this.reconnectTimeout) {
  349. clearTimeout(this.reconnectTimeout);
  350. }
  351. if (this.reconnectAttempts >= this.maxReconnectAttempts) {
  352. console.error('EventService: 达到最大重连次数,停止重连');
  353. return;
  354. }
  355. this.reconnectAttempts++;
  356. const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避
  357. console.log(`EventService: ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`);
  358. this.reconnectTimeout = setTimeout(() => {
  359. if (this.authService.isAuthenticated()) {
  360. this.connectToEventStream(sessionId);
  361. }
  362. }, delay);
  363. }
  364. // 断开事件流连接
  365. disconnect() {
  366. if (this.eventSource) {
  367. this.eventSource.close();
  368. this.eventSource = null;
  369. this.isConnected = false;
  370. this.connectionStatusSubject.next(false);
  371. console.log('EventService: 已断开事件流连接');
  372. }
  373. if (this.reconnectTimeout) {
  374. clearTimeout(this.reconnectTimeout);
  375. this.reconnectTimeout = null;
  376. }
  377. // 清除心跳超时定时器
  378. this.clearHeartbeatTimeout();
  379. this.reconnectAttempts = 0;
  380. }
  381. // 订阅特定事件类型
  382. subscribeToEvent<T = any>(eventType: EventType): Observable<T> {
  383. return new Observable<T>(observer => {
  384. const subscription = this.allEvents$.subscribe(event => {
  385. if (event.payload.type === eventType) {
  386. observer.next(event.payload as T);
  387. }
  388. });
  389. return () => subscription.unsubscribe();
  390. });
  391. }
  392. // 手动发送事件(用于测试)
  393. emitTestEvent(event: GlobalEvent) {
  394. this.handleGlobalEvent(event);
  395. }
  396. // 检查连接状态
  397. isStreamConnected(): boolean {
  398. return this.isConnected;
  399. }
  400. // 切换会话过滤
  401. switchSession(sessionId?: string) {
  402. if (this.authService.isAuthenticated()) {
  403. this.connectToEventStream(sessionId);
  404. } else {
  405. console.warn('未登录状态,无法切换会话过滤');
  406. }
  407. }
  408. // 订阅特定会话的事件
  409. subscribeToSessionEvents(sessionId: string): Observable<GlobalEvent> {
  410. return this.sessionEvents$.pipe(
  411. filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId),
  412. map(({ event }) => event)
  413. );
  414. }
  415. // 获取事件中的会话ID
  416. private getSessionIdFromEvent(event: GlobalEvent): string | undefined {
  417. const payload = event.payload;
  418. // 根据后端事件分发器的extractSessionIDFromEvent逻辑实现
  419. // 支持多种sessionID字段名和嵌套路径
  420. // 1. 检查payload.properties.sessionID(session.status事件等)
  421. if (payload.properties && payload.properties['sessionID']) {
  422. return payload.properties['sessionID'];
  423. }
  424. // 2. 根据事件类型处理
  425. switch (payload.type) {
  426. case 'session.updated':
  427. // session.updated事件中,会话ID在properties.info.id中
  428. const sessionEvent = payload as SessionUpdatedEvent;
  429. return sessionEvent.properties?.info?.id;
  430. case 'session.diff':
  431. // session.diff事件中,会话ID在properties.sessionID中
  432. const diffEvent = payload as SessionDiffEvent;
  433. return diffEvent.properties?.sessionID;
  434. case 'message.updated':
  435. // message.updated事件中,会话ID可能在多个位置
  436. const messageEvent = payload as MessageUpdatedEvent;
  437. // 尝试从properties.info.sessionID获取
  438. if (messageEvent.properties?.info && typeof messageEvent.properties.info === 'object') {
  439. const info = messageEvent.properties.info as any;
  440. if (info.sessionID) return info.sessionID;
  441. if (info.sessionId) return info.sessionId;
  442. if (info.session_id) return info.session_id;
  443. }
  444. // 尝试从properties.sessionID获取
  445. if (messageEvent.properties && (messageEvent.properties as any).sessionID) {
  446. return (messageEvent.properties as any).sessionID;
  447. }
  448. break;
  449. case 'message.part.updated':
  450. // message.part.updated事件中,会话ID可能在properties.part.sessionID中
  451. const partEvent = payload as MessagePartUpdatedEvent;
  452. if (partEvent.properties?.part && typeof partEvent.properties.part === 'object') {
  453. const part = partEvent.properties.part as any;
  454. if (part.sessionID) return part.sessionID;
  455. if (part.sessionId) return part.sessionId;
  456. if (part.session_id) return part.session_id;
  457. }
  458. break;
  459. case 'session.status':
  460. // session.status事件中,会话ID可能在properties.status.sessionID中
  461. const statusEvent = payload as any;
  462. if (statusEvent.properties?.status?.sessionID) {
  463. return statusEvent.properties.status.sessionID;
  464. }
  465. break;
  466. }
  467. // 3. 递归查找sessionID字段(仿照后端实现)
  468. const findSessionIDRecursive = (obj: any): string | undefined => {
  469. if (!obj || typeof obj !== 'object') return undefined;
  470. // 检查常见的sessionID字段名
  471. const sessionIDKeys = ['sessionID', 'sessionId', 'session_id', 'session'];
  472. for (const key of sessionIDKeys) {
  473. if (obj[key] && typeof obj[key] === 'string') {
  474. return obj[key];
  475. }
  476. }
  477. // 递归查找
  478. if (Array.isArray(obj)) {
  479. for (const item of obj) {
  480. const result = findSessionIDRecursive(item);
  481. if (result) return result;
  482. }
  483. } else {
  484. for (const key in obj) {
  485. if (Object.prototype.hasOwnProperty.call(obj, key)) {
  486. const result = findSessionIDRecursive(obj[key]);
  487. if (result) return result;
  488. }
  489. }
  490. }
  491. return undefined;
  492. };
  493. // 在整个事件对象中递归查找
  494. return findSessionIDRecursive(event);
  495. }
  496. // 分发事件到会话/实例
  497. private distributeEventBySession(event: GlobalEvent): void {
  498. const sessionId = this.getSessionIdFromEvent(event);
  499. if (sessionId) {
  500. console.log(`分发事件到会话: ${sessionId}, 类型: ${event.payload.type}`);
  501. // 发送到会话事件流(供该会话的组件订阅)
  502. this.sessionEventsSubject.next({ sessionId, event });
  503. // 注:每个浏览器标签页独立SSE连接,不再需要实例映射
  504. // 后端已按sessionId过滤事件,每个标签页只收到自己会话的事件
  505. }
  506. }
  507. ngOnDestroy() {
  508. this.subscriptions.unsubscribe();
  509. this.disconnect();
  510. }
  511. }