package service import ( "fmt" "strings" "time" "git.x2erp.com/qdy/go-base/logger" "git.x2erp.com/qdy/go-base/types" "git.x2erp.com/qdy/go-db/factory/doris" "git.x2erp.com/qdy/go-db/factory/http" ) // queryToCSVAndInsert 处理单次查询并将结果插入到Doris func queryToCSVAndInsert(queryRequest types.QueryRequest) *types.QueryResult[interface{}] { // 记录查询开始时间 queryStartTime := time.Now() // 1. 获取HTTP工厂实例 httpFactory, err := http.GetHTTPFactory() if err != nil { logger.Errorf("Failed to get HTTP factory: %v", err) return &types.QueryResult[interface{}]{ Success: false, Error: fmt.Sprintf("failed to get HTTP factory: %v", err), } } logger.Debug("HTTP factory created successfully") // 2. 获取Doris工厂实例 dorisFactory, err := doris.GetDorisFactory(httpFactory) if err != nil { logger.Errorf("Failed to get Doris factory: %v", err) return &types.QueryResult[interface{}]{ Success: false, Error: fmt.Sprintf("failed to get Doris factory: %v", err), } } logger.Debug("Doris factory created successfully") // 3. 创建HTTP客户端 httpClient := httpFactory.CreateClient() agentQueryRequest := types.QueryRequest{ SQL: queryRequest.SQL, Params: queryRequest.Params, PositionalParams: queryRequest.PositionalParams, WriterHeader: queryRequest.WriterHeader, } // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据 resp, err := httpClient.PostWithAuth( queryRequest.AgentUrl, agentQueryRequest, queryRequest.AgentToken, nil, ) queryEndTime := time.Now() queryDuration := queryEndTime.Sub(queryStartTime) if err != nil { logger.Errorf("Query request failed: %v", err) return &types.QueryResult[interface{}]{ Success: false, Error: fmt.Sprintf("查询失败: %v", err), QueryTime: queryDuration, } } if resp.StatusCode() != 200 { logger.Errorf("Query request failed with status code: %d", resp.StatusCode()) return &types.QueryResult[interface{}]{ Success: false, Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()), QueryTime: queryDuration, } } // 5. 获取CSV数据 csvData := string(resp.Body()) if len(csvData) == 0 { logger.Warn("No data queried") return &types.QueryResult[interface{}]{ Success: true, Error: "没有查询到数据", QueryTime: queryDuration, } } // 估算数据行数(CSV行数) var totalRows int if queryRequest.WriterHeader { // 如果有表头,需要减1 lines := strings.Count(csvData, "\n") if lines > 0 { totalRows = lines - 1 } } else { totalRows = strings.Count(csvData, "\n") } logger.Debug("Query successful, retrieved %d rows of data", totalRows) // 6. 插入数据到Doris database := queryRequest.DorisDatabase table := queryRequest.DorisTable skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过 saveStartTime := time.Now() err = dorisFactory.InsertCSV(database, table, csvData, skipHeader) saveEndTime := time.Now() saveDuration := saveEndTime.Sub(saveStartTime) if err != nil { logger.Errorf("Failed to insert data into Doris: %v", err) return &types.QueryResult[interface{}]{ Success: false, Error: fmt.Sprintf("数据插入Doris失败: %v", err), QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, } } logger.Debug("Data successfully inserted into Doris: database=%s, table=%s, rows=%d", database, table, totalRows) return &types.QueryResult[interface{}]{ Success: true, QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, } } // ServiceHandler 服务处理器(供router使用) func ServiceAgentToDoris(queryRequest types.QueryRequest) *types.QueryResult[interface{}] { // 添加调试日志,确认 queryRequest 是否有数据 logger.Debug("Processing query request: SQL=%s, AgentUrl=%s, Params count=%d", queryRequest.SQL, queryRequest.AgentUrl, len(queryRequest.PositionalParams)) // 执行查询并插入 return queryToCSVAndInsert(queryRequest) }