| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- package consumer
-
- import (
- "fmt"
- "log"
- "sync"
-
- "git.x2erp.com/qdy/go-base/types"
- "git.x2erp.com/qdy/go-db/factory/rabbitmq"
-
- "git.x2erp.com/qdy/go-svc-mqconsumer/handlers"
- )
-
- // ConsumerManager 消费者管理器
- type ConsumerManager struct {
- rabbitFactory *rabbitmq.RabbitMQFactory
- consumers sync.Map // consumerID -> *rabbitmq.Consumer
- handler *handlers.HTTPHandler
- }
-
- // NewConsumerManager 创建消费者管理器
- func NewConsumerManager(rabbitFactory *rabbitmq.RabbitMQFactory) *ConsumerManager {
- return &ConsumerManager{
- rabbitFactory: rabbitFactory,
- handler: handlers.NewHTTPHandler(),
- }
- }
-
- // StartAllConsumers 启动所有消费者
- func (cm *ConsumerManager) StartAllConsumers() error {
- queueConfigs, err := config.GetQueueConfigsFromDB()
- if err != nil {
- return fmt.Errorf("failed to load config: %v", err)
- }
- log.Printf("Starting %d consumers...", len(queueConfigs))
-
- for _, config := range queueConfigs {
- if err := cm.StartConsumer(config); err != nil {
- log.Printf("Failed to start consumer for queue %s: %v", config.QueueName, err)
- continue
- }
- log.Printf("Consumer started for queue: %s", config.QueueName)
- }
-
- return nil
- }
-
- // StartConsumer 启动单个消费者
- func (cm *ConsumerManager) StartConsumer(queueReq types.QueueRequest) error {
- // 生成消费者ID
- consumerID := fmt.Sprintf("%s_consumer", queueReq.QueueName)
-
- // 检查是否已存在
- if _, exists := cm.consumers.Load(consumerID); exists {
- return fmt.Errorf("consumer %s already exists", consumerID)
- }
-
- // 创建消费者
- consumer, err := cm.rabbitFactory.CreateConsumer(&queueReq, consumerID, cm.handler)
- if err != nil {
- return fmt.Errorf("failed to create consumer: %v", err)
- }
-
- // 保存消费者
- cm.consumers.Store(consumerID, consumer)
- log.Printf("Consumer %s started for queue %s", consumerID, queueReq.QueueName)
-
- return nil
- }
-
- // StopConsumer 停止指定消费者
- func (cm *ConsumerManager) StopConsumer(consumerID string) error {
- // 获取消费者
- consumerInterface, exists := cm.consumers.Load(consumerID)
- if !exists {
- return fmt.Errorf("consumer %s not found", consumerID)
- }
-
- consumer := consumerInterface.(*rabbitmq.Consumer)
-
- // 关闭通道
- if err := consumer.Channel.Close(); err != nil {
- return fmt.Errorf("failed to close channel: %v", err)
- }
-
- // 从map中移除
- cm.consumers.Delete(consumerID)
-
- // 关闭RabbitMQ通道
- if err := cm.rabbitFactory.CloseChannel(consumerID); err != nil {
- log.Printf("Warning: failed to close RabbitMQ channel %s: %v", consumerID, err)
- }
-
- log.Printf("Consumer %s stopped", consumerID)
- return nil
- }
-
- // StopAllConsumers 停止所有消费者
- func (cm *ConsumerManager) StopAllConsumers() error {
- var errs []error
-
- cm.consumers.Range(func(key, value interface{}) bool {
- consumerID := key.(string)
- if err := cm.StopConsumer(consumerID); err != nil {
- errs = append(errs, fmt.Errorf("failed to stop consumer %s: %v", consumerID, err))
- }
- return true
- })
-
- if len(errs) > 0 {
- return fmt.Errorf("errors stopping consumers: %v", errs)
- }
-
- log.Println("All consumers stopped")
- return nil
- }
-
- // GetConsumerCount 获取消费者数量
- func (cm *ConsumerManager) GetConsumerCount() int {
- count := 0
- cm.consumers.Range(func(_, _ interface{}) bool {
- count++
- return true
- })
- return count
- }
|