Bez popisu
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

consumer_factory.go 2.0KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. package rabbitmq
  2. import (
  3. "fmt"
  4. "git.x2erp.com/qdy/go-base/types"
  5. amqp "github.com/streadway/amqp"
  6. )
  7. // MessageHandler 消息处理器接口
  8. type MessageHandler interface {
  9. Process(ueueRequest *types.QueueRequest, body []byte) types.QueryResult[interface{}]
  10. }
  11. // Consumer 消费者结构体
  12. type Consumer struct {
  13. Channel *amqp.Channel
  14. QueueName string
  15. Handler MessageHandler
  16. QueueRequest *types.QueueRequest
  17. }
  18. // CreateConsumer 创建消费者
  19. // 返回Consumer对象,创建后自动开始消费
  20. func (f *RabbitMQFactory) CreateConsumer(queueRequest *types.QueueRequest, consumerID string, handler MessageHandler) (*Consumer, error) {
  21. // 生成唯一的通道名称
  22. channelName := fmt.Sprintf("%s_%s", queueRequest.QueueName, consumerID)
  23. // 创建新通道
  24. channel, err := f.CreateChannel(channelName)
  25. if err != nil {
  26. return nil, fmt.Errorf("failed to create channel: %v", err)
  27. }
  28. // 设置QoS(一次处理一条)
  29. err = channel.Qos(1, 0, false)
  30. if err != nil {
  31. f.CloseChannel(channelName) // 清理
  32. return nil, fmt.Errorf("failed to set qos: %v", err)
  33. }
  34. // 开始消费
  35. deliveries, err := channel.Consume(
  36. queueRequest.QueueName,
  37. channelName, // 使用通道名称作为消费者标签
  38. false, // 手动确认
  39. false, // 非独占
  40. false, // 不排除本地连接
  41. false, // 阻塞等待
  42. nil, // 额外参数
  43. )
  44. if err != nil {
  45. f.CloseChannel(channelName) // 清理
  46. return nil, fmt.Errorf("failed to start consumer: %v", err)
  47. }
  48. // 创建Consumer对象
  49. consumer := &Consumer{
  50. Channel: channel,
  51. QueueName: queueRequest.QueueName,
  52. Handler: handler,
  53. QueueRequest: queueRequest,
  54. }
  55. // 启动goroutine处理消息
  56. go func() {
  57. for delivery := range deliveries {
  58. queryResult := handler.Process(queueRequest, delivery.Body)
  59. if queryResult.Success {
  60. delivery.Ack(false)
  61. } else {
  62. delivery.Reject(true)
  63. }
  64. }
  65. // deliveries通道关闭时,goroutine自动结束
  66. }()
  67. return consumer, nil
  68. }