Kaynağa Gözat

流式订阅测试通过-测试通过

qdy 3 hafta önce
ebeveyn
işleme
f559d693bb

+ 11
- 0
src/app/app.component.html Dosyayı Görüntüle

@@ -21,6 +21,17 @@
21 21
           <mat-icon class="mr-2 text-gray-600 group-hover:text-red-600">logout</mat-icon>
22 22
           <span class="text-sm text-gray-700 group-hover:text-red-700">退出</span>
23 23
         </div>
24
+        
25
+        <!-- 事件流连接状态指示器 -->
26
+        <div class="flex items-center" matTooltip="{{ isEventStreamConnected ? '事件流已连接' : '事件流已断开' }}" matTooltipPosition="above">
27
+          <mat-icon class="mr-1 text-sm" [class.text-green-500]="isEventStreamConnected" [class.text-red-500]="!isEventStreamConnected">
28
+            {{ isEventStreamConnected ? 'wifi' : 'wifi_off' }}
29
+          </mat-icon>
30
+          <span class="text-xs text-gray-600">
31
+            {{ isEventStreamConnected ? '已连接' : '断开' }}
32
+          </span>
33
+        </div>
34
+        
24 35
         <span class="text-sm text-gray-700">
25 36
           {{ username }}
26 37
         </span>

+ 12
- 1
src/app/app.component.ts Dosyayı Görüntüle

@@ -10,6 +10,7 @@ import { MatMenuModule } from '@angular/material/menu';
10 10
 import { MatTooltipModule } from '@angular/material/tooltip';
11 11
 import { DragDropModule } from '@angular/cdk/drag-drop';
12 12
 import { AuthService } from './services/auth.service';
13
+import { EventService } from './services/event.service';
13 14
 import { Subscription } from 'rxjs';
14 15
 import { ConfigService } from 'base-core';
15 16
 
@@ -29,11 +30,13 @@ export class AppComponent implements OnInit, OnDestroy {
29 30
   minRightWidth = 300;
30 31
   maxRightWidth = 600;
31 32
   private subscriptions: Subscription = new Subscription();
33
+  isEventStreamConnected = false;
32 34
 
33 35
   constructor(
34 36
     private router: Router,
35 37
     public authService: AuthService,
36
-    private config: ConfigService
38
+    private config: ConfigService,
39
+    private eventService: EventService
37 40
   ) {}
38 41
   
39 42
   ngOnInit() {
@@ -61,6 +64,14 @@ export class AppComponent implements OnInit, OnDestroy {
61 64
 
62 65
 
63 66
     // 订阅路由变化(暂无需要)
67
+    
68
+    // 订阅事件流连接状态
69
+    this.subscriptions.add(
70
+      this.eventService.connectionStatus$.subscribe(connected => {
71
+        this.isEventStreamConnected = connected;
72
+        console.log('事件流连接状态:', connected ? '已连接' : '已断开');
73
+      })
74
+    );
64 75
   }
65 76
   
66 77
   get isLoginPage(): boolean {

+ 37
- 7
src/app/components/conversation.component.ts Dosyayı Görüntüle

@@ -69,7 +69,7 @@ import { Session } from '../models/session.model';
69 69
           color="primary" 
70 70
           class="send-button"
71 71
           (click)="sendMessage()"
72
-          [disabled]="!userInput.trim() || isLoading"
72
+          [disabled]="!userInput.trim()"
73 73
         >
74 74
           <mat-icon *ngIf="!isLoading">send</mat-icon>
75 75
           <mat-spinner *ngIf="isLoading" diameter="20"></mat-spinner>
@@ -205,6 +205,14 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
205 205
     // 订阅活动会话变化
206 206
     this.subscriptions.add(
207 207
       this.sessionService.activeSession$.subscribe(session => {
208
+        console.log('🔍 [ConversationComponent] 活动会话变化:', session?.id);
209
+        
210
+        // 会话切换时取消当前正在进行的流式请求
211
+        if (this.activeSession && this.activeSession.id !== session?.id) {
212
+          console.log('🔍 [ConversationComponent] 会话切换,取消当前流式请求');
213
+          this.cancelCurrentStream();
214
+        }
215
+        
208 216
         this.activeSession = session;
209 217
         if (session) {
210 218
           this.loadMessages(session.id);
@@ -240,6 +248,19 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
240 248
           if (lastMessage) {
241 249
             lastMessage.loading = false;
242 250
           }
251
+          // 重置加载状态并聚焦输入框
252
+          this.isLoading = false;
253
+          this.focusInput();
254
+        } else if (update.type === 'error') {
255
+          // 处理错误情况
256
+          const lastMessage = this.messages[this.messages.length - 1];
257
+          if (lastMessage && lastMessage.role === 'assistant') {
258
+            lastMessage.content += '\n\n[错误: ' + update.data + ']';
259
+            lastMessage.loading = false;
260
+          }
261
+          // 重置加载状态并聚焦输入框
262
+          this.isLoading = false;
263
+          this.focusInput();
243 264
         }
244 265
       })
245 266
     );
@@ -284,9 +305,9 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
284 305
   }
285 306
 
286 307
   sendMessage() {
287
-    if (!this.userInput.trim() || !this.activeSession || this.isLoading) return;
308
+    if (!this.userInput.trim() || !this.activeSession) return;
288 309
 
289
-    // 取消之前可能仍在进行的流式请求
310
+    // 取消之前可能仍在进行的流式请求(允许打断之前的回答)
290 311
     this.cancelCurrentStream();
291 312
 
292 313
     const userMessage: ChatMessage = {
@@ -320,7 +341,8 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
320 341
       this.userInput
321 342
     ).subscribe({
322 343
       next: () => {
323
-        this.isLoading = false;
344
+        // 流式连接已建立,但传输仍在继续
345
+        // isLoading状态将在流式完成时在streamUpdate$中重置
324 346
       },
325 347
       error: (error) => {
326 348
         console.error('发送消息失败:', error);
@@ -328,6 +350,7 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
328 350
         aiMessage.loading = false;
329 351
         this.isLoading = false;
330 352
         this.currentStreamSubscription = null;
353
+        this.focusInput();
331 354
       },
332 355
       complete: () => {
333 356
         // 流式完成已在streamUpdate$中处理
@@ -339,9 +362,7 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
339 362
     this.userInput = '';
340 363
     
341 364
     // 聚焦输入框
342
-    setTimeout(() => {
343
-      this.messageInput.nativeElement.focus();
344
-    }, 100);
365
+    this.focusInput();
345 366
   }
346 367
 
347 368
   formatContent(content: string): string {
@@ -357,4 +378,13 @@ export class ConversationComponent implements OnInit, OnDestroy, AfterViewChecke
357 378
       console.error('滚动失败:', err);
358 379
     }
359 380
   }
381
+
382
+  private focusInput() {
383
+    // 延迟聚焦以确保DOM已更新
384
+    setTimeout(() => {
385
+      if (this.messageInput?.nativeElement) {
386
+        this.messageInput.nativeElement.focus();
387
+      }
388
+    }, 100);
389
+  }
360 390
 }

+ 115
- 0
src/app/models/event.model.ts Dosyayı Görüntüle

@@ -0,0 +1,115 @@
1
+// 全局事件模型
2
+export interface GlobalEvent {
3
+  directory?: string;
4
+  payload: EventPayload;
5
+}
6
+
7
+// 事件负载
8
+export interface EventPayload {
9
+  type: string;
10
+  properties?: Record<string, any>;
11
+}
12
+
13
+// 会话更新事件
14
+export interface SessionUpdatedEvent {
15
+  type: 'session.updated';
16
+  properties: {
17
+    info: SessionInfo;
18
+  };
19
+}
20
+
21
+// 会话信息
22
+export interface SessionInfo {
23
+  id: string;
24
+  slug: string;
25
+  version: string;
26
+  projectID: string;
27
+  directory: string;
28
+  title: string;
29
+  time: {
30
+    created: number;
31
+    updated: number;
32
+  };
33
+  summary?: {
34
+    additions: number;
35
+    deletions: number;
36
+    files: number;
37
+  };
38
+}
39
+
40
+// 会话差异事件
41
+export interface SessionDiffEvent {
42
+  type: 'session.diff';
43
+  properties: {
44
+    sessionID: string;
45
+    diff: any[];
46
+  };
47
+}
48
+
49
+// 服务器心跳事件
50
+export interface ServerHeartbeatEvent {
51
+  type: 'server.heartbeat';
52
+  properties: Record<string, never>;
53
+}
54
+
55
+// 消息更新事件
56
+export interface MessageUpdatedEvent {
57
+  type: 'message.updated';
58
+  properties: {
59
+    info: MessageInfo;
60
+  };
61
+}
62
+
63
+// 消息部分更新事件
64
+export interface MessagePartUpdatedEvent {
65
+  type: 'message.part.updated';
66
+  properties: {
67
+    part: MessagePart;
68
+    delta?: string;
69
+  };
70
+}
71
+
72
+// 消息信息
73
+export interface MessageInfo {
74
+  id: string;
75
+  role: string;
76
+  content?: string;
77
+  parts?: MessagePart[];
78
+}
79
+
80
+// 消息部分
81
+export interface MessagePart {
82
+  type: 'text' | 'reasoning' | string;
83
+  text?: string;
84
+}
85
+
86
+// 会话状态事件
87
+export interface SessionStatusEvent {
88
+  type: 'session.status';
89
+  properties: {
90
+    status: {
91
+      type: string;
92
+      [key: string]: any;
93
+    };
94
+  };
95
+}
96
+
97
+// 服务器连接事件
98
+export interface ServerConnectedEvent {
99
+  type: 'server.connected';
100
+  properties: Record<string, any>;
101
+}
102
+
103
+// 事件类型映射
104
+export type EventType = 
105
+  | 'session.updated'
106
+  | 'session.diff'
107
+  | 'server.heartbeat'
108
+  | 'message.updated'
109
+  | 'message.part.updated'
110
+  | 'session.status'
111
+  | 'server.connected'
112
+  | string;
113
+
114
+// 事件处理器类型
115
+export type EventHandler<T = any> = (event: T) => void;

+ 59
- 11
src/app/services/conversation.service.ts Dosyayı Görüntüle

@@ -50,6 +50,9 @@ export class ConversationService {
50 50
       // 创建AbortController用于取消请求
51 51
       const abortController = new AbortController();
52 52
       
53
+      // 心跳超时定时器(5分钟)- 用于清理函数访问
54
+      let heartbeatTimeout: any = null;
55
+      
53 56
       // 使用fetch API以便流式读取SSE响应
54 57
       fetch('/api/prompt/stream', {
55 58
         method: 'POST',
@@ -79,15 +82,41 @@ export class ConversationService {
79 82
         const decoder = new TextDecoder();
80 83
         let buffer = '';
81 84
         
85
+        // 心跳超时定时器(5分钟)- 如果5分钟内未收到任何数据(包括心跳),则认为连接已断开
86
+        // 使用外层声明的heartbeatTimeout变量
87
+        const HEARTBEAT_TIMEOUT_MS = 5 * 60 * 1000; // 5分钟
88
+        
89
+        const resetHeartbeatTimeout = () => {
90
+          // 清除现有定时器
91
+          if (heartbeatTimeout) {
92
+            clearTimeout(heartbeatTimeout);
93
+          }
94
+          // 设置新的超时定时器
95
+          heartbeatTimeout = setTimeout(() => {
96
+            console.error('🔍 [ConversationService] 心跳超时(5分钟未收到数据),主动断开连接');
97
+            abortController.abort();
98
+            this.streamUpdateSubject.next({ type: 'error', data: '连接超时,请重试' });
99
+            observer.error(new Error('连接超时,请重试'));
100
+          }, HEARTBEAT_TIMEOUT_MS);
101
+        };
102
+        
103
+        // 初始设置心跳超时定时器
104
+        resetHeartbeatTimeout();
105
+        
82 106
         const readStream = () => {
83
-          reader.read().then(({ done, value }) => {
84
-            if (done) {
85
-              console.log('🔍 [ConversationService] 流结束');
86
-              this.streamUpdateSubject.next({ type: 'done', data: '' });
87
-              observer.next();
88
-              observer.complete();
89
-              return;
90
-            }
107
+           reader.read().then(({ done, value }) => {
108
+             if (done) {
109
+               console.log('🔍 [ConversationService] 流结束');
110
+               // 清除心跳超时定时器
111
+               if (heartbeatTimeout) {
112
+                 clearTimeout(heartbeatTimeout);
113
+                 heartbeatTimeout = null;
114
+               }
115
+               this.streamUpdateSubject.next({ type: 'done', data: '' });
116
+               observer.next();
117
+               observer.complete();
118
+               return;
119
+             }
91 120
             
92 121
             // 解码数据
93 122
             buffer += decoder.decode(value, { stream: true });
@@ -98,6 +127,17 @@ export class ConversationService {
98 127
               const event = buffer.substring(0, eventEnd);
99 128
               buffer = buffer.substring(eventEnd + 2); // 移除已处理的事件和两个换行符
100 129
               
130
+              // 重置心跳超时(收到任何事件,包括注释)
131
+              resetHeartbeatTimeout();
132
+              
133
+              // 检查是否为注释行(以冒号开头)
134
+              if (event.startsWith(':')) {
135
+                console.log('🔍 [ConversationService] 收到SSE注释:', event.substring(0, 50));
136
+                // 心跳注释,继续处理下一个事件
137
+                eventEnd = buffer.indexOf('\n\n');
138
+                continue;
139
+              }
140
+              
101 141
               // 查找数据行
102 142
               const dataLineStart = event.indexOf('data: ');
103 143
               if (dataLineStart !== -1) {
@@ -148,9 +188,12 @@ export class ConversationService {
148 188
                         else if (payload.type === 'server.connected') {
149 189
                           console.log('🔍 [ConversationService] 服务器连接成功');
150 190
                         }
151
-                        else if (payload.type === 'session.status') {
152
-                          console.log('🔍 [ConversationService] 会话状态更新:', payload.properties?.status?.type);
153
-                        }
191
+                         else if (payload.type === 'session.status') {
192
+                           console.log('🔍 [ConversationService] 会话状态更新:', payload.properties?.status?.type);
193
+                         }
194
+                         else if (payload.type === 'session.idle') {
195
+                           console.log('🔍 [ConversationService] AI进入空闲状态,保持连接可继续交互');
196
+                         }
154 197
                   } catch (e) {
155 198
                     console.error('🔍 [ConversationService] 解析SSE JSON数据失败:', e, '原始数据:', data);
156 199
                     // 如果不是JSON,按纯文本处理
@@ -195,6 +238,11 @@ export class ConversationService {
195 238
       // 清理函数
196 239
       return () => {
197 240
         console.log('🔍 [ConversationService] 清理流式连接,取消请求');
241
+        // 清除心跳超时定时器
242
+        if (heartbeatTimeout) {
243
+          clearTimeout(heartbeatTimeout);
244
+          heartbeatTimeout = null;
245
+        }
198 246
         // 取消fetch请求
199 247
         abortController.abort();
200 248
       };

+ 344
- 0
src/app/services/event.service.ts Dosyayı Görüntüle

@@ -0,0 +1,344 @@
1
+import { Injectable, OnDestroy } from '@angular/core';
2
+import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
3
+import { AuthService } from './auth.service';
4
+import { GlobalEvent, EventPayload, EventType, SessionUpdatedEvent, SessionDiffEvent, ServerHeartbeatEvent, MessageUpdatedEvent, MessagePartUpdatedEvent, SessionStatusEvent, ServerConnectedEvent } from '../models/event.model';
5
+
6
+@Injectable({
7
+  providedIn: 'root'
8
+})
9
+export class EventService implements OnDestroy {
10
+  private eventSource: EventSource | null = null;
11
+  private isConnected = false;
12
+  
13
+  private subscriptions: Subscription = new Subscription();
14
+  private reconnectTimeout: any = null;
15
+  private heartbeatTimeout: any = null;
16
+  private reconnectAttempts = 0;
17
+  private maxReconnectAttempts = 5;
18
+  private reconnectDelay = 3000; // 3秒
19
+  private heartbeatTimeoutDuration = 5 * 60 * 1000; // 5分钟心跳超时(根据用户要求)
20
+  
21
+  // 事件主题,用于广播所有事件
22
+  private allEventsSubject = new Subject<GlobalEvent>();
23
+  allEvents$ = this.allEventsSubject.asObservable();
24
+  
25
+  // 特定事件类型主题
26
+  private sessionUpdatedSubject = new Subject<SessionUpdatedEvent>();
27
+  sessionUpdated$ = this.sessionUpdatedSubject.asObservable();
28
+  
29
+  private sessionDiffSubject = new Subject<SessionDiffEvent>();
30
+  sessionDiff$ = this.sessionDiffSubject.asObservable();
31
+  
32
+  private messageUpdatedSubject = new Subject<MessageUpdatedEvent>();
33
+  messageUpdated$ = this.messageUpdatedSubject.asObservable();
34
+  
35
+  private messagePartUpdatedSubject = new Subject<MessagePartUpdatedEvent>();
36
+  messagePartUpdated$ = this.messagePartUpdatedSubject.asObservable();
37
+  
38
+  // 连接状态主题
39
+  private connectionStatusSubject = new BehaviorSubject<boolean>(false);
40
+  connectionStatus$ = this.connectionStatusSubject.asObservable();
41
+  
42
+  constructor(private authService: AuthService) {
43
+    this.initializeAuthSubscription();
44
+  }
45
+  
46
+  // 初始化认证状态订阅
47
+  private initializeAuthSubscription() {
48
+    this.subscriptions.add(
49
+      this.authService.authState$.subscribe(authState => {
50
+        console.log('EventService: 认证状态变化', authState.isAuthenticated);
51
+        
52
+        if (authState.isAuthenticated) {
53
+          // 用户已登录,连接事件流
54
+          this.connectToEventStream();
55
+        } else {
56
+          // 用户登出,断开事件流
57
+          this.disconnect();
58
+        }
59
+      })
60
+    );
61
+  }
62
+  
63
+  // 连接到事件流
64
+  connectToEventStream(sessionId?: string) {
65
+    if (this.eventSource) {
66
+      this.eventSource.close();
67
+      this.eventSource = null;
68
+    }
69
+    
70
+    // 检查是否已登录
71
+    if (!this.authService.isAuthenticated()) {
72
+      console.warn('未登录状态,不连接事件流');
73
+      return;
74
+    }
75
+    
76
+    // 获取认证token
77
+    const token = this.authService.getToken();
78
+    if (!token) {
79
+      console.error('无法获取认证token');
80
+      return;
81
+    }
82
+    
83
+    // 构建带认证参数的URL - 使用日志流端点,因为它发送全局事件
84
+    let url = `/api/logs/stream?token=${encodeURIComponent(token)}`;
85
+    if (sessionId) {
86
+      url += `&sessionId=${sessionId}`;
87
+    }
88
+    
89
+    console.log('EventService: 连接事件流URL:', url);
90
+    this.eventSource = new EventSource(url);
91
+    
92
+    this.eventSource.onopen = () => {
93
+      console.log('EventService: 事件流连接已建立');
94
+      this.isConnected = true;
95
+      this.connectionStatusSubject.next(true);
96
+      this.reconnectAttempts = 0; // 重置重连计数
97
+      
98
+      // 启动心跳超时定时器(5分钟)
99
+      this.resetHeartbeatTimeout();
100
+      
101
+      // 发送连接成功事件
102
+      this.allEventsSubject.next({
103
+        payload: {
104
+          type: 'connection.established',
105
+          properties: { timestamp: new Date().toISOString() }
106
+        }
107
+      });
108
+    };
109
+    
110
+    this.eventSource.onmessage = (event) => {
111
+      const eventData = event.data;
112
+      console.log('EventService: 收到原始事件数据:', eventData.substring(0, 200));
113
+      
114
+      // 处理SSE注释格式的心跳(保持向后兼容)
115
+      if (eventData === ': heartbeat' || eventData.startsWith(': ')) {
116
+        // 重置心跳超时定时器(表示连接活跃)
117
+        this.resetHeartbeatTimeout();
118
+        
119
+        // 发送内部心跳事件,供其他服务订阅
120
+        this.allEventsSubject.next({
121
+          payload: {
122
+            type: 'server.heartbeat.comment',
123
+            properties: { 
124
+              timestamp: new Date().toISOString(),
125
+              rawData: eventData 
126
+            }
127
+          }
128
+        });
129
+        return;
130
+      }
131
+      
132
+      try {
133
+        // 解析JSON事件
134
+        const globalEvent: GlobalEvent = JSON.parse(eventData);
135
+        this.handleGlobalEvent(globalEvent);
136
+      } catch (error) {
137
+        // 如果不是JSON,可能是纯文本日志
138
+        console.log('EventService: 非JSON事件,可能是纯文本日志:', eventData.substring(0, 100));
139
+        
140
+        // 如果是纯文本日志,可以转发给日志服务或忽略
141
+        // 这里只处理JSON事件
142
+      }
143
+    };
144
+    
145
+    this.eventSource.onerror = (error) => {
146
+      console.error('EventService: 事件流连接错误:', error);
147
+      this.isConnected = false;
148
+      this.connectionStatusSubject.next(false);
149
+      
150
+      // 清除心跳超时定时器
151
+      this.clearHeartbeatTimeout();
152
+      
153
+      // 检查是否仍然登录
154
+      if (this.authService.isAuthenticated()) {
155
+        console.log('EventService: 连接断开,正在重连...');
156
+        this.scheduleReconnect(sessionId);
157
+      } else {
158
+        console.log('EventService: 连接断开(用户未登录)');
159
+      }
160
+    };
161
+  }
162
+  
163
+  // 处理全局事件
164
+  private handleGlobalEvent(event: GlobalEvent) {
165
+    console.log('EventService: 处理全局事件,类型:', event.payload.type);
166
+    
167
+    // 任何有效事件都重置心跳超时(表明连接活跃)
168
+    this.resetHeartbeatTimeout();
169
+    
170
+    // 广播所有事件
171
+    this.allEventsSubject.next(event);
172
+    
173
+    // 根据事件类型分发到特定主题
174
+    const payload = event.payload;
175
+    
176
+    switch (payload.type) {
177
+      case 'session.updated': {
178
+        const sessionEvent = payload as SessionUpdatedEvent;
179
+        this.sessionUpdatedSubject.next(sessionEvent);
180
+        console.log('EventService: 分发 session.updated 事件', sessionEvent.properties.info?.title);
181
+        break;
182
+      }
183
+        
184
+      case 'session.diff':
185
+        this.sessionDiffSubject.next(payload as SessionDiffEvent);
186
+        console.log('EventService: 分发 session.diff 事件');
187
+        break;
188
+        
189
+      case 'server.heartbeat':
190
+        this.allEventsSubject.next(event); // 已经广播过
191
+        this.resetHeartbeatTimeout();
192
+        break;
193
+        
194
+      case 'message.updated':
195
+        this.messageUpdatedSubject.next(payload as MessageUpdatedEvent);
196
+        console.log('EventService: 分发 message.updated 事件');
197
+        break;
198
+        
199
+      case 'message.part.updated':
200
+        this.messagePartUpdatedSubject.next(payload as MessagePartUpdatedEvent);
201
+        console.log('EventService: 分发 message.part.updated 事件');
202
+        break;
203
+        
204
+      case 'session.status':
205
+        this.allEventsSubject.next(event);
206
+        console.log('EventService: 分发 session.status 事件');
207
+        break;
208
+        
209
+      case 'server.connected':
210
+        this.allEventsSubject.next(event);
211
+        console.log('EventService: 分发 server.connected 事件');
212
+        break;
213
+        
214
+      default:
215
+        console.log('EventService: 未知事件类型:', payload.type);
216
+        // 未知事件类型仍然通过 allEvents$ 广播
217
+    }
218
+  }
219
+  
220
+  // 重置心跳超时定时器
221
+  private resetHeartbeatTimeout() {
222
+    // 清除现有超时
223
+    if (this.heartbeatTimeout) {
224
+      clearTimeout(this.heartbeatTimeout);
225
+    }
226
+    
227
+    // 设置新的超时定时器(5分钟)
228
+    this.heartbeatTimeout = setTimeout(() => {
229
+      console.warn('EventService: 心跳超时(5分钟未收到心跳),尝试重连');
230
+      this.isConnected = false;
231
+      this.connectionStatusSubject.next(false);
232
+      
233
+      // 清除现有的重连定时器(如果有)
234
+      if (this.reconnectTimeout) {
235
+        clearTimeout(this.reconnectTimeout);
236
+      }
237
+      
238
+      // 立即尝试重连
239
+      if (this.authService.isAuthenticated()) {
240
+        const activeSession = this.getCurrentSessionId(); // 需要获取当前会话ID
241
+        this.connectToEventStream(activeSession);
242
+      }
243
+    }, this.heartbeatTimeoutDuration);
244
+    
245
+    console.log('EventService: 心跳超时定时器已重置(5分钟)');
246
+  }
247
+  
248
+  // 清除心跳超时定时器
249
+  private clearHeartbeatTimeout() {
250
+    if (this.heartbeatTimeout) {
251
+      clearTimeout(this.heartbeatTimeout);
252
+      this.heartbeatTimeout = null;
253
+    }
254
+  }
255
+  
256
+  // 获取当前会话ID(用于重连时保持会话过滤)
257
+  private getCurrentSessionId(): string | undefined {
258
+    // 这里可以从URL或状态管理获取当前会话ID
259
+    // 暂时返回undefined,表示不进行会话过滤
260
+    return undefined;
261
+  }
262
+  
263
+  // 安排重连
264
+  private scheduleReconnect(sessionId?: string) {
265
+    if (this.reconnectTimeout) {
266
+      clearTimeout(this.reconnectTimeout);
267
+    }
268
+    
269
+    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
270
+      console.error('EventService: 达到最大重连次数,停止重连');
271
+      return;
272
+    }
273
+    
274
+    this.reconnectAttempts++;
275
+    const delay = this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1); // 指数退避
276
+    
277
+    console.log(`EventService: ${this.reconnectAttempts}/${this.maxReconnectAttempts} 重连,等待 ${delay}ms`);
278
+    
279
+    this.reconnectTimeout = setTimeout(() => {
280
+      if (this.authService.isAuthenticated()) {
281
+        this.connectToEventStream(sessionId);
282
+      }
283
+    }, delay);
284
+  }
285
+  
286
+  // 断开事件流连接
287
+  disconnect() {
288
+    if (this.eventSource) {
289
+      this.eventSource.close();
290
+      this.eventSource = null;
291
+      this.isConnected = false;
292
+      this.connectionStatusSubject.next(false);
293
+      
294
+      console.log('EventService: 已断开事件流连接');
295
+    }
296
+    
297
+    if (this.reconnectTimeout) {
298
+      clearTimeout(this.reconnectTimeout);
299
+      this.reconnectTimeout = null;
300
+    }
301
+    
302
+    // 清除心跳超时定时器
303
+    this.clearHeartbeatTimeout();
304
+    
305
+    this.reconnectAttempts = 0;
306
+  }
307
+  
308
+  // 订阅特定事件类型
309
+  subscribeToEvent<T = any>(eventType: EventType): Observable<T> {
310
+    return new Observable<T>(observer => {
311
+      const subscription = this.allEvents$.subscribe(event => {
312
+        if (event.payload.type === eventType) {
313
+          observer.next(event.payload as T);
314
+        }
315
+      });
316
+      
317
+      return () => subscription.unsubscribe();
318
+    });
319
+  }
320
+  
321
+  // 手动发送事件(用于测试)
322
+  emitTestEvent(event: GlobalEvent) {
323
+    this.handleGlobalEvent(event);
324
+  }
325
+  
326
+  // 检查连接状态
327
+  isStreamConnected(): boolean {
328
+    return this.isConnected;
329
+  }
330
+  
331
+  // 切换会话过滤
332
+  switchSession(sessionId?: string) {
333
+    if (this.authService.isAuthenticated()) {
334
+      this.connectToEventStream(sessionId);
335
+    } else {
336
+      console.warn('未登录状态,无法切换会话过滤');
337
+    }
338
+  }
339
+  
340
+  ngOnDestroy() {
341
+    this.subscriptions.unsubscribe();
342
+    this.disconnect();
343
+  }
344
+}

+ 61
- 5
src/app/services/session.service.ts Dosyayı Görüntüle

@@ -1,14 +1,16 @@
1
-import { Injectable } from '@angular/core';
1
+import { Injectable, OnDestroy } from '@angular/core';
2 2
 import { HttpClient } from '@angular/common/http';
3
-import { BehaviorSubject, Observable, of } from 'rxjs';
3
+import { BehaviorSubject, Observable, of, Subscription } from 'rxjs';
4 4
 import { map, tap } from 'rxjs/operators';
5 5
 import { Session, SessionCreateRequest, SessionCreateApiResponse, SessionListResponse } from '../models/session.model';
6 6
 import { AuthService } from './auth.service';
7
+import { EventService } from './event.service';
8
+import { SessionUpdatedEvent } from '../models/event.model';
7 9
 
8 10
 @Injectable({
9 11
   providedIn: 'root'
10 12
 })
11
-export class SessionService {
13
+export class SessionService implements OnDestroy {
12 14
   private activeSession = new BehaviorSubject<Session | null>(null);
13 15
   activeSession$ = this.activeSession.asObservable();
14 16
   
@@ -16,10 +18,15 @@ export class SessionService {
16 18
   private sessionsSubject = new BehaviorSubject<Session[]>([]);
17 19
   sessions$ = this.sessionsSubject.asObservable();
18 20
 
21
+  private eventSubscription: Subscription = new Subscription();
22
+
19 23
   constructor(
20 24
     private http: HttpClient,
21
-    private authService: AuthService
22
-  ) {}
25
+    private authService: AuthService,
26
+    private eventService: EventService
27
+  ) {
28
+    this.setupEventSubscriptions();
29
+  }
23 30
 
24 31
   // 获取会话列表(需要认证)
25 32
   getSessions(): Observable<Session[]> {
@@ -129,4 +136,53 @@ export class SessionService {
129 136
       })
130 137
     );
131 138
   }
139
+
140
+  // 设置事件订阅
141
+  private setupEventSubscriptions() {
142
+    // 订阅会话更新事件
143
+    this.eventSubscription.add(
144
+      this.eventService.sessionUpdated$.subscribe(event => {
145
+        this.handleSessionUpdated(event);
146
+      })
147
+    );
148
+  }
149
+
150
+  // 处理会话更新事件
151
+  private handleSessionUpdated(event: SessionUpdatedEvent) {
152
+    const sessionInfo = event.properties.info;
153
+    console.log('SessionService: 处理会话更新事件', sessionInfo.id, sessionInfo.title);
154
+    
155
+    // 查找现有会话
156
+    const existingSessionIndex = this.sessions.findIndex(s => s.id === sessionInfo.id);
157
+    
158
+    if (existingSessionIndex !== -1) {
159
+      // 更新现有会话
160
+      const updatedSession: Session = {
161
+        ...this.sessions[existingSessionIndex],
162
+        title: sessionInfo.title,
163
+        // 保留其他字段如port、baseURL等
164
+      };
165
+      this.sessions[existingSessionIndex] = updatedSession;
166
+      console.log('SessionService: 更新现有会话', updatedSession.title);
167
+    } else {
168
+      // 添加新会话到列表开头
169
+      const newSession: Session = {
170
+        id: sessionInfo.id,
171
+        title: sessionInfo.title,
172
+        // port和baseURL可能在后端创建会话时提供,这里使用默认值
173
+        // 实际使用中,可以从事件或后续API调用中获取
174
+        port: 8020, // 默认端口,可根据需要调整
175
+        baseURL: `http://localhost:8020`, // 默认baseURL
176
+      };
177
+      this.sessions.unshift(newSession);
178
+      console.log('SessionService: 添加新会话', newSession.title);
179
+    }
180
+    
181
+    // 通知订阅者
182
+    this.sessionsSubject.next([...this.sessions]);
183
+  }
184
+
185
+  ngOnDestroy() {
186
+    this.eventSubscription.unsubscribe();
187
+  }
132 188
 }

Loading…
İptal
Kaydet