package service import ( "fmt" "log" "strings" "time" "git.x2erp.com/qdy/go-base/types" "git.x2erp.com/qdy/go-db/factory/database" "git.x2erp.com/qdy/go-db/factory/doris" "git.x2erp.com/qdy/go-db/factory/http" ) // queryToCSVAndInsert 处理单次查询并将结果插入到Doris func queryToCSVAndInsert(dbFactory *database.DBFactory, queryRequest types.QueryRequest) (*types.QueryResult, error) { // 记录查询开始时间 queryStartTime := time.Now() // 1. 获取HTTP工厂实例 httpFactory, err := http.GetHTTPFactory() if err != nil { return nil, fmt.Errorf("failed to get HTTP factory: %v", err) } log.Println("HTTP factory created successfully") // 2. 获取Doris工厂实例 dorisFactory, err := doris.GetDorisFactory(httpFactory) if err != nil { return nil, fmt.Errorf("failed to get Doris factory: %v", err) } log.Println("Doris factory created successfully") // 3. 创建HTTP客户端 httpClient := httpFactory.CreateClient() agentQueryRequest := types.QueryRequest{ SQL: queryRequest.SQL, Params: queryRequest.Params, PositionalParams: queryRequest.PositionalParams, WriterHeader: queryRequest.WriterHeader, } // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据 resp, err := httpClient.PostWithAuth( queryRequest.AgentUrl, agentQueryRequest, queryRequest.AgentToken, nil, ) queryEndTime := time.Now() queryDuration := queryEndTime.Sub(queryStartTime) if err != nil { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("查询失败: %v", err), QueryTime: queryDuration, }, nil } if resp.StatusCode() != 200 { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()), QueryTime: queryDuration, }, nil } // 5. 获取CSV数据 csvData := string(resp.Body()) if len(csvData) == 0 { return &types.QueryResult{ Success: false, Error: "没有查询到数据", QueryTime: queryDuration, }, nil } // 估算数据行数(CSV行数) var totalRows int if queryRequest.WriterHeader { // 如果有表头,需要减1 lines := strings.Count(csvData, "\n") if lines > 0 { totalRows = lines - 1 } } else { totalRows = strings.Count(csvData, "\n") } log.Printf("查询成功,获取到 %d 行数据\n", totalRows) // 6. 插入数据到Doris database := queryRequest.DorisDatabase table := queryRequest.DorisTable skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过 saveStartTime := time.Now() err = dorisFactory.InsertCSV(database, table, csvData, skipHeader) saveEndTime := time.Now() saveDuration := saveEndTime.Sub(saveStartTime) if err != nil { return &types.QueryResult{ Success: false, Error: fmt.Sprintf("数据插入Doris失败: %v", err), QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, }, nil } return &types.QueryResult{ Success: true, QueryTime: queryDuration, SaveTime: saveDuration, Count: totalRows, }, nil } // ServiceHandler 服务处理器(供router使用) func ServiceAgentToDoris(dbFactory *database.DBFactory, queryRequest types.QueryRequest) *types.QueryResult { // 添加调试日志,确认 queryRequest 是否有数据 //log.Printf("SQL: %s", queryRequest.SQL) //log.Printf("AgentUrl: %s", queryRequest.AgentUrl) //log.Printf("参数数量: %d", len(queryRequest.PositionalParams)) // 执行查询并插入 executionResult, err := queryToCSVAndInsert(dbFactory, queryRequest) if err != nil { return &types.QueryResult{ Success: false, Error: "服务内部错误: " + err.Error(), } } return executionResult }