package rabbitmq import ( "fmt" "git.x2erp.com/qdy/go-base/types" amqp "github.com/streadway/amqp" ) // MessageHandler 消息处理器接口 type MessageHandler interface { Process(ueueRequest *types.QueueRequest, body []byte) types.QueryResult[interface{}] } // Consumer 消费者结构体 type Consumer struct { Channel *amqp.Channel QueueName string Handler MessageHandler QueueRequest *types.QueueRequest } // CreateConsumer 创建消费者 // 返回Consumer对象,创建后自动开始消费 func (f *RabbitMQFactory) CreateConsumer(queueRequest *types.QueueRequest, consumerID string, handler MessageHandler) (*Consumer, error) { // 生成唯一的通道名称 channelName := fmt.Sprintf("%s_%s", queueRequest.QueueName, consumerID) // 创建新通道 channel, err := f.CreateChannel(channelName) if err != nil { return nil, fmt.Errorf("failed to create channel: %v", err) } // 设置QoS(一次处理一条) err = channel.Qos(1, 0, false) if err != nil { f.CloseChannel(channelName) // 清理 return nil, fmt.Errorf("failed to set qos: %v", err) } // 开始消费 deliveries, err := channel.Consume( queueRequest.QueueName, channelName, // 使用通道名称作为消费者标签 false, // 手动确认 false, // 非独占 false, // 不排除本地连接 false, // 阻塞等待 nil, // 额外参数 ) if err != nil { f.CloseChannel(channelName) // 清理 return nil, fmt.Errorf("failed to start consumer: %v", err) } // 创建Consumer对象 consumer := &Consumer{ Channel: channel, QueueName: queueRequest.QueueName, Handler: handler, QueueRequest: queueRequest, } // 启动goroutine处理消息 go func() { for delivery := range deliveries { queryResult := handler.Process(queueRequest, delivery.Body) if queryResult.Success { delivery.Ack(false) } else { delivery.Reject(true) } } // deliveries通道关闭时,goroutine自动结束 }() return consumer, nil }