| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- package rabbitmq
-
- import (
- "fmt"
-
- "git.x2erp.com/qdy/go-base/types"
- amqp "github.com/streadway/amqp"
- )
-
- // MessageHandler 消息处理器接口
- type MessageHandler interface {
- Process(ueueRequest *types.QueueRequest, body []byte) types.QueryResult
- }
-
- // Consumer 消费者结构体
- type Consumer struct {
- Channel *amqp.Channel
- QueueName string
- Handler MessageHandler
- QueueRequest *types.QueueRequest
- }
-
- // CreateConsumer 创建消费者
- // 返回Consumer对象,创建后自动开始消费
- func (f *RabbitMQFactory) CreateConsumer(queueRequest *types.QueueRequest, consumerID string, handler MessageHandler) (*Consumer, error) {
- // 生成唯一的通道名称
- channelName := fmt.Sprintf("%s_%s", queueRequest.QueueName, consumerID)
-
- // 创建新通道
- channel, err := f.CreateChannel(channelName)
- if err != nil {
- return nil, fmt.Errorf("failed to create channel: %v", err)
- }
-
- // 设置QoS(一次处理一条)
- err = channel.Qos(1, 0, false)
- if err != nil {
- f.CloseChannel(channelName) // 清理
- return nil, fmt.Errorf("failed to set qos: %v", err)
- }
-
- // 开始消费
- deliveries, err := channel.Consume(
- queueRequest.QueueName,
- channelName, // 使用通道名称作为消费者标签
- false, // 手动确认
- false, // 非独占
- false, // 不排除本地连接
- false, // 阻塞等待
- nil, // 额外参数
- )
-
- if err != nil {
- f.CloseChannel(channelName) // 清理
- return nil, fmt.Errorf("failed to start consumer: %v", err)
- }
-
- // 创建Consumer对象
- consumer := &Consumer{
- Channel: channel,
- QueueName: queueRequest.QueueName,
- Handler: handler,
- QueueRequest: queueRequest,
- }
-
- // 启动goroutine处理消息
- go func() {
- for delivery := range deliveries {
- queryResult := handler.Process(queueRequest, delivery.Body)
- if queryResult.Success {
- delivery.Ack(false)
- } else {
- delivery.Reject(true)
- }
- }
- // deliveries通道关闭时,goroutine自动结束
- }()
-
- return consumer, nil
- }
|