package functions import ( "encoding/json" "time" "git.x2erp.com/qdy/go-base/ctx" "git.x2erp.com/qdy/go-base/types" "git.x2erp.com/qdy/go-db/factory/rabbitmq" "github.com/streadway/amqp" ) // SendMessage 发送JSON消息 func SendMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.MessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult { // 设置默认值 if req.ChannelName == "" { req.ChannelName = "default" } if req.ExchangeName == "" { req.ExchangeName = "" // 默认交换机 } if req.ContentType == "" { req.ContentType = "application/json" } if req.Timestamp.IsZero() { req.Timestamp = time.Now() } // 确保通道存在 channel, err := rabbitFactory.GetChannel(req.ChannelName) if err != nil { return &types.QueryResult{ Success: false, Error: "Failed to get channel: " + err.Error(), Time: time.Now().Format(time.RFC3339), } } // 序列化消息 messageBody, err := json.Marshal(req.Message) if err != nil { return &types.QueryResult{ Success: false, Error: "Failed to marshal message: " + err.Error(), Time: time.Now().Format(time.RFC3339), } } // 创建消息 msg := amqp.Publishing{ ContentType: req.ContentType, Body: messageBody, Headers: amqp.Table(req.Headers), Priority: req.Priority, CorrelationId: req.CorrelationID, ReplyTo: req.ReplyTo, Expiration: req.Expiration, MessageId: req.MessageID, Timestamp: req.Timestamp, Type: req.Type, AppId: req.AppID, DeliveryMode: amqp.Persistent, // 持久化消息 } // 发送消息 err = channel.Publish( req.ExchangeName, req.RoutingKey, false, // mandatory false, // immediate msg, ) if err != nil { return &types.QueryResult{ Success: false, Error: "Failed to publish message: " + err.Error(), Time: time.Now().Format(time.RFC3339), } } return &types.QueryResult{ Success: true, Message: "Message sent successfully", Time: time.Now().Format(time.RFC3339), Data: map[string]interface{}{ "exchange": req.ExchangeName, "routing_key": req.RoutingKey, "channel": req.ChannelName, "message_size": len(messageBody), "content_type": req.ContentType, "timestamp": req.Timestamp.Format(time.RFC3339), "MessageID": req.MessageID, }, } } // SendBytesMessage 发送字节消息 func SendBytesMessage(rabbitFactory *rabbitmq.RabbitMQFactory, req types.BytesMessageRequest, reqCtx *ctx.RequestContext) *types.QueryResult { // 设置默认值 if req.ChannelName == "" { req.ChannelName = "default" } if req.ExchangeName == "" { req.ExchangeName = "" } if req.ContentType == "" { req.ContentType = "application/octet-stream" } // 确保通道存在 channel, err := rabbitFactory.GetChannel(req.ChannelName) if err != nil { return &types.QueryResult{ Success: false, Error: "Failed to get channel: " + err.Error(), Time: time.Now().Format(time.RFC3339), } } // 创建消息 msg := amqp.Publishing{ ContentType: req.ContentType, Body: req.Data, Headers: convertHeaders(req.Headers), DeliveryMode: amqp.Persistent, } // 发送消息 err = channel.Publish( req.ExchangeName, req.RoutingKey, false, false, msg, ) if err != nil { return &types.QueryResult{ Success: false, Error: "Failed to publish bytes message: " + err.Error(), Time: time.Now().Format(time.RFC3339), } } return &types.QueryResult{ Success: true, Message: "Bytes message sent successfully", Time: time.Now().Format(time.RFC3339), Data: map[string]interface{}{ "exchange": req.ExchangeName, "routing_key": req.RoutingKey, "channel": req.ChannelName, "data_size": len(req.Data), "content_type": req.ContentType, }, } } // 转换headers类型 func convertHeaders(headers map[string]string) amqp.Table { table := make(amqp.Table) for k, v := range headers { table[k] = v } return table }