// Package consumer tracks the RabbitMQ consumers started by this service and
// provides helpers to start and stop them.
package consumer

import (
	"fmt"
	"log"
	"sync"

	"git.x2erp.com/qdy/go-base/types"
	"git.x2erp.com/qdy/go-db/factory/rabbitmq"
	"git.x2erp.com/qdy/go-svc-mqconsumer/handlers"
)

// ConsumerManager keeps track of running consumers, keyed by consumer ID.
type ConsumerManager struct {
	rabbitFactory *rabbitmq.RabbitMQFactory
	consumers     sync.Map // consumerID (string) -> *rabbitmq.Consumer
	handler       *handlers.HTTPHandler
}

// NewConsumerManager returns a ConsumerManager that creates consumers through
// rabbitFactory and hands every consumer the same newly created HTTP handler.
func NewConsumerManager(rabbitFactory *rabbitmq.RabbitMQFactory) *ConsumerManager {
	return &ConsumerManager{
		rabbitFactory: rabbitFactory,
		handler:       handlers.NewHTTPHandler(),
	}
}

// StartAllConsumers loads the queue configurations and starts one consumer per
// entry.
//
// A failure to start an individual consumer is logged and skipped so the
// remaining queues are still attempted; only a failure to load the
// configuration is returned to the caller.
func (cm *ConsumerManager) StartAllConsumers() error {
	// NOTE(review): `config` is not among this file's imports; confirm it
	// resolves to a package-level identifier declared elsewhere in the package.
	queueConfigs, err := config.GetQueueConfigsFromDB()
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	log.Printf("Starting %d consumers...", len(queueConfigs))

	// The loop variable is deliberately not named `config` so it does not
	// shadow the identifier used above.
	for _, queueCfg := range queueConfigs {
		if err := cm.StartConsumer(queueCfg); err != nil {
			log.Printf("Failed to start consumer for queue %s: %v", queueCfg.QueueName, err)
			continue
		}
		log.Printf("Consumer started for queue: %s", queueCfg.QueueName)
	}

	return nil
}

// StartConsumer creates a consumer for the queue described by queueReq and
// registers it under the ID "<QueueName>_consumer".
//
// It returns an error if a consumer with that ID is already registered or if
// the factory fails to create the consumer. The existence check and the later
// Store are separate operations, so concurrent calls for the same queue are
// not serialized here.
func (cm *ConsumerManager) StartConsumer(queueReq types.QueueRequest) error {
	// Derive the consumer ID from the queue name.
	consumerID := fmt.Sprintf("%s_consumer", queueReq.QueueName)

	// Refuse to start a second consumer under the same ID.
	if _, exists := cm.consumers.Load(consumerID); exists {
		return fmt.Errorf("consumer %s already exists", consumerID)
	}

	consumer, err := cm.rabbitFactory.CreateConsumer(&queueReq, consumerID, cm.handler)
	if err != nil {
		return fmt.Errorf("failed to create consumer: %w", err)
	}

	// Remember the consumer so it can be stopped later.
	cm.consumers.Store(consumerID, consumer)

	log.Printf("Consumer %s started for queue %s", consumerID, queueReq.QueueName)
	return nil
}

// StopConsumer closes the channel of the consumer registered under consumerID
// and removes it from the manager.
//
// It returns an error if no such consumer is registered, if the stored value
// is not a *rabbitmq.Consumer, or if closing the consumer's channel fails; in
// those cases the registry entry is left untouched. A failure of the
// factory-level CloseChannel call is only logged as a warning.
func (cm *ConsumerManager) StopConsumer(consumerID string) error {
	stored, exists := cm.consumers.Load(consumerID)
	if !exists {
		return fmt.Errorf("consumer %s not found", consumerID)
	}

	// Checked assertion: report an unexpected value instead of panicking.
	consumer, ok := stored.(*rabbitmq.Consumer)
	if !ok {
		return fmt.Errorf("consumer %s has unexpected type %T", consumerID, stored)
	}

	// Close the consumer's own channel first.
	if err := consumer.Channel.Close(); err != nil {
		return fmt.Errorf("failed to close channel: %w", err)
	}

	// Drop the consumer from the registry.
	cm.consumers.Delete(consumerID)

	// Ask the factory to close the channel it holds under the same ID.
	if err := cm.rabbitFactory.CloseChannel(consumerID); err != nil {
		log.Printf("Warning: failed to close RabbitMQ channel %s: %v", consumerID, err)
	}

	log.Printf("Consumer %s stopped", consumerID)
	return nil
}

// StopAllConsumers attempts to stop every registered consumer.
//
// Every consumer is attempted even if an earlier one fails; all failures are
// collected and reported in a single returned error.
func (cm *ConsumerManager) StopAllConsumers() error {
	var errs []error

	cm.consumers.Range(func(key, value interface{}) bool {
		consumerID, ok := key.(string)
		if !ok {
			// Record the bad key rather than panicking, then keep going.
			errs = append(errs, fmt.Errorf("unexpected consumer key type %T", key))
			return true
		}
		if err := cm.StopConsumer(consumerID); err != nil {
			errs = append(errs, fmt.Errorf("failed to stop consumer %s: %w", consumerID, err))
		}
		return true
	})

	if len(errs) > 0 {
		return fmt.Errorf("errors stopping consumers: %v", errs)
	}

	log.Println("All consumers stopped")
	return nil
}

// GetConsumerCount returns the number of registered consumers, counted by
// iterating over the registry.
func (cm *ConsumerManager) GetConsumerCount() int {
	count := 0
	cm.consumers.Range(func(_, _ interface{}) bool {
		count++
		return true
	})
	return count
}