| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package service
-
- import (
- "fmt"
- "strings"
- "time"
-
- "git.x2erp.com/qdy/go-base/logger"
- "git.x2erp.com/qdy/go-base/types"
- "git.x2erp.com/qdy/go-db/factory/doris"
- "git.x2erp.com/qdy/go-db/factory/http"
- )
-
- // queryToCSVAndInsert 处理单次查询并将结果插入到Doris
- func queryToCSVAndInsert(queryRequest types.QueryRequest) *types.QueryResult[interface{}] {
- // 记录查询开始时间
- queryStartTime := time.Now()
-
- // 1. 获取HTTP工厂实例
- httpFactory, err := http.GetHTTPFactory()
- if err != nil {
- logger.Errorf("Failed to get HTTP factory: %v", err)
- return &types.QueryResult[interface{}]{
- Success: false,
- Error: fmt.Sprintf("failed to get HTTP factory: %v", err),
- }
- }
- logger.Debug("HTTP factory created successfully")
-
- // 2. 获取Doris工厂实例
- dorisFactory, err := doris.GetDorisFactory(httpFactory)
- if err != nil {
- logger.Errorf("Failed to get Doris factory: %v", err)
- return &types.QueryResult[interface{}]{
- Success: false,
- Error: fmt.Sprintf("failed to get Doris factory: %v", err),
- }
- }
- logger.Debug("Doris factory created successfully")
-
- // 3. 创建HTTP客户端
- httpClient := httpFactory.CreateClient()
-
- agentQueryRequest := types.QueryRequest{
- SQL: queryRequest.SQL,
- Params: queryRequest.Params,
- PositionalParams: queryRequest.PositionalParams,
- WriterHeader: queryRequest.WriterHeader,
- }
-
- // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据
- resp, err := httpClient.PostWithAuth(
- queryRequest.AgentUrl,
- agentQueryRequest,
- queryRequest.AgentToken,
- nil,
- )
-
- queryEndTime := time.Now()
- queryDuration := queryEndTime.Sub(queryStartTime)
-
- if err != nil {
- logger.Errorf("Query request failed: %v", err)
- return &types.QueryResult[interface{}]{
- Success: false,
- Error: fmt.Sprintf("查询失败: %v", err),
- QueryTime: queryDuration,
- }
- }
-
- if resp.StatusCode() != 200 {
- logger.Errorf("Query request failed with status code: %d", resp.StatusCode())
- return &types.QueryResult[interface{}]{
- Success: false,
- Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()),
- QueryTime: queryDuration,
- }
- }
-
- // 5. 获取CSV数据
- csvData := string(resp.Body())
- if len(csvData) == 0 {
- logger.Warn("No data queried")
- return &types.QueryResult[interface{}]{
- Success: true,
- Error: "没有查询到数据",
- QueryTime: queryDuration,
- }
- }
-
- // 估算数据行数(CSV行数)
- var totalRows int
- if queryRequest.WriterHeader {
- // 如果有表头,需要减1
- lines := strings.Count(csvData, "\n")
- if lines > 0 {
- totalRows = lines - 1
- }
- } else {
- totalRows = strings.Count(csvData, "\n")
- }
-
- logger.Debug("Query successful, retrieved %d rows of data", totalRows)
-
- // 6. 插入数据到Doris
- database := queryRequest.DorisDatabase
- table := queryRequest.DorisTable
- skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过
-
- saveStartTime := time.Now()
- err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
- saveEndTime := time.Now()
- saveDuration := saveEndTime.Sub(saveStartTime)
-
- if err != nil {
- logger.Errorf("Failed to insert data into Doris: %v", err)
- return &types.QueryResult[interface{}]{
- Success: false,
- Error: fmt.Sprintf("数据插入Doris失败: %v", err),
- QueryTime: queryDuration,
- SaveTime: saveDuration,
- Count: totalRows,
- }
- }
-
- logger.Debug("Data successfully inserted into Doris: database=%s, table=%s, rows=%d",
- database, table, totalRows)
-
- return &types.QueryResult[interface{}]{
- Success: true,
- QueryTime: queryDuration,
- SaveTime: saveDuration,
- Count: totalRows,
- }
- }
-
- // ServiceHandler 服务处理器(供router使用)
- func ServiceAgentToDoris(queryRequest types.QueryRequest) *types.QueryResult[interface{}] {
- // 添加调试日志,确认 queryRequest 是否有数据
- logger.Debug("Processing query request: SQL=%s, AgentUrl=%s, Params count=%d",
- queryRequest.SQL, queryRequest.AgentUrl, len(queryRequest.PositionalParams))
-
- // 执行查询并插入
- return queryToCSVAndInsert(queryRequest)
- }
|