| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- package main
-
- import (
- "encoding/json"
- "log"
- "net/http"
- "strings"
- "time"
-
- "git.x2erp.com/qdy/go-base/config"
- "git.x2erp.com/qdy/go-base/logger"
- "git.x2erp.com/qdy/go-base/myservice"
- "git.x2erp.com/qdy/go-base/types"
- "git.x2erp.com/qdy/go-db/factory/database"
- "git.x2erp.com/qdy/go-svc-worker/service"
- "go-micro.dev/v4/metadata"
- )
-
- // 定义业务服务
- type DBFactory struct {
- dbFactory *database.DBFactory
- }
-
- // 配置
- var (
- cfg config.IConfig
- serviceName string
- serviceVersion string
- )
-
- func main() {
-
- // ========== 第一阶段:强制写入文件的启动日志 ==========
- // 这一步确保即使配置加载失败,也有日志记录
- if err := logger.InitBootLog("svc-worker"); err != nil {
- // 连启动日志都初始化失败,只能输出到控制台
- log.Fatal("无法初始化启动日志: ", err)
- }
-
- logger.BootLog("开始加载配置...")
- cfg, err := config.GetConfig()
- if err != nil {
- log.Fatalf("Failed to create RabbitMQ factory: %v", err)
- }
-
- serviceConfig := cfg.GetService()
- microConfig := cfg.GetMicro()
- serviceName = serviceConfig.ServiceName
- serviceVersion = serviceConfig.ServiceVersion
-
- log.Printf("serviceName: %s", serviceName)
- log.Printf("Port: %d", serviceConfig.Port)
- log.Printf("Consul: %s", microConfig.RegistryAddress)
-
- // 2. 初始化数据库
- dbFactory, err := database.GetDBFactory()
- if err != nil {
- log.Fatal("数据库连接失败:", err)
- }
- defer func() {
- if err := dbFactory.Close(); err != nil {
- logger.Info("数据库关闭错误: %v", err)
- }
- }()
-
- // 3. 创建服务实例
- dbfactory := &DBFactory{dbFactory: dbFactory}
-
- // 4. 使用 micro.Start 启动服务
- webService := myservice.Start(cfg)
-
- // 7. 注册HTTP路由
- webService.Handle("/", http.HandlerFunc(rootHandler))
- webService.Handle("/health", http.HandlerFunc(dbfactory.healthHandler))
- webService.Handle("/info", http.HandlerFunc(infoHandler))
- webService.Handle("/api/data/agent/to/doris", authMiddleware(http.HandlerFunc(dbfactory.agentToDorisHandler)))
-
- logger.InitRuntimeLogger("order-service", cfg.GetLog())
-
- log.Println("日志系统初始化完成")
-
- //关闭-启动日志输出文件功能
- logger.CloseBootLogger()
-
- if err := webService.Run(); err != nil {
- log.Fatal("服务运行失败:", err)
- }
-
- }
-
- // 根处理器
- func rootHandler(w http.ResponseWriter, r *http.Request) {
- if r.URL.Path != "/" {
- http.NotFound(w, r)
- return
- }
- respondJSON(w, http.StatusOK, map[string]string{
- "service": serviceName,
- "status": "running",
- "mode": "http-microservice",
- })
- }
-
- // 健康检查处理器
- func (s *DBFactory) healthHandler(w http.ResponseWriter, r *http.Request) {
- if err := s.dbFactory.TestConnection(s.dbFactory.GetDBType()); err != nil {
- respondJSON(w, http.StatusServiceUnavailable, map[string]string{
- "status": "down",
- "error": err.Error(),
- })
- return
- }
- respondJSON(w, http.StatusOK, map[string]string{
- "status": "up",
- "time": time.Now().Format(time.RFC3339),
- })
- }
-
- // 信息处理器
- func infoHandler(w http.ResponseWriter, r *http.Request) {
- respondJSON(w, http.StatusOK, map[string]interface{}{
- "service": serviceName,
- "version": serviceVersion,
- "api": map[string]string{
- "POST /api/data/agent/to/doris": "同步数据到Doris",
- "GET /health": "健康检查",
- "GET /info": "服务信息",
- "GET /": "根路径",
- },
- "features": []string{
- "服务发现(Consul)",
- "负载均衡",
- "健康检查",
- "HTTP API网关",
- },
- })
- }
-
- // AgentToDoris处理器
- func (s *DBFactory) agentToDorisHandler(w http.ResponseWriter, r *http.Request) {
- if r.Method != "POST" {
- respondJSON(w, http.StatusMethodNotAllowed, types.QueryResult{
- Error: "只支持POST请求",
- Success: false,
- })
- return
- }
-
- // 解析请求
- var requestData types.QueryRequest
- if err := json.NewDecoder(r.Body).Decode(&requestData); err != nil {
- respondJSON(w, http.StatusBadRequest, map[string]string{
- "error": "无效的JSON数据",
- })
- return
- }
-
- // 处理业务逻辑
- result := service.ServiceAgentToDoris(s.dbFactory, requestData)
-
- respondJSON(w, http.StatusOK, result)
- }
-
- // 认证中间件
- func authMiddleware(next http.Handler) http.Handler {
- return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-
- // JWT令牌认证
- token := r.Header.Get("Authorization")
- if token != "" && strings.HasPrefix(token, "Bearer ") {
- token = token[7:]
- }
-
- // 双重认证:API密钥或JWT
- if token == "" {
- respondJSON(w, http.StatusUnauthorized, map[string]string{
- "error": "需要API密钥或Bearer令牌",
- })
- return
- }
-
- // 验证JWT令牌
- if token != "" && !isValidJWT(token) {
- respondJSON(w, http.StatusUnauthorized, map[string]string{
- "error": "无效的访问令牌",
- })
- return
- }
-
- // 将认证信息添加到上下文
- ctx := r.Context()
- if token != "" {
- ctx = metadata.Set(ctx, "Authorization", "Bearer "+token)
- }
-
- next.ServeHTTP(w, r.WithContext(ctx))
- })
- }
-
// isValidJWT reports whether token is an acceptable access token.
//
// SECURITY: this is a placeholder. It accepts every input, including the
// empty string, and checks neither signature nor expiry.
//
// TODO: implement real JWT validation (signature and expiry), for example
// with github.com/golang-jwt/jwt/v5. An earlier interim rule that rejected
// tokens shorter than 10 characters is disabled.
func isValidJWT(token string) bool {
	// Temporary verdict until real validation exists.
	const placeholderVerdict = true
	return placeholderVerdict
}
-
- // JSON响应辅助函数
- func respondJSON(w http.ResponseWriter, status int, data interface{}) {
- w.Header().Set("Content-Type", "application/json")
- w.Header().Set("X-Service-Name", serviceName)
- w.Header().Set("X-Service-Version", serviceVersion)
- w.WriteHeader(status)
-
- if err := json.NewEncoder(w).Encode(data); err != nil {
- log.Printf("JSON编码错误: %v", err)
- }
- }
|