| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196 |
- // factory/rabbitmq_factory.go
- package rabbitmq
-
import (
	"fmt"
	"net/url"
	"sync"

	"git.x2erp.com/qdy/go-base/config"
	"git.x2erp.com/qdy/go-base/config/subconfigs"
	amqp "github.com/streadway/amqp"
)
-
- // RabbitMQFactory RabbitMQ工厂
- type RabbitMQFactory struct {
- mu sync.RWMutex
- clients map[string]*amqp.Channel
- conn *amqp.Connection
- config *subconfigs.RabbitMQConfig
- }
-
- // NewRabbitMQFactory 创建RabbitMQ工厂
- func NewRabbitMQFactory() (*RabbitMQFactory, error) {
- cfg := config.GetConfig()
-
- rabbitConfig := cfg.GetRabbitMQConfig()
- if rabbitConfig.Host == "" || rabbitConfig.Port == 0 {
- return nil, fmt.Errorf("rabbitmq configuration is incomplete")
- }
-
- // 构建连接URL
- url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
- rabbitConfig.Username,
- rabbitConfig.Password,
- rabbitConfig.Host,
- rabbitConfig.Port,
- rabbitConfig.Vhost)
-
- // 简单的连接配置
- mqConfig := amqp.Config{
- Properties: amqp.Table{
- "connection_name": cfg.GetServiceConfig().InstanceName,
- }}
-
- // 建立连接
- conn, err := amqp.DialConfig(url, mqConfig)
- if err != nil {
- return nil, fmt.Errorf("failed to connect to rabbitmq: %v", err)
- }
-
- return &RabbitMQFactory{
- conn: conn,
- config: rabbitConfig,
- clients: make(map[string]*amqp.Channel),
- }, nil
- }
-
- // CreateChannel 创建消息通道
- func (f *RabbitMQFactory) CreateChannel(name string) (*amqp.Channel, error) {
- f.mu.Lock()
- defer f.mu.Unlock()
-
- // 如果已存在,直接返回
- if channel, ok := f.clients[name]; ok && channel != nil {
- return channel, nil
- }
-
- // 创建新通道
- channel, err := f.conn.Channel()
- if err != nil {
- return nil, fmt.Errorf("failed to create channel '%s': %v", name, err)
- }
-
- // 设置QoS(可选)
- if f.config.PrefetchCount > 0 {
- err = channel.Qos(
- f.config.PrefetchCount,
- f.config.PrefetchSize,
- f.config.Global,
- )
- if err != nil {
- channel.Close()
- return nil, fmt.Errorf("failed to set qos for channel '%s': %v", name, err)
- }
- }
-
- f.clients[name] = channel
- return channel, nil
- }
-
- // GetChannel 获取已创建的通道
- func (f *RabbitMQFactory) GetChannel(name string) (*amqp.Channel, error) {
- f.mu.RLock()
- defer f.mu.RUnlock()
-
- channel, ok := f.clients[name]
- if !ok || channel == nil {
- return nil, fmt.Errorf("channel '%s' not found", name)
- }
- return channel, nil
- }
-
- // CloseChannel 关闭指定通道
- func (f *RabbitMQFactory) CloseChannel(name string) error {
- f.mu.Lock()
- defer f.mu.Unlock()
-
- channel, ok := f.clients[name]
- if ok && channel != nil {
- if err := channel.Close(); err != nil {
- return fmt.Errorf("failed to close channel '%s': %v", name, err)
- }
- delete(f.clients, name)
- }
- return nil
- }
-
- // AddExchange 添加交换机
- func (f *RabbitMQFactory) AddExchange(channelName, exchange, exchangeType string, durable bool) error {
- channel, err := f.GetChannel(channelName)
- if err != nil {
- return err
- }
-
- return channel.ExchangeDeclare(
- exchange,
- exchangeType, // "direct", "fanout", "topic", "headers"
- durable, // durable
- false, // auto-deleted
- false, // internal
- false, // no-wait
- nil, // arguments
- )
- }
-
- // AddQueue 添加队列
- func (f *RabbitMQFactory) AddQueue(channelName, queue string, durable, exclusive, autoDelete bool) error {
- channel, err := f.GetChannel(channelName)
- if err != nil {
- return err
- }
-
- _, err = channel.QueueDeclare(
- queue,
- durable, // durable
- autoDelete, // delete when unused
- exclusive, // exclusive
- false, // no-wait
- nil, // arguments
- )
- return err
- }
-
- // BindQueue 绑定队列到交换机
- func (f *RabbitMQFactory) BindQueue(channelName, queue, exchange, routingKey string) error {
- channel, err := f.GetChannel(channelName)
- if err != nil {
- return err
- }
-
- return channel.QueueBind(
- queue,
- routingKey,
- exchange,
- false, // no-wait
- nil, // arguments
- )
- }
-
- // Close 关闭所有连接
- func (f *RabbitMQFactory) Close() error {
- f.mu.Lock()
- defer f.mu.Unlock()
-
- // 关闭所有通道
- var errs []error
- for name, channel := range f.clients {
- if channel != nil {
- if err := channel.Close(); err != nil {
- errs = append(errs, fmt.Errorf("close channel '%s' error: %v", name, err))
- }
- }
- }
- f.clients = make(map[string]*amqp.Channel)
-
- // 关闭连接
- if f.conn != nil {
- if err := f.conn.Close(); err != nil {
- errs = append(errs, fmt.Errorf("close connection error: %v", err))
- }
- }
-
- if len(errs) > 0 {
- return fmt.Errorf("errors closing rabbitmq: %v", errs)
- }
- return nil
- }
|