package service

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"git.x2erp.com/qdy/go-base/config"
	"git.x2erp.com/qdy/go-base/types"
	"git.x2erp.com/qdy/go-db/factory/rabbitmq"
	"git.x2erp.com/qdy/go-svc-mqconsumer/consumer"
)

// RabbitMQService owns a RabbitMQ factory and the consumer manager built on
// top of it, and exposes HTTP handlers for starting consumers, stopping
// consumers and reporting health.
//
// A service returned by NewRabbitMQService is uninitialized: until Init
// succeeds the start/stop handlers answer 503 and the health check reports
// "unhealthy".
type RabbitMQService struct {
	rabbitFactory   *rabbitmq.RabbitMQFactory
	consumerManager *consumer.ConsumerManager

	// mu guards rabbitFactory, consumerManager and initialized.
	mu          sync.RWMutex
	initialized bool
}

// NewRabbitMQService returns an uninitialized service. Call Init before
// serving requests.
func NewRabbitMQService() *RabbitMQService {
	return &RabbitMQService{}
}

// Init creates the RabbitMQ factory and the consumer manager that uses it.
// It is a no-op when the service is already initialized. The returned error
// wraps the factory construction error.
func (s *RabbitMQService) Init() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.initialized {
		return nil
	}

	factory, err := rabbitmq.NewRabbitMQFactory()
	if err != nil {
		return fmt.Errorf("failed to create RabbitMQ factory: %w", err)
	}

	s.rabbitFactory = factory
	s.consumerManager = consumer.NewConsumerManager(factory)
	s.initialized = true
	return nil
}

// snapshot returns the current factory and consumer manager, read under the
// read lock so the access does not race with Init or Close. Either value is
// nil when the service has not been initialized or has been closed.
func (s *RabbitMQService) snapshot() (*rabbitmq.RabbitMQFactory, *consumer.ConsumerManager) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.rabbitFactory, s.consumerManager
}

// respondJSON writes payload as a JSON response body. An encoding failure is
// logged rather than reported to the client, because the status line and
// headers may already have been sent by then.
func (s *RabbitMQService) respondJSON(w http.ResponseWriter, payload map[string]interface{}) {
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(payload); err != nil {
		log.Printf("Warning: failed to write JSON response: %v", err)
	}
}

// StartConsumerHandler handles POST requests whose JSON body decodes into a
// types.QueueRequest and asks the consumer manager to start a consumer for it.
//
// Responses: 405 for non-POST methods, 400 for an undecodable body or an
// empty queue name, 503 when the service is not initialized, 500 when the
// consumer manager returns an error, and a JSON success object otherwise.
func (s *RabbitMQService) StartConsumerHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var queueReq types.QueueRequest
	if err := json.NewDecoder(r.Body).Decode(&queueReq); err != nil {
		http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
		return
	}

	if queueReq.QueueName == "" {
		http.Error(w, "Queue name is required", http.StatusBadRequest)
		return
	}

	// Guard against requests that arrive before Init or after Close; without
	// this the call below would be made on a nil manager.
	_, manager := s.snapshot()
	if manager == nil {
		http.Error(w, "Service not initialized", http.StatusServiceUnavailable)
		return
	}

	if err := manager.StartConsumer(queueReq); err != nil {
		http.Error(w, fmt.Sprintf("Failed to start consumer: %v", err), http.StatusInternalServerError)
		return
	}

	s.respondJSON(w, map[string]interface{}{
		"success": true,
		"message": fmt.Sprintf("Consumer started for queue: %s", queueReq.QueueName),
	})
}

// StopConsumerHandler handles POST requests carrying a JSON body with
// "consumer_id" and/or "queue_name" and asks the consumer manager to stop the
// matching consumer.
//
// Responses: 405 for non-POST methods, 400 for an undecodable body or when
// both fields are empty, 503 when the service is not initialized, 500 when
// the consumer manager returns an error, and a JSON success object otherwise.
func (s *RabbitMQService) StopConsumerHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	var request struct {
		ConsumerID string `json:"consumer_id"`
		QueueName  string `json:"queue_name"`
	}
	if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
		http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
		return
	}

	// Prefer the explicit consumer ID; when it is absent, derive one from the
	// queue name using the "<queue>_consumer" form.
	consumerID := request.ConsumerID
	if consumerID == "" && request.QueueName != "" {
		consumerID = fmt.Sprintf("%s_consumer", request.QueueName)
	}
	if consumerID == "" {
		http.Error(w, "consumer_id or queue_name is required", http.StatusBadRequest)
		return
	}

	// Guard against requests that arrive before Init or after Close.
	_, manager := s.snapshot()
	if manager == nil {
		http.Error(w, "Service not initialized", http.StatusServiceUnavailable)
		return
	}

	if err := manager.StopConsumer(consumerID); err != nil {
		http.Error(w, fmt.Sprintf("Failed to stop consumer: %v", err), http.StatusInternalServerError)
		return
	}

	s.respondJSON(w, map[string]interface{}{
		"success": true,
		"message": fmt.Sprintf("Consumer stopped: %s", consumerID),
	})
}

// HealthCheckHandler writes a JSON document with the service status, the
// configured service name, the consumer count reported by the consumer
// manager, and the current time in RFC 3339 format.
//
// The status is "unhealthy" when no RabbitMQ factory is present and "healthy"
// otherwise. The consumer count is reported as 0 when there is no consumer
// manager to ask.
func (s *RabbitMQService) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
	factory, manager := s.snapshot()

	status := "healthy"
	if factory == nil {
		status = "unhealthy"
	}

	// Held as interface{} so the value is passed to the encoder unchanged,
	// whatever numeric type the manager returns.
	var consumers interface{} = 0
	if manager != nil {
		consumers = manager.GetConsumerCount()
	}

	s.respondJSON(w, map[string]interface{}{
		"status":    status,
		"service":   config.GetService().ServiceName,
		"consumers": consumers,
		"timestamp": time.Now().Format(time.RFC3339),
	})
}

// StopAllConsumers asks the consumer manager to stop every consumer and
// returns its error. It returns nil when there is no consumer manager.
func (s *RabbitMQService) StopAllConsumers() error {
	_, manager := s.snapshot()
	if manager == nil {
		return nil
	}
	return manager.StopAllConsumers()
}

// Close stops all consumers and then closes the RabbitMQ factory.
//
// A failure to stop consumers is only logged, so the factory is still closed.
// A failure to close the factory is returned (wrapped) and leaves the service
// state untouched. On success the factory and manager references are dropped,
// so the health check reports "unhealthy", a repeated Close does not close the
// factory again, and a later Init builds fresh instances.
func (s *RabbitMQService) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.consumerManager != nil {
		if err := s.consumerManager.StopAllConsumers(); err != nil {
			log.Printf("Warning: failed to stop consumers: %v", err)
		}
	}

	if s.rabbitFactory != nil {
		if err := s.rabbitFactory.Close(); err != nil {
			return fmt.Errorf("failed to close RabbitMQ factory: %w", err)
		}
	}

	s.consumerManager = nil
	s.rabbitFactory = nil
	s.initialized = false
	log.Println("RabbitMQ service closed successfully")
	return nil
}