暫無描述
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

message_functions.go 3.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. package functions
  2. import (
  3. "encoding/json"
  4. "time"
  5. "git.x2erp.com/qdy/go-base/types"
  6. "git.x2erp.com/qdy/go-db/factory/rabbitmq"
  7. "github.com/streadway/amqp"
  8. )
  9. // SendMessage 发送JSON消息
  10. func SendMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.MessageRequest) *types.QueryResult {
  11. // 设置默认值
  12. if req.ChannelName == "" {
  13. req.ChannelName = "default"
  14. }
  15. if req.ExchangeName == "" {
  16. req.ExchangeName = "" // 默认交换机
  17. }
  18. if req.ContentType == "" {
  19. req.ContentType = "application/json"
  20. }
  21. if req.Timestamp.IsZero() {
  22. req.Timestamp = time.Now()
  23. }
  24. // 确保通道存在
  25. channel, err := rabbitFactory.GetChannel(req.ChannelName)
  26. if err != nil {
  27. return &types.QueryResult{
  28. Success: false,
  29. Error: "Failed to get channel: " + err.Error(),
  30. Time: time.Now().Format(time.RFC3339),
  31. }
  32. }
  33. // 序列化消息
  34. messageBody, err := json.Marshal(req.Message)
  35. if err != nil {
  36. return &types.QueryResult{
  37. Success: false,
  38. Error: "Failed to marshal message: " + err.Error(),
  39. Time: time.Now().Format(time.RFC3339),
  40. }
  41. }
  42. // 创建消息
  43. msg := amqp.Publishing{
  44. ContentType: req.ContentType,
  45. Body: messageBody,
  46. Headers: amqp.Table(req.Headers),
  47. Priority: req.Priority,
  48. CorrelationId: req.CorrelationID,
  49. ReplyTo: req.ReplyTo,
  50. Expiration: req.Expiration,
  51. MessageId: req.MessageID,
  52. Timestamp: req.Timestamp,
  53. Type: req.Type,
  54. AppId: req.AppID,
  55. DeliveryMode: amqp.Persistent, // 持久化消息
  56. }
  57. // 发送消息
  58. err = channel.Publish(
  59. req.ExchangeName,
  60. req.RoutingKey,
  61. false, // mandatory
  62. false, // immediate
  63. msg,
  64. )
  65. if err != nil {
  66. return &types.QueryResult{
  67. Success: false,
  68. Error: "Failed to publish message: " + err.Error(),
  69. Time: time.Now().Format(time.RFC3339),
  70. }
  71. }
  72. return &types.QueryResult{
  73. Success: true,
  74. Message: "Message sent successfully",
  75. Time: time.Now().Format(time.RFC3339),
  76. Data: map[string]interface{}{
  77. "exchange": req.ExchangeName,
  78. "routing_key": req.RoutingKey,
  79. "channel": req.ChannelName,
  80. "message_size": len(messageBody),
  81. "content_type": req.ContentType,
  82. "timestamp": req.Timestamp.Format(time.RFC3339),
  83. "MessageID": req.MessageID,
  84. },
  85. }
  86. }
  87. // SendBytesMessage 发送字节消息
  88. func SendBytesMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.BytesMessageRequest) *types.QueryResult {
  89. // 设置默认值
  90. if req.ChannelName == "" {
  91. req.ChannelName = "default"
  92. }
  93. if req.ExchangeName == "" {
  94. req.ExchangeName = ""
  95. }
  96. if req.ContentType == "" {
  97. req.ContentType = "application/octet-stream"
  98. }
  99. // 确保通道存在
  100. channel, err := rabbitFactory.GetChannel(req.ChannelName)
  101. if err != nil {
  102. return &types.QueryResult{
  103. Success: false,
  104. Error: "Failed to get channel: " + err.Error(),
  105. Time: time.Now().Format(time.RFC3339),
  106. }
  107. }
  108. // 创建消息
  109. msg := amqp.Publishing{
  110. ContentType: req.ContentType,
  111. Body: req.Data,
  112. Headers: convertHeaders(req.Headers),
  113. DeliveryMode: amqp.Persistent,
  114. }
  115. // 发送消息
  116. err = channel.Publish(
  117. req.ExchangeName,
  118. req.RoutingKey,
  119. false,
  120. false,
  121. msg,
  122. )
  123. if err != nil {
  124. return &types.QueryResult{
  125. Success: false,
  126. Error: "Failed to publish bytes message: " + err.Error(),
  127. Time: time.Now().Format(time.RFC3339),
  128. }
  129. }
  130. return &types.QueryResult{
  131. Success: true,
  132. Message: "Bytes message sent successfully",
  133. Time: time.Now().Format(time.RFC3339),
  134. Data: map[string]interface{}{
  135. "exchange": req.ExchangeName,
  136. "routing_key": req.RoutingKey,
  137. "channel": req.ChannelName,
  138. "data_size": len(req.Data),
  139. "content_type": req.ContentType,
  140. },
  141. }
  142. }
  143. // 转换headers类型
  144. func convertHeaders(headers map[string]string) amqp.Table {
  145. table := make(amqp.Table)
  146. for k, v := range headers {
  147. table[k] = v
  148. }
  149. return table
  150. }