package service import ( "fmt" "log" "strings" "time" "git.x2erp.com/qdy/go-base/types" "git.x2erp.com/qdy/go-db/factory/database" "git.x2erp.com/qdy/go-db/factory/doris" "git.x2erp.com/qdy/go-db/factory/http" "github.com/gin-gonic/gin" ) // // QueryRequest 查询请求结构体 // type QueryRequest struct { // SQL string `json:"sql" binding:"required"` // Params map[string]interface{} `json:"params,omitempty"` // 名称参数 // PositionalParams []interface{} `json:"positionalParams,omitempty"` // 位置参数 // WriterHeader bool `json:"writerHeader"` // 是否写入CSV头行 // } // // QueryResult 查询结果 // type QueryResult struct { // Success bool `json:"success"` // ErrorMessage string `json:"errorMessage,omitempty"` // QueryTime time.Duration `json:"queryTime"` // SaveTime time.Duration `json:"saveTime"` // TotalRows int `json:"totalRows"` // CSVData string `json:"csvData,omitempty"` // } // queryToCSVAndInsert 处理单次查询并将结果插入到Doris func queryToCSVAndInsert(dbFactory *database.DBFactory, queryRequest types.QueryRequest) (*types.QueryResult, error) { // 记录查询开始时间 queryStartTime := time.Now() // 1. 获取HTTP工厂实例 httpFactory, err := http.GetHTTPFactory() if err != nil { return nil, fmt.Errorf("failed to get HTTP factory: %v", err) } log.Println("HTTP factory created successfully") // 2. 获取Doris工厂实例 dorisFactory, err := doris.GetDorisFactory(httpFactory) if err != nil { return nil, fmt.Errorf("failed to get Doris factory: %v", err) } log.Println("Doris factory created successfully") // 3. 创建HTTP客户端 httpClient := httpFactory.CreateClient() agentQueryRequest := types.QueryRequest{ SQL: queryRequest.SQL, Params: queryRequest.Params, PositionalParams: queryRequest.PositionalParams, WriterHeader: queryRequest.WriterHeader, } // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据 resp, err := httpClient.PostWithAuth( queryRequest.AgentUrl, agentQueryRequest, queryRequest.AgentToken, nil, ) queryEndTime := time.Now() queryDuration := queryEndTime.Sub(queryStartTime) if err != nil { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("查询失败: %v", err), QueryTime: queryDuration, }, nil } if resp.StatusCode() != 200 { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()), QueryTime: queryDuration, }, nil } // 5. 获取CSV数据 csvData := string(resp.Body()) if len(csvData) == 0 { return &types.QueryResult{ Success: false, Error: "没有查询到数据", QueryTime: queryDuration, }, nil } // 估算数据行数(CSV行数) var totalRows int if queryRequest.WriterHeader { // 如果有表头,需要减1 lines := strings.Count(csvData, "\n") if lines > 0 { totalRows = lines - 1 } } else { totalRows = strings.Count(csvData, "\n") } log.Printf("查询成功,获取到 %d 行数据\n", totalRows) // 6. 插入数据到Doris database := queryRequest.DorisDatabase table := queryRequest.DorisTable skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过 saveStartTime := time.Now() err = dorisFactory.InsertCSV(database, table, csvData, skipHeader) saveEndTime := time.Now() saveDuration := saveEndTime.Sub(saveStartTime) if err != nil { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("数据插入Doris失败: %v", err), QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, }, nil } return &types.QueryResult{ Success: true, QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, }, nil } // ServiceHandler 服务处理器(供router使用) func ServiceAgentToDoris(dbFactory *database.DBFactory) func(c *gin.Context, req types.QueryRequest) { return func(c *gin.Context, queryRequest types.QueryRequest) { // 创建返回结果对象 result := &types.QueryResult{} // 添加调试日志,确认 queryRequest 是否有数据 //log.Printf("SQL: %s", queryRequest.SQL) //log.Printf("AgentUrl: %s", queryRequest.AgentUrl) //log.Printf("参数数量: %d", len(queryRequest.PositionalParams)) // 执行查询并插入 executionResult, err := queryToCSVAndInsert(dbFactory, queryRequest) if err != nil { result.Success = false result.Error = "服务内部错误: " + err.Error() c.JSON(500, result) return } // 使用QueryResult统一返回 result.Success = executionResult.Success result.QueryTime = executionResult.QueryTime result.SaveTime = executionResult.SaveTime result.TotalCount = executionResult.TotalCount // 计算总时间(秒) totalTime := executionResult.QueryTime + executionResult.SaveTime result.Time = totalTime.String() // 根据执行结果设置其他字段 if executionResult.Success { result.Data = gin.H{ "message": "数据成功插入到Doris", } result.Count = executionResult.TotalCount c.JSON(200, result) } else { result.Error = executionResult.Error result.Count = executionResult.TotalCount // 注意:这里使用200而不是500,因为executionResult.Success=false可能表示业务逻辑失败而非服务器错误 c.JSON(200, result) } } }