暫無描述
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

consumer_factory.go 2.1KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package rabbitmq
  2. import (
  3. "fmt"
  4. "git.x2erp.com/qdy/go-base/model/request/mqreq"
  5. mq "git.x2erp.com/qdy/go-base/model/request/mqreq"
  6. "git.x2erp.com/qdy/go-base/model/response"
  7. amqp "github.com/streadway/amqp"
  8. )
  9. // MessageHandler 消息处理器接口
  10. type MessageHandler interface {
  11. Process(ueueRequest *mq.QueueRequest, body []byte) response.QueryResult[interface{}]
  12. }
  13. // Consumer 消费者结构体
  14. type Consumer struct {
  15. Channel *amqp.Channel
  16. QueueName string
  17. Handler MessageHandler
  18. QueueRequest *mq.QueueRequest
  19. }
  20. // CreateConsumer 创建消费者
  21. // 返回Consumer对象,创建后自动开始消费
  22. func (f *RabbitMQFactory) CreateConsumer(queueRequest *mqreq.QueueRequest, consumerID string, handler MessageHandler) (*Consumer, error) {
  23. // 生成唯一的通道名称
  24. channelName := fmt.Sprintf("%s_%s", queueRequest.QueueName, consumerID)
  25. // 创建新通道
  26. channel, err := f.CreateChannel(channelName)
  27. if err != nil {
  28. return nil, fmt.Errorf("failed to create channel: %v", err)
  29. }
  30. // 设置QoS(一次处理一条)
  31. err = channel.Qos(1, 0, false)
  32. if err != nil {
  33. f.CloseChannel(channelName) // 清理
  34. return nil, fmt.Errorf("failed to set qos: %v", err)
  35. }
  36. // 开始消费
  37. deliveries, err := channel.Consume(
  38. queueRequest.QueueName,
  39. channelName, // 使用通道名称作为消费者标签
  40. false, // 手动确认
  41. false, // 非独占
  42. false, // 不排除本地连接
  43. false, // 阻塞等待
  44. nil, // 额外参数
  45. )
  46. if err != nil {
  47. f.CloseChannel(channelName) // 清理
  48. return nil, fmt.Errorf("failed to start consumer: %v", err)
  49. }
  50. // 创建Consumer对象
  51. consumer := &Consumer{
  52. Channel: channel,
  53. QueueName: queueRequest.QueueName,
  54. Handler: handler,
  55. QueueRequest: queueRequest,
  56. }
  57. // 启动goroutine处理消息
  58. go func() {
  59. for delivery := range deliveries {
  60. queryResult := handler.Process(queueRequest, delivery.Body)
  61. if queryResult.Success {
  62. delivery.Ack(false)
  63. } else {
  64. delivery.Reject(true)
  65. }
  66. }
  67. // deliveries通道关闭时,goroutine自动结束
  68. }()
  69. return consumer, nil
  70. }