| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614 |
- import { Injectable, OnDestroy, inject, Optional } from '@angular/core';
- import { BehaviorSubject, Observable, Subject, Subscription, filter, map } from 'rxjs';
- import { AuthService } from './auth.service';
-
- import { GlobalEvent, EventPayload, EventType, SessionUpdatedEvent, SessionDiffEvent, ServerHeartbeatEvent, MessageUpdatedEvent, MessagePartUpdatedEvent, SessionStatusEvent, ServerConnectedEvent } from '../models/event.model';
-
- @Injectable({
- providedIn: 'root'
- })
- export class EventService implements OnDestroy {
- private eventSource: EventSource | null = null;
- private isConnected = false;
-
- private subscriptions: Subscription = new Subscription();
- private reconnectTimeout: any = null;
- private heartbeatTimeout: any = null;
- private reconnectAttempts = 0;
- private maxReconnectAttempts = 5;
- private reconnectDelay = 3000; // 3秒
- private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时(根据用户要求)
-
- // 事件主题,用于广播所有事件
- private allEventsSubject = new Subject<GlobalEvent>();
- allEvents$ = this.allEventsSubject.asObservable();
-
- // 特定事件类型主题
- private sessionUpdatedSubject = new Subject<SessionUpdatedEvent>();
- sessionUpdated$ = this.sessionUpdatedSubject.asObservable();
-
- private sessionDiffSubject = new Subject<SessionDiffEvent>();
- sessionDiff$ = this.sessionDiffSubject.asObservable();
-
- private messageUpdatedSubject = new Subject<MessageUpdatedEvent>();
- messageUpdated$ = this.messageUpdatedSubject.asObservable();
-
- private messagePartUpdatedSubject = new Subject<MessagePartUpdatedEvent>();
- messagePartUpdated$ = this.messagePartUpdatedSubject.asObservable();
-
- // 按会话ID过滤的事件主题
- private sessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>();
- sessionEvents$ = this.sessionEventsSubject.asObservable();
-
- // 连接状态主题
- private connectionStatusSubject = new BehaviorSubject<boolean>(false);
- connectionStatus$ = this.connectionStatusSubject.asObservable();
-
- // 会话ID到实例ID的注册映射(一对多,支持同一会话多个实例)
- // 已移除:每个浏览器标签页独立SSE连接,不再需要映射
-
- // 实例事件主题
- // 已移除:每个浏览器标签页独立SSE连接,不再需要实例事件路由
-
- // 多会话事件源映射(用于文档列表等场景)
- private multiSessionEventSources = new Map<string, EventSource>();
-
- // 多会话事件主题
- private multiSessionEventsSubject = new Subject<{sessionId: string, event: GlobalEvent}>();
- multiSessionEvents$ = this.multiSessionEventsSubject.asObservable();
-
- constructor(
- private authService: AuthService
- ) {
- this.initializeAuthSubscription();
- }
-
- // 初始化认证状态订阅
- private initializeAuthSubscription() {
- this.subscriptions.add(
- this.authService.authState$.subscribe(authState => {
- console.log('EventService: 认证状态变化', authState.isAuthenticated);
-
- if (authState.isAuthenticated) {
- // 用户已登录,连接事件流
- this.connectToEventStream();
- } else {
- // 用户登出,断开事件流
- this.disconnect();
- }
- })
- );
- }
-
- // 连接到事件流
- connectToEventStream(sessionId?: string) {
- if (this.eventSource) {
- this.eventSource.close();
- this.eventSource = null;
- }
-
- // 检查是否已登录
- if (!this.authService.isAuthenticated()) {
- console.warn('未登录状态,不连接事件流');
- return;
- }
-
- // 获取认证token
- const token = this.authService.getToken();
- if (!token) {
- console.error('无法获取认证token');
- return;
- }
-
- // 构建带认证参数的URL - 使用日志流端点,因为它发送全局事件
- let url = `/api/logs/stream?token=${encodeURIComponent(token)}`;
- if (sessionId) {
- url += `&sessionId=${sessionId}`;
- }
-
- console.log('EventService: 连接事件流URL:', url);
- this.eventSource = new EventSource(url);
-
- this.eventSource.onopen = () => {
- console.log('EventService: 事件流连接已建立');
- this.isConnected = true;
- this.connectionStatusSubject.next(true);
- this.reconnectAttempts = 0; // 重置重连计数
-
- // 启动心跳超时定时器(5分钟)
- this.resetHeartbeatTimeout();
-
- // 发送连接成功事件
- this.allEventsSubject.next({
- payload: {
- type: 'connection.established',
- properties: { timestamp: new Date().toISOString() }
- }
- });
- };
-
- this.eventSource.onmessage = (event) => {
- const eventData = event.data;
- console.log('EventService: 收到原始事件数据:', eventData.substring(0, 200));
-
- // 处理SSE注释格式的心跳(保持向后兼容)
- if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
- // 重置心跳超时定时器(表示连接活跃)
- this.resetHeartbeatTimeout();
-
- // 发送内部心跳事件,供其他服务订阅
- this.allEventsSubject.next({
- payload: {
- type: 'server.heartbeat.comment',
- properties: {
- timestamp: new Date().toISOString(),
- rawData: eventData
- }
- }
- });
- return;
- }
-
- try {
- // 解析JSON事件
- const globalEvent: GlobalEvent = JSON.parse(eventData);
- this.handleGlobalEvent(globalEvent);
- } catch (error) {
- // 如果不是JSON,可能是纯文本日志
- console.log('EventService: 非JSON事件,可能是纯文本日志:', eventData.substring(0, 100));
-
- // 如果是纯文本日志,可以转发给日志服务或忽略
- // 这里只处理JSON事件
- }
- };
-
- this.eventSource.onerror = (error) => {
- console.error('EventService: 事件流连接错误:', error);
- this.isConnected = false;
- this.connectionStatusSubject.next(false);
-
- // 清除心跳超时定时器
- this.clearHeartbeatTimeout();
-
- // 检查是否仍然登录
- if (this.authService.isAuthenticated()) {
- console.log('EventService: 连接断开,正在重连...');
- this.scheduleReconnect(sessionId);
- } else {
- console.log('EventService: 连接断开(用户未登录)');
- }
- };
- }
-
- /**
- * 连接到多会话事件流(用于文档列表等场景)
- * @param sessionIds 会话ID数组
- */
- connectToMultipleSessions(sessionIds: string[]) {
- if (!sessionIds || sessionIds.length === 0) {
- return;
- }
-
- // 检查是否已登录
- if (!this.authService.isAuthenticated()) {
- console.warn('未登录状态,不连接多会话事件流');
- return;
- }
-
- // 获取认证token
- const token = this.authService.getToken();
- if (!token) {
- console.error('无法获取认证token');
- return;
- }
-
- // 为每个会话ID创建独立的EventSource连接
- sessionIds.forEach(sessionId => {
- // 如果已经存在连接,先关闭
- if (this.multiSessionEventSources.has(sessionId)) {
- const existingSource = this.multiSessionEventSources.get(sessionId);
- if (existingSource) {
- existingSource.close();
- }
- this.multiSessionEventSources.delete(sessionId);
- }
-
- // 构建带认证参数的URL
- const url = `/api/logs/stream?token=${encodeURIComponent(token)}&sessionId=${sessionId}`;
- console.log(`EventService: 连接多会话事件流,会话ID: ${sessionId}, URL: ${url}`);
-
- const eventSource = new EventSource(url);
- this.multiSessionEventSources.set(sessionId, eventSource);
-
- eventSource.onopen = () => {
- console.log(`EventService: 多会话事件流连接已建立,会话ID: ${sessionId}`);
- };
-
- eventSource.onmessage = (event) => {
- const eventData = event.data;
-
- // 跳过心跳消息
- if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
- return;
- }
-
- try {
- // 解析JSON事件
- const globalEvent: GlobalEvent = JSON.parse(eventData);
- console.log(`EventService: 收到多会话事件,会话ID: ${sessionId}, 类型: ${globalEvent.payload.type}`);
-
- // 分发到多会话事件主题
- this.multiSessionEventsSubject.next({ sessionId, event: globalEvent });
-
- // 同时分发到全局事件流(保持兼容性)
- this.distributeEventBySession(globalEvent);
- } catch (error) {
- console.error(`EventService: 解析多会话事件失败,会话ID: ${sessionId}, 数据:`, eventData.substring(0, 100));
- }
- };
-
- eventSource.onerror = (error) => {
- console.error(`EventService: 多会话事件流连接错误,会话ID: ${sessionId}:`, error);
- // 多会话连接错误不触发全局重连,只关闭该连接
- eventSource.close();
- this.multiSessionEventSources.delete(sessionId);
-
- // 可选:重新连接单个会话(延迟重试)
- setTimeout(() => {
- if (this.authService.isAuthenticated()) {
- this.connectToMultipleSessions([sessionId]);
- }
- }, 5000);
- };
- });
- }
-
- /**
- * 断开指定会话的多会话事件流
- * @param sessionIds 会话ID数组(如果为空则断开所有)
- */
- disconnectMultipleSessions(sessionIds?: string[]) {
- if (!sessionIds) {
- // 断开所有多会话连接
- this.multiSessionEventSources.forEach((eventSource, sessionId) => {
- eventSource.close();
- console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`);
- });
- this.multiSessionEventSources.clear();
- } else {
- // 断开指定会话的连接
- sessionIds.forEach(sessionId => {
- const eventSource = this.multiSessionEventSources.get(sessionId);
- if (eventSource) {
- eventSource.close();
- this.multiSessionEventSources.delete(sessionId);
- console.log(`EventService: 断开多会话事件流,会话ID: ${sessionId}`);
- }
- });
- }
- }
-
- /**
- * 获取指定会话的事件流(按会话ID过滤)
- * @param sessionId 会话ID
- * @returns 过滤后的事件Observable
- */
- getSessionEvents(sessionId: string): Observable<GlobalEvent> {
- return this.sessionEvents$.pipe(
- filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId),
- map(({ event }) => event)
- );
- }
-
- /**
- * 获取多个会话的事件流(按会话ID数组过滤)
- * @param sessionIds 会话ID数组
- * @returns 过滤后的事件Observable
- */
- getMultipleSessionEvents(sessionIds: string[]): Observable<{sessionId: string, event: GlobalEvent}> {
- return this.multiSessionEvents$.pipe(
- filter(({ sessionId }) => sessionIds.includes(sessionId))
- );
- }
-
- // 处理全局事件
- private handleGlobalEvent(event: GlobalEvent) {
- console.log('EventService: 处理全局事件,类型:', event.payload.type);
-
- // 任何有效事件都重置心跳超时(表明连接活跃)
- this.resetHeartbeatTimeout();
-
- // 广播所有事件
- this.allEventsSubject.next(event);
-
- // 根据会话分发事件
- this.distributeEventBySession(event);
-
- // 根据事件类型分发到特定主题
- const payload = event.payload;
-
- switch (payload.type) {
- case 'session.updated': {
- const sessionEvent = payload as SessionUpdatedEvent;
- this.sessionUpdatedSubject.next(sessionEvent);
- console.log('EventService: 分发 session.updated 事件', sessionEvent.properties.info?.title);
- break;
- }
-
- case 'session.diff':
- this.sessionDiffSubject.next(payload as SessionDiffEvent);
- console.log('EventService: 分发 session.diff 事件');
- break;
-
- case 'server.heartbeat':
- this.allEventsSubject.next(event); // 已经广播过
- this.resetHeartbeatTimeout();
- break;
-
- case 'message.updated':
- this.messageUpdatedSubject.next(payload as MessageUpdatedEvent);
- console.log('EventService: 分发 message.updated 事件');
- break;
-
- case 'message.part.updated':
- this.messagePartUpdatedSubject.next(payload as MessagePartUpdatedEvent);
- console.log('EventService: 分发 message.part.updated 事件');
- break;
-
- case 'session.status':
- this.allEventsSubject.next(event);
- console.log('EventService: 分发 session.status 事件');
- break;
-
- case 'server.connected':
- this.allEventsSubject.next(event);
- console.log('EventService: 分发 server.connected 事件');
- break;
-
- default:
- console.log('EventService: 未知事件类型:', payload.type);
- // 未知事件类型仍然通过 allEvents$ 广播
- }
- }
-
- // 重置心跳超时定时器
- private resetHeartbeatTimeout() {
- // 清除现有超时
- if (this.heartbeatTimeout) {
- clearTimeout(this.heartbeatTimeout);
- }
-
- // 设置新的超时定时器(5分钟)
- this.heartbeatTimeout = setTimeout(() => {
- console.warn('EventService: 心跳超时(5分钟未收到心跳),尝试重连');
- this.isConnected = false;
- this.connectionStatusSubject.next(false);
-
- // 清除现有的重连定时器(如果有)
- if (this.reconnectTimeout) {
- clearTimeout(this.reconnectTimeout);
- }
-
- // 立即尝试重连
- if (this.authService.isAuthenticated()) {
- const activeSession = this.getCurrentSessionId(); // 需要获取当前会话ID
- this.connectToEventStream(activeSession);
- }
- }, this.heartbeatTimeoutDuration);
-
- console.log('EventService: 心跳超时定时器已重置(5分钟)');
- }
-
- // 清除心跳超时定时器
- private clearHeartbeatTimeout() {
- if (this.heartbeatTimeout) {
- clearTimeout(this.heartbeatTimeout);
- this.heartbeatTimeout = null;
- }
- }
-
- // 获取当前会话ID(用于重连时保持会话过滤)
- private getCurrentSessionId(): string | undefined {
- // 这里可以从URL或状态管理获取当前会话ID
- // 暂时返回undefined,表示不进行会话过滤
- return undefined;
- }
-
- // 安排重连
- private scheduleReconnect(sessionId?: string) {
- if (this.reconnectTimeout) {
- clearTimeout(this.reconnectTimeout);
- }
-
- if (this.reconnectAttempts >= this.maxReconnectAttempts) {
- console.error('EventService: 达到最大重连次数,停止重连');
- return;
- }
-
- this.reconnectAttempts++;
- const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避
-
- console.log(`EventService: ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`);
-
- this.reconnectTimeout = setTimeout(() => {
- if (this.authService.isAuthenticated()) {
- this.connectToEventStream(sessionId);
- }
- }, delay);
- }
-
- // 断开事件流连接
- disconnect() {
- if (this.eventSource) {
- this.eventSource.close();
- this.eventSource = null;
- this.isConnected = false;
- this.connectionStatusSubject.next(false);
-
- console.log('EventService: 已断开事件流连接');
- }
-
- if (this.reconnectTimeout) {
- clearTimeout(this.reconnectTimeout);
- this.reconnectTimeout = null;
- }
-
- // 清除心跳超时定时器
- this.clearHeartbeatTimeout();
-
- this.reconnectAttempts = 0;
- }
-
- // 订阅特定事件类型
- subscribeToEvent<T = any>(eventType: EventType): Observable<T> {
- return new Observable<T>(observer => {
- const subscription = this.allEvents$.subscribe(event => {
- if (event.payload.type === eventType) {
- observer.next(event.payload as T);
- }
- });
-
- return () => subscription.unsubscribe();
- });
- }
-
- // 手动发送事件(用于测试)
- emitTestEvent(event: GlobalEvent) {
- this.handleGlobalEvent(event);
- }
-
- // 检查连接状态
- isStreamConnected(): boolean {
- return this.isConnected;
- }
-
- // 切换会话过滤
- switchSession(sessionId?: string) {
- if (this.authService.isAuthenticated()) {
- this.connectToEventStream(sessionId);
- } else {
- console.warn('未登录状态,无法切换会话过滤');
- }
- }
-
- // 订阅特定会话的事件
- subscribeToSessionEvents(sessionId: string): Observable<GlobalEvent> {
- return this.sessionEvents$.pipe(
- filter(({ sessionId: eventSessionId }) => eventSessionId === sessionId),
- map(({ event }) => event)
- );
- }
-
- // 获取事件中的会话ID
- private getSessionIdFromEvent(event: GlobalEvent): string | undefined {
- const payload = event.payload;
-
- // 根据后端事件分发器的extractSessionIDFromEvent逻辑实现
- // 支持多种sessionID字段名和嵌套路径
-
- // 1. 检查payload.properties.sessionID(session.status事件等)
- if (payload.properties && payload.properties['sessionID']) {
- return payload.properties['sessionID'];
- }
-
- // 2. 根据事件类型处理
- switch (payload.type) {
- case 'session.updated':
- // session.updated事件中,会话ID在properties.info.id中
- const sessionEvent = payload as SessionUpdatedEvent;
- return sessionEvent.properties?.info?.id;
-
- case 'session.diff':
- // session.diff事件中,会话ID在properties.sessionID中
- const diffEvent = payload as SessionDiffEvent;
- return diffEvent.properties?.sessionID;
-
- case 'message.updated':
- // message.updated事件中,会话ID可能在多个位置
- const messageEvent = payload as MessageUpdatedEvent;
- // 尝试从properties.info.sessionID获取
- if (messageEvent.properties?.info && typeof messageEvent.properties.info === 'object') {
- const info = messageEvent.properties.info as any;
- if (info.sessionID) return info.sessionID;
- if (info.sessionId) return info.sessionId;
- if (info.session_id) return info.session_id;
- }
- // 尝试从properties.sessionID获取
- if (messageEvent.properties && (messageEvent.properties as any).sessionID) {
- return (messageEvent.properties as any).sessionID;
- }
- break;
-
- case 'message.part.updated':
- // message.part.updated事件中,会话ID可能在properties.part.sessionID中
- const partEvent = payload as MessagePartUpdatedEvent;
- if (partEvent.properties?.part && typeof partEvent.properties.part === 'object') {
- const part = partEvent.properties.part as any;
- if (part.sessionID) return part.sessionID;
- if (part.sessionId) return part.sessionId;
- if (part.session_id) return part.session_id;
- }
- break;
-
- case 'session.status':
- // session.status事件中,会话ID可能在properties.status.sessionID中
- const statusEvent = payload as any;
- if (statusEvent.properties?.status?.sessionID) {
- return statusEvent.properties.status.sessionID;
- }
- break;
- }
-
- // 3. 递归查找sessionID字段(仿照后端实现)
- const findSessionIDRecursive = (obj: any): string | undefined => {
- if (!obj || typeof obj !== 'object') return undefined;
-
- // 检查常见的sessionID字段名
- const sessionIDKeys = ['sessionID', 'sessionId', 'session_id', 'session'];
- for (const key of sessionIDKeys) {
- if (obj[key] && typeof obj[key] === 'string') {
- return obj[key];
- }
- }
-
- // 递归查找
- if (Array.isArray(obj)) {
- for (const item of obj) {
- const result = findSessionIDRecursive(item);
- if (result) return result;
- }
- } else {
- for (const key in obj) {
- if (Object.prototype.hasOwnProperty.call(obj, key)) {
- const result = findSessionIDRecursive(obj[key]);
- if (result) return result;
- }
- }
- }
-
- return undefined;
- };
-
- // 在整个事件对象中递归查找
- return findSessionIDRecursive(event);
- }
-
- // 分发事件到会话/实例
- private distributeEventBySession(event: GlobalEvent): void {
- const sessionId = this.getSessionIdFromEvent(event);
- if (sessionId) {
- console.log(`分发事件到会话: ${sessionId}, 类型: ${event.payload.type}`);
-
- // 发送到会话事件流(供该会话的组件订阅)
- this.sessionEventsSubject.next({ sessionId, event });
-
- // 注:每个浏览器标签页独立SSE连接,不再需要实例映射
- // 后端已按sessionId过滤事件,每个标签页只收到自己会话的事件
- }
- }
-
- ngOnDestroy() {
- this.subscriptions.unsubscribe();
- this.disconnect();
- }
- }
|