Nav apraksta
Nevar pievienot vairāk kā 25 tēmas Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

rabbitmq_service.go 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "sync"
  8. "time"
  9. "git.x2erp.com/qdy/go-base/config"
  10. "git.x2erp.com/qdy/go-base/types"
  11. "git.x2erp.com/qdy/go-db/factory/rabbitmq"
  12. "git.x2erp.com/qdy/go-svc-mqconsumer/consumer"
  13. )
  14. // RabbitMQService RabbitMQ服务
  15. type RabbitMQService struct {
  16. rabbitFactory *rabbitmq.RabbitMQFactory
  17. consumerManager *consumer.ConsumerManager
  18. mu sync.RWMutex
  19. initialized bool
  20. }
  21. // NewRabbitMQService 创建RabbitMQ服务
  22. func NewRabbitMQService() *RabbitMQService {
  23. return &RabbitMQService{}
  24. }
  25. // Init 初始化服务
  26. func (s *RabbitMQService) Init() error {
  27. s.mu.Lock()
  28. defer s.mu.Unlock()
  29. if s.initialized {
  30. return nil
  31. }
  32. // 初始化RabbitMQ工厂
  33. factory, err := rabbitmq.NewRabbitMQFactory()
  34. if err != nil {
  35. return fmt.Errorf("failed to create RabbitMQ factory: %v", err)
  36. }
  37. s.rabbitFactory = factory
  38. s.consumerManager = consumer.NewConsumerManager(factory)
  39. s.initialized = true
  40. return nil
  41. }
  42. // StartConsumerHandler 启动消费者API处理器
  43. func (s *RabbitMQService) StartConsumerHandler(w http.ResponseWriter, r *http.Request) {
  44. if r.Method != http.MethodPost {
  45. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  46. return
  47. }
  48. var queueReq types.QueueRequest
  49. if err := json.NewDecoder(r.Body).Decode(&queueReq); err != nil {
  50. http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
  51. return
  52. }
  53. if queueReq.QueueName == "" {
  54. http.Error(w, "Queue name is required", http.StatusBadRequest)
  55. return
  56. }
  57. if err := s.consumerManager.StartConsumer(queueReq); err != nil {
  58. http.Error(w, fmt.Sprintf("Failed to start consumer: %v", err), http.StatusInternalServerError)
  59. return
  60. }
  61. response := map[string]interface{}{
  62. "success": true,
  63. "message": fmt.Sprintf("Consumer started for queue: %s", queueReq.QueueName),
  64. }
  65. w.Header().Set("Content-Type", "application/json")
  66. json.NewEncoder(w).Encode(response)
  67. }
  68. // StopConsumerHandler 停止消费者API处理器
  69. func (s *RabbitMQService) StopConsumerHandler(w http.ResponseWriter, r *http.Request) {
  70. if r.Method != http.MethodPost {
  71. http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
  72. return
  73. }
  74. var request struct {
  75. ConsumerID string `json:"consumer_id"`
  76. QueueName string `json:"queue_name"`
  77. }
  78. if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
  79. http.Error(w, fmt.Sprintf("Invalid request body: %v", err), http.StatusBadRequest)
  80. return
  81. }
  82. // 优先使用ConsumerID,如果为空则使用QueueName生成
  83. consumerID := request.ConsumerID
  84. if consumerID == "" && request.QueueName != "" {
  85. consumerID = fmt.Sprintf("%s_consumer", request.QueueName)
  86. }
  87. if consumerID == "" {
  88. http.Error(w, "consumer_id or queue_name is required", http.StatusBadRequest)
  89. return
  90. }
  91. if err := s.consumerManager.StopConsumer(consumerID); err != nil {
  92. http.Error(w, fmt.Sprintf("Failed to stop consumer: %v", err), http.StatusInternalServerError)
  93. return
  94. }
  95. response := map[string]interface{}{
  96. "success": true,
  97. "message": fmt.Sprintf("Consumer stopped: %s", consumerID),
  98. }
  99. w.Header().Set("Content-Type", "application/json")
  100. json.NewEncoder(w).Encode(response)
  101. }
  102. // HealthCheckHandler 健康检查
  103. func (s *RabbitMQService) HealthCheckHandler(w http.ResponseWriter, r *http.Request) {
  104. status := "healthy"
  105. if s.rabbitFactory == nil {
  106. status = "unhealthy"
  107. }
  108. response := map[string]interface{}{
  109. "status": status,
  110. "service": config.GetService().ServiceName,
  111. "consumers": s.consumerManager.GetConsumerCount(),
  112. "timestamp": time.Now().Format(time.RFC3339),
  113. }
  114. w.Header().Set("Content-Type", "application/json")
  115. json.NewEncoder(w).Encode(response)
  116. }
  117. // StopAllConsumers 停止所有消费者
  118. func (s *RabbitMQService) StopAllConsumers() error {
  119. if s.consumerManager != nil {
  120. return s.consumerManager.StopAllConsumers()
  121. }
  122. return nil
  123. }
  124. // Close 关闭服务
  125. func (s *RabbitMQService) Close() error {
  126. s.mu.Lock()
  127. defer s.mu.Unlock()
  128. // 停止所有消费者
  129. if s.consumerManager != nil {
  130. if err := s.consumerManager.StopAllConsumers(); err != nil {
  131. log.Printf("Warning: failed to stop consumers: %v", err)
  132. }
  133. }
  134. // 关闭RabbitMQ连接
  135. if s.rabbitFactory != nil {
  136. if err := s.rabbitFactory.Close(); err != nil {
  137. return fmt.Errorf("failed to close RabbitMQ factory: %v", err)
  138. }
  139. }
  140. s.initialized = false
  141. log.Println("RabbitMQ service closed successfully")
  142. return nil
  143. }