Bez popisu
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

consumer_manager.go 3.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package consumer
  2. import (
  3. "fmt"
  4. "log"
  5. "sync"
  6. "git.x2erp.com/qdy/go-base/types"
  7. "git.x2erp.com/qdy/go-db/factory/rabbitmq"
  8. "git.x2erp.com/qdy/go-svc-mqconsumer/handlers"
  9. )
  10. // ConsumerManager 消费者管理器
  11. type ConsumerManager struct {
  12. rabbitFactory *rabbitmq.RabbitMQFactory
  13. consumers sync.Map // consumerID -> *rabbitmq.Consumer
  14. handler *handlers.HTTPHandler
  15. }
  16. // NewConsumerManager 创建消费者管理器
  17. func NewConsumerManager(rabbitFactory *rabbitmq.RabbitMQFactory) *ConsumerManager {
  18. return &ConsumerManager{
  19. rabbitFactory: rabbitFactory,
  20. handler: handlers.NewHTTPHandler(),
  21. }
  22. }
  23. // StartAllConsumers 启动所有消费者
  24. func (cm *ConsumerManager) StartAllConsumers() error {
  25. queueConfigs, err := config.GetQueueConfigsFromDB()
  26. if err != nil {
  27. return fmt.Errorf("failed to load config: %v", err)
  28. }
  29. log.Printf("Starting %d consumers...", len(queueConfigs))
  30. for _, config := range queueConfigs {
  31. if err := cm.StartConsumer(config); err != nil {
  32. log.Printf("Failed to start consumer for queue %s: %v", config.QueueName, err)
  33. continue
  34. }
  35. log.Printf("Consumer started for queue: %s", config.QueueName)
  36. }
  37. return nil
  38. }
  39. // StartConsumer 启动单个消费者
  40. func (cm *ConsumerManager) StartConsumer(queueReq types.QueueRequest) error {
  41. // 生成消费者ID
  42. consumerID := fmt.Sprintf("%s_consumer", queueReq.QueueName)
  43. // 检查是否已存在
  44. if _, exists := cm.consumers.Load(consumerID); exists {
  45. return fmt.Errorf("consumer %s already exists", consumerID)
  46. }
  47. // 创建消费者
  48. consumer, err := cm.rabbitFactory.CreateConsumer(&queueReq, consumerID, cm.handler)
  49. if err != nil {
  50. return fmt.Errorf("failed to create consumer: %v", err)
  51. }
  52. // 保存消费者
  53. cm.consumers.Store(consumerID, consumer)
  54. log.Printf("Consumer %s started for queue %s", consumerID, queueReq.QueueName)
  55. return nil
  56. }
  57. // StopConsumer 停止指定消费者
  58. func (cm *ConsumerManager) StopConsumer(consumerID string) error {
  59. // 获取消费者
  60. consumerInterface, exists := cm.consumers.Load(consumerID)
  61. if !exists {
  62. return fmt.Errorf("consumer %s not found", consumerID)
  63. }
  64. consumer := consumerInterface.(*rabbitmq.Consumer)
  65. // 关闭通道
  66. if err := consumer.Channel.Close(); err != nil {
  67. return fmt.Errorf("failed to close channel: %v", err)
  68. }
  69. // 从map中移除
  70. cm.consumers.Delete(consumerID)
  71. // 关闭RabbitMQ通道
  72. if err := cm.rabbitFactory.CloseChannel(consumerID); err != nil {
  73. log.Printf("Warning: failed to close RabbitMQ channel %s: %v", consumerID, err)
  74. }
  75. log.Printf("Consumer %s stopped", consumerID)
  76. return nil
  77. }
  78. // StopAllConsumers 停止所有消费者
  79. func (cm *ConsumerManager) StopAllConsumers() error {
  80. var errs []error
  81. cm.consumers.Range(func(key, value interface{}) bool {
  82. consumerID := key.(string)
  83. if err := cm.StopConsumer(consumerID); err != nil {
  84. errs = append(errs, fmt.Errorf("failed to stop consumer %s: %v", consumerID, err))
  85. }
  86. return true
  87. })
  88. if len(errs) > 0 {
  89. return fmt.Errorf("errors stopping consumers: %v", errs)
  90. }
  91. log.Println("All consumers stopped")
  92. return nil
  93. }
  94. // GetConsumerCount 获取消费者数量
  95. func (cm *ConsumerManager) GetConsumerCount() int {
  96. count := 0
  97. cm.consumers.Range(func(_, _ interface{}) bool {
  98. count++
  99. return true
  100. })
  101. return count
  102. }