Geen omschrijving
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

rabbitmq_factory.go 4.4KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. // factory/rabbitmq_factory.go
  2. package rabbitmq
  3. import (
  4. "fmt"
  5. "sync"
  6. "git.x2erp.com/qdy/go-base/config"
  7. "git.x2erp.com/qdy/go-base/config/subconfigs"
  8. amqp "github.com/streadway/amqp"
  9. )
  10. // RabbitMQFactory RabbitMQ工厂
  11. type RabbitMQFactory struct {
  12. mu sync.RWMutex
  13. clients map[string]*amqp.Channel
  14. conn *amqp.Connection
  15. config *subconfigs.RabbitMQConfig
  16. }
  17. // NewRabbitMQFactory 创建RabbitMQ工厂
  18. func NewRabbitMQFactory() (*RabbitMQFactory, error) {
  19. cfg := config.GetConfig()
  20. rabbitConfig := cfg.GetRabbitMQConfig()
  21. if rabbitConfig.Host == "" || rabbitConfig.Port == 0 {
  22. return nil, fmt.Errorf("rabbitmq configuration is incomplete")
  23. }
  24. // 构建连接URL
  25. url := fmt.Sprintf("amqp://%s:%s@%s:%d/%s",
  26. rabbitConfig.Username,
  27. rabbitConfig.Password,
  28. rabbitConfig.Host,
  29. rabbitConfig.Port,
  30. rabbitConfig.Vhost)
  31. // 简单的连接配置
  32. mqConfig := amqp.Config{
  33. Properties: amqp.Table{
  34. "connection_name": cfg.GetServiceConfig().InstanceName,
  35. }}
  36. // 建立连接
  37. conn, err := amqp.DialConfig(url, mqConfig)
  38. if err != nil {
  39. return nil, fmt.Errorf("failed to connect to rabbitmq: %v", err)
  40. }
  41. return &RabbitMQFactory{
  42. conn: conn,
  43. config: rabbitConfig,
  44. clients: make(map[string]*amqp.Channel),
  45. }, nil
  46. }
  47. // CreateChannel 创建消息通道
  48. func (f *RabbitMQFactory) CreateChannel(name string) (*amqp.Channel, error) {
  49. f.mu.Lock()
  50. defer f.mu.Unlock()
  51. // 如果已存在,直接返回
  52. if channel, ok := f.clients[name]; ok && channel != nil {
  53. return channel, nil
  54. }
  55. // 创建新通道
  56. channel, err := f.conn.Channel()
  57. if err != nil {
  58. return nil, fmt.Errorf("failed to create channel '%s': %v", name, err)
  59. }
  60. // 设置QoS(可选)
  61. if f.config.PrefetchCount > 0 {
  62. err = channel.Qos(
  63. f.config.PrefetchCount,
  64. f.config.PrefetchSize,
  65. f.config.Global,
  66. )
  67. if err != nil {
  68. channel.Close()
  69. return nil, fmt.Errorf("failed to set qos for channel '%s': %v", name, err)
  70. }
  71. }
  72. f.clients[name] = channel
  73. return channel, nil
  74. }
  75. // GetChannel 获取已创建的通道
  76. func (f *RabbitMQFactory) GetChannel(name string) (*amqp.Channel, error) {
  77. f.mu.RLock()
  78. defer f.mu.RUnlock()
  79. channel, ok := f.clients[name]
  80. if !ok || channel == nil {
  81. return nil, fmt.Errorf("channel '%s' not found", name)
  82. }
  83. return channel, nil
  84. }
  85. // CloseChannel 关闭指定通道
  86. func (f *RabbitMQFactory) CloseChannel(name string) error {
  87. f.mu.Lock()
  88. defer f.mu.Unlock()
  89. channel, ok := f.clients[name]
  90. if ok && channel != nil {
  91. if err := channel.Close(); err != nil {
  92. return fmt.Errorf("failed to close channel '%s': %v", name, err)
  93. }
  94. delete(f.clients, name)
  95. }
  96. return nil
  97. }
  98. // AddExchange 添加交换机
  99. func (f *RabbitMQFactory) AddExchange(channelName, exchange, exchangeType string, durable bool) error {
  100. channel, err := f.GetChannel(channelName)
  101. if err != nil {
  102. return err
  103. }
  104. return channel.ExchangeDeclare(
  105. exchange,
  106. exchangeType, // "direct", "fanout", "topic", "headers"
  107. durable, // durable
  108. false, // auto-deleted
  109. false, // internal
  110. false, // no-wait
  111. nil, // arguments
  112. )
  113. }
  114. // AddQueue 添加队列
  115. func (f *RabbitMQFactory) AddQueue(channelName, queue string, durable, exclusive, autoDelete bool) error {
  116. channel, err := f.GetChannel(channelName)
  117. if err != nil {
  118. return err
  119. }
  120. _, err = channel.QueueDeclare(
  121. queue,
  122. durable, // durable
  123. autoDelete, // delete when unused
  124. exclusive, // exclusive
  125. false, // no-wait
  126. nil, // arguments
  127. )
  128. return err
  129. }
  130. // BindQueue 绑定队列到交换机
  131. func (f *RabbitMQFactory) BindQueue(channelName, queue, exchange, routingKey string) error {
  132. channel, err := f.GetChannel(channelName)
  133. if err != nil {
  134. return err
  135. }
  136. return channel.QueueBind(
  137. queue,
  138. routingKey,
  139. exchange,
  140. false, // no-wait
  141. nil, // arguments
  142. )
  143. }
  144. // Close 关闭所有连接
  145. func (f *RabbitMQFactory) Close() error {
  146. f.mu.Lock()
  147. defer f.mu.Unlock()
  148. // 关闭所有通道
  149. var errs []error
  150. for name, channel := range f.clients {
  151. if channel != nil {
  152. if err := channel.Close(); err != nil {
  153. errs = append(errs, fmt.Errorf("close channel '%s' error: %v", name, err))
  154. }
  155. }
  156. }
  157. f.clients = make(map[string]*amqp.Channel)
  158. // 关闭连接
  159. if f.conn != nil {
  160. if err := f.conn.Close(); err != nil {
  161. errs = append(errs, fmt.Errorf("close connection error: %v", err))
  162. }
  163. }
  164. if len(errs) > 0 {
  165. return fmt.Errorf("errors closing rabbitmq: %v", errs)
  166. }
  167. return nil
  168. }