暫無描述
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

message_functions.go 3.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package functions
  2. import (
  3. "encoding/json"
  4. "time"
  5. "git.x2erp.com/qdy/go-base/ctx"
  6. "git.x2erp.com/qdy/go-base/types"
  7. "git.x2erp.com/qdy/go-db/factory/rabbitmq"
  8. "github.com/streadway/amqp"
  9. )
  10. // SendMessage 发送JSON消息
  11. func SendMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.MessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult {
  12. // 设置默认值
  13. if req.ChannelName == "" {
  14. req.ChannelName = "default"
  15. }
  16. if req.ExchangeName == "" {
  17. req.ExchangeName = "" // 默认交换机
  18. }
  19. if req.ContentType == "" {
  20. req.ContentType = "application/json"
  21. }
  22. if req.Timestamp.IsZero() {
  23. req.Timestamp = time.Now()
  24. }
  25. // 确保通道存在
  26. channel, err := rabbitFactory.GetChannel(req.ChannelName)
  27. if err != nil {
  28. return &types.QueryResult{
  29. Success: false,
  30. Error: "Failed to get channel: " + err.Error(),
  31. Time: time.Now().Format(time.RFC3339),
  32. }
  33. }
  34. // 序列化消息
  35. messageBody, err := json.Marshal(req.Message)
  36. if err != nil {
  37. return &types.QueryResult{
  38. Success: false,
  39. Error: "Failed to marshal message: " + err.Error(),
  40. Time: time.Now().Format(time.RFC3339),
  41. }
  42. }
  43. // 创建消息
  44. msg := amqp.Publishing{
  45. ContentType: req.ContentType,
  46. Body: messageBody,
  47. Headers: amqp.Table(req.Headers),
  48. Priority: req.Priority,
  49. CorrelationId: req.CorrelationID,
  50. ReplyTo: req.ReplyTo,
  51. Expiration: req.Expiration,
  52. MessageId: req.MessageID,
  53. Timestamp: req.Timestamp,
  54. Type: req.Type,
  55. AppId: req.AppID,
  56. DeliveryMode: amqp.Persistent, // 持久化消息
  57. }
  58. // 发送消息
  59. err = channel.Publish(
  60. req.ExchangeName,
  61. req.RoutingKey,
  62. false, // mandatory
  63. false, // immediate
  64. msg,
  65. )
  66. if err != nil {
  67. return &types.QueryResult{
  68. Success: false,
  69. Error: "Failed to publish message: " + err.Error(),
  70. Time: time.Now().Format(time.RFC3339),
  71. }
  72. }
  73. return &types.QueryResult{
  74. Success: true,
  75. Message: "Message sent successfully",
  76. Time: time.Now().Format(time.RFC3339),
  77. Data: map[string]interface{}{
  78. "exchange": req.ExchangeName,
  79. "routing_key": req.RoutingKey,
  80. "channel": req.ChannelName,
  81. "message_size": len(messageBody),
  82. "content_type": req.ContentType,
  83. "timestamp": req.Timestamp.Format(time.RFC3339),
  84. "MessageID": req.MessageID,
  85. },
  86. }
  87. }
  88. // SendBytesMessage 发送字节消息
  89. func SendBytesMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.BytesMessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult {
  90. // 设置默认值
  91. if req.ChannelName == "" {
  92. req.ChannelName = "default"
  93. }
  94. if req.ExchangeName == "" {
  95. req.ExchangeName = ""
  96. }
  97. if req.ContentType == "" {
  98. req.ContentType = "application/octet-stream"
  99. }
  100. // 确保通道存在
  101. channel, err := rabbitFactory.GetChannel(req.ChannelName)
  102. if err != nil {
  103. return &types.QueryResult{
  104. Success: false,
  105. Error: "Failed to get channel: " + err.Error(),
  106. Time: time.Now().Format(time.RFC3339),
  107. }
  108. }
  109. // 创建消息
  110. msg := amqp.Publishing{
  111. ContentType: req.ContentType,
  112. Body: req.Data,
  113. Headers: convertHeaders(req.Headers),
  114. DeliveryMode: amqp.Persistent,
  115. }
  116. // 发送消息
  117. err = channel.Publish(
  118. req.ExchangeName,
  119. req.RoutingKey,
  120. false,
  121. false,
  122. msg,
  123. )
  124. if err != nil {
  125. return &types.QueryResult{
  126. Success: false,
  127. Error: "Failed to publish bytes message: " + err.Error(),
  128. Time: time.Now().Format(time.RFC3339),
  129. }
  130. }
  131. return &types.QueryResult{
  132. Success: true,
  133. Message: "Bytes message sent successfully",
  134. Time: time.Now().Format(time.RFC3339),
  135. Data: map[string]interface{}{
  136. "exchange": req.ExchangeName,
  137. "routing_key": req.RoutingKey,
  138. "channel": req.ChannelName,
  139. "data_size": len(req.Data),
  140. "content_type": req.ContentType,
  141. },
  142. }
  143. }
  144. // 转换headers类型
  145. func convertHeaders(headers map[string]string) amqp.Table {
  146. table := make(amqp.Table)
  147. for k, v := range headers {
  148. table[k] = v
  149. }
  150. return table
  151. }