Нет описания
Вы не можете выбрать более 25 тем Темы должны начинаться с буквы или цифры, могут содержать дефисы(-) и должны содержать не более 35 символов.

consumer_factory.go 2.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package rabbitmq
  2. import (
  3. "fmt"
  4. "git.x2erp.com/qdy/go-base/model/request"
  5. "git.x2erp.com/qdy/go-base/model/response"
  6. amqp "github.com/streadway/amqp"
  7. )
  8. // MessageHandler 消息处理器接口
  9. type MessageHandler interface {
  10. Process(ueueRequest *request.QueueRequest, body []byte) response.QueryResult[interface{}]
  11. }
  12. // Consumer 消费者结构体
  13. type Consumer struct {
  14. Channel *amqp.Channel
  15. QueueName string
  16. Handler MessageHandler
  17. QueueRequest *request.QueueRequest
  18. }
  19. // CreateConsumer 创建消费者
  20. // 返回Consumer对象,创建后自动开始消费
  21. func (f *RabbitMQFactory) CreateConsumer(queueRequest *request.QueueRequest, consumerID string, handler MessageHandler) (*Consumer, error) {
  22. // 生成唯一的通道名称
  23. channelName := fmt.Sprintf("%s_%s", queueRequest.QueueName, consumerID)
  24. // 创建新通道
  25. channel, err := f.CreateChannel(channelName)
  26. if err != nil {
  27. return nil, fmt.Errorf("failed to create channel: %v", err)
  28. }
  29. // 设置QoS(一次处理一条)
  30. err = channel.Qos(1, 0, false)
  31. if err != nil {
  32. f.CloseChannel(channelName) // 清理
  33. return nil, fmt.Errorf("failed to set qos: %v", err)
  34. }
  35. // 开始消费
  36. deliveries, err := channel.Consume(
  37. queueRequest.QueueName,
  38. channelName, // 使用通道名称作为消费者标签
  39. false, // 手动确认
  40. false, // 非独占
  41. false, // 不排除本地连接
  42. false, // 阻塞等待
  43. nil, // 额外参数
  44. )
  45. if err != nil {
  46. f.CloseChannel(channelName) // 清理
  47. return nil, fmt.Errorf("failed to start consumer: %v", err)
  48. }
  49. // 创建Consumer对象
  50. consumer := &Consumer{
  51. Channel: channel,
  52. QueueName: queueRequest.QueueName,
  53. Handler: handler,
  54. QueueRequest: queueRequest,
  55. }
  56. // 启动goroutine处理消息
  57. go func() {
  58. for delivery := range deliveries {
  59. queryResult := handler.Process(queueRequest, delivery.Body)
  60. if queryResult.Success {
  61. delivery.Ack(false)
  62. } else {
  63. delivery.Reject(true)
  64. }
  65. }
  66. // deliveries通道关闭时,goroutine自动结束
  67. }()
  68. return consumer, nil
  69. }