| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- package functions
-
- import (
- "encoding/json"
- "time"
-
- "git.x2erp.com/qdy/go-base/ctx"
- "git.x2erp.com/qdy/go-base/types"
- "git.x2erp.com/qdy/go-db/factory/rabbitmq"
- "github.com/streadway/amqp"
- )
-
- // SendMessage 发送JSON消息
- func SendMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.MessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult {
- // 设置默认值
- if req.ChannelName == "" {
- req.ChannelName = "default"
- }
- if req.ExchangeName == "" {
- req.ExchangeName = "" // 默认交换机
- }
- if req.ContentType == "" {
- req.ContentType = "application/json"
- }
- if req.Timestamp.IsZero() {
- req.Timestamp = time.Now()
- }
-
- // 确保通道存在
- channel, err := rabbitFactory.GetChannel(req.ChannelName)
- if err != nil {
- return &types.QueryResult{
- Success: false,
- Error: "Failed to get channel: " + err.Error(),
- Time: time.Now().Format(time.RFC3339),
- }
- }
-
- // 序列化消息
- messageBody, err := json.Marshal(req.Message)
- if err != nil {
- return &types.QueryResult{
- Success: false,
- Error: "Failed to marshal message: " + err.Error(),
- Time: time.Now().Format(time.RFC3339),
- }
- }
-
- // 创建消息
- msg := amqp.Publishing{
- ContentType: req.ContentType,
- Body: messageBody,
- Headers: amqp.Table(req.Headers),
- Priority: req.Priority,
- CorrelationId: req.CorrelationID,
- ReplyTo: req.ReplyTo,
- Expiration: req.Expiration,
- MessageId: req.MessageID,
- Timestamp: req.Timestamp,
- Type: req.Type,
- AppId: req.AppID,
- DeliveryMode: amqp.Persistent, // 持久化消息
- }
-
- // 发送消息
- err = channel.Publish(
- req.ExchangeName,
- req.RoutingKey,
- false, // mandatory
- false, // immediate
- msg,
- )
- if err != nil {
- return &types.QueryResult{
- Success: false,
- Error: "Failed to publish message: " + err.Error(),
- Time: time.Now().Format(time.RFC3339),
- }
- }
-
- return &types.QueryResult{
- Success: true,
- Message: "Message sent successfully",
- Time: time.Now().Format(time.RFC3339),
- Data: map[string]interface{}{
- "exchange": req.ExchangeName,
- "routing_key": req.RoutingKey,
- "channel": req.ChannelName,
- "message_size": len(messageBody),
- "content_type": req.ContentType,
- "timestamp": req.Timestamp.Format(time.RFC3339),
- "MessageID": req.MessageID,
- },
- }
- }
-
- // SendBytesMessage 发送字节消息
- func SendBytesMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.BytesMessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult {
- // 设置默认值
- if req.ChannelName == "" {
- req.ChannelName = "default"
- }
- if req.ExchangeName == "" {
- req.ExchangeName = ""
- }
- if req.ContentType == "" {
- req.ContentType = "application/octet-stream"
- }
-
- // 确保通道存在
- channel, err := rabbitFactory.GetChannel(req.ChannelName)
- if err != nil {
- return &types.QueryResult{
- Success: false,
- Error: "Failed to get channel: " + err.Error(),
- Time: time.Now().Format(time.RFC3339),
- }
- }
-
- // 创建消息
- msg := amqp.Publishing{
- ContentType: req.ContentType,
- Body: req.Data,
- Headers: convertHeaders(req.Headers),
- DeliveryMode: amqp.Persistent,
- }
-
- // 发送消息
- err = channel.Publish(
- req.ExchangeName,
- req.RoutingKey,
- false,
- false,
- msg,
- )
- if err != nil {
- return &types.QueryResult{
- Success: false,
- Error: "Failed to publish bytes message: " + err.Error(),
- Time: time.Now().Format(time.RFC3339),
- }
- }
-
- return &types.QueryResult{
- Success: true,
- Message: "Bytes message sent successfully",
- Time: time.Now().Format(time.RFC3339),
- Data: map[string]interface{}{
- "exchange": req.ExchangeName,
- "routing_key": req.RoutingKey,
- "channel": req.ChannelName,
- "data_size": len(req.Data),
- "content_type": req.ContentType,
- },
- }
- }
-
- // 转换headers类型
- func convertHeaders(headers map[string]string) amqp.Table {
- table := make(amqp.Table)
- for k, v := range headers {
- table[k] = v
- }
- return table
- }
|