| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package service
-
- import (
- "encoding/json"
- "fmt"
- "log"
- "net/http"
- "sync"
- "time"
-
- "git.x2erp.com/qdy/go-base/config"
- "git.x2erp.com/qdy/go-base/types"
- "git.x2erp.com/qdy/go-db/factory/rabbitmq"
- "git.x2erp.com/qdy/go-svc-mqconsumer/consumer"
- )
-
- // RabbitMQService RabbitMQ服务
- type RabbitMQService struct {
- rabbitFactory *rabbitmq.RabbitMQFactory
- consumerManager *consumer.ConsumerManager
- mu sync.RWMutex
- initialized bool
- }
-
- // NewRabbitMQService 创建RabbitMQ服务
- func NewRabbitMQService() *RabbitMQService {
- return &RabbitMQService{}
- }
-
- // Init 初始化服务
- func (s *RabbitMQService) Init() error {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- if s.initialized {
- return nil
- }
-
- // 初始化RabbitMQ工厂
- factory, err := rabbitmq.NewRabbitMQFactory()
- if err != nil {
- return fmt.Errorf("failed to create RabbitMQ factory: %v", err)
- }
-
- s.rabbitFactory = factory
- s.consumerManager = consumer.NewConsumerManager(factory)
- s.initialized = true
-
- return nil
- }
-
- // StartConsumerHandler 启动消费者API处理器
- func (s *RabbitMQService) StartConsumerHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
-
- var queueReq types.QueueRequest
- if err := json.NewDecoder(r.Body).Decode(&queueReq); err != nil {
- http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
- return
- }
-
- if queueReq.QueueName == "" {
- http.Error(w, "Queue name is required", http.StatusBadRequest)
- return
- }
-
- if err := s.consumerManager.StartConsumer(queueReq); err != nil {
- http.Error(w, fmt.Sprintf("Failed to start consumer: %v", err), http.StatusInternalServerError)
- return
- }
-
- response := map[string]interface{}{
- "success": true,
- "message": fmt.Sprintf("Consumer started for queue: %s", queueReq.QueueName),
- }
-
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(response)
- }
-
- // StopConsumerHandler 停止消费者API处理器
- func (s *RabbitMQService) StopConsumerHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != http.MethodPost {
- http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
- return
- }
-
- var request struct {
- ConsumerID string `json:"consumer_id"`
- QueueName string `json:"queue_name"`
- }
-
- if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
- http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
- return
- }
-
- // 优先使用ConsumerID,如果为空则使用QueueName生成
- consumerID := request.ConsumerID
- if consumerID == "" && request.QueueName != "" {
- consumerID = fmt.Sprintf("%s_consumer", request.QueueName)
- }
-
- if consumerID == "" {
- http.Error(w, "consumer_id or queue_name is required", http.StatusBadRequest)
- return
- }
-
- if err := s.consumerManager.StopConsumer(consumerID); err != nil {
- http.Error(w, fmt.Sprintf("Failed to stop consumer: %v", err), http.StatusInternalServerError)
- return
- }
-
- response := map[string]interface{}{
- "success": true,
- "message": fmt.Sprintf("Consumer stopped: %s", consumerID),
- }
-
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(response)
- }
-
- // HealthCheckHandler 健康检查
- func (s *RabbitMQService) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
- status := "healthy"
- if s.rabbitFactory == nil {
- status = "unhealthy"
- }
-
- response := map[string]interface{}{
- "status": status,
- "service": config.GetService().ServiceName,
- "consumers": s.consumerManager.GetConsumerCount(),
- "timestamp": time.Now().Format(time.RFC3339),
- }
-
- w.Header().Set("Content-Type", "application/json")
- json.NewEncoder(w).Encode(response)
- }
-
- // StopAllConsumers 停止所有消费者
- func (s *RabbitMQService) StopAllConsumers() error {
- if s.consumerManager != nil {
- return s.consumerManager.StopAllConsumers()
- }
- return nil
- }
-
- // Close 关闭服务
- func (s *RabbitMQService) Close() error {
- s.mu.Lock()
- defer s.mu.Unlock()
-
- // 停止所有消费者
- if s.consumerManager != nil {
- if err := s.consumerManager.StopAllConsumers(); err != nil {
- log.Printf("Warning: failed to stop consumers: %v", err)
- }
- }
-
- // 关闭RabbitMQ连接
- if s.rabbitFactory != nil {
- if err := s.rabbitFactory.Close(); err != nil {
- return fmt.Errorf("failed to close RabbitMQ factory: %v", err)
- }
- }
-
- s.initialized = false
- log.Println("RabbitMQ service closed successfully")
- return nil
- }
|