Bez popisu
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

agentToDoris.go 3.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package service
  2. import (
  3. "fmt"
  4. "log"
  5. "strings"
  6. "time"
  7. "git.x2erp.com/qdy/go-base/types"
  8. "git.x2erp.com/qdy/go-db/factory/database"
  9. "git.x2erp.com/qdy/go-db/factory/doris"
  10. "git.x2erp.com/qdy/go-db/factory/http"
  11. )
  12. // queryToCSVAndInsert 处理单次查询并将结果插入到Doris
  13. func queryToCSVAndInsert(dbFactory *database.DBFactory, queryRequest types.QueryRequest) (*types.QueryResult, error) {
  14. // 记录查询开始时间
  15. queryStartTime := time.Now()
  16. // 1. 获取HTTP工厂实例
  17. httpFactory, err := http.GetHTTPFactory()
  18. if err != nil {
  19. return nil, fmt.Errorf("failed to get HTTP factory: %v", err)
  20. }
  21. log.Println("HTTP factory created successfully")
  22. // 2. 获取Doris工厂实例
  23. dorisFactory, err := doris.GetDorisFactory(httpFactory)
  24. if err != nil {
  25. return nil, fmt.Errorf("failed to get Doris factory: %v", err)
  26. }
  27. log.Println("Doris factory created successfully")
  28. // 3. 创建HTTP客户端
  29. httpClient := httpFactory.CreateClient()
  30. agentQueryRequest := types.QueryRequest{
  31. SQL: queryRequest.SQL,
  32. Params: queryRequest.Params,
  33. PositionalParams: queryRequest.PositionalParams,
  34. WriterHeader: queryRequest.WriterHeader,
  35. }
  36. // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据
  37. resp, err := httpClient.PostWithAuth(
  38. queryRequest.AgentUrl,
  39. agentQueryRequest,
  40. queryRequest.AgentToken,
  41. nil,
  42. )
  43. queryEndTime := time.Now()
  44. queryDuration := queryEndTime.Sub(queryStartTime)
  45. if err != nil {
  46. return &types.QueryResult{
  47. Success: false,
  48. Error: fmt.Sprintf("查询失败: %v", err),
  49. QueryTime: queryDuration,
  50. }, nil
  51. }
  52. if resp.StatusCode() != 200 {
  53. return &types.QueryResult{
  54. Success: false,
  55. Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()),
  56. QueryTime: queryDuration,
  57. }, nil
  58. }
  59. // 5. 获取CSV数据
  60. csvData := string(resp.Body())
  61. if len(csvData) == 0 {
  62. return &types.QueryResult{
  63. Success: false,
  64. Error: "没有查询到数据",
  65. QueryTime: queryDuration,
  66. }, nil
  67. }
  68. // 估算数据行数(CSV行数)
  69. var totalRows int
  70. if queryRequest.WriterHeader {
  71. // 如果有表头,需要减1
  72. lines := strings.Count(csvData, "\n")
  73. if lines > 0 {
  74. totalRows = lines - 1
  75. }
  76. } else {
  77. totalRows = strings.Count(csvData, "\n")
  78. }
  79. log.Printf("查询成功,获取到 %d 行数据\n", totalRows)
  80. // 6. 插入数据到Doris
  81. database := queryRequest.DorisDatabase
  82. table := queryRequest.DorisTable
  83. skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过
  84. saveStartTime := time.Now()
  85. err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
  86. saveEndTime := time.Now()
  87. saveDuration := saveEndTime.Sub(saveStartTime)
  88. if err != nil {
  89. return &types.QueryResult{
  90. Success: false,
  91. Error: fmt.Sprintf("数据插入Doris失败: %v", err),
  92. QueryTime: queryDuration,
  93. SaveTime: saveDuration,
  94. Count: totalRows,
  95. }, nil
  96. }
  97. return &types.QueryResult{
  98. Success: true,
  99. QueryTime: queryDuration,
  100. SaveTime: saveDuration,
  101. Count: totalRows,
  102. }, nil
  103. }
  104. // ServiceHandler 服务处理器(供router使用)
  105. func ServiceAgentToDoris(dbFactory *database.DBFactory, queryRequest types.QueryRequest) *types.QueryResult {
  106. // 添加调试日志,确认 queryRequest 是否有数据
  107. //log.Printf("SQL: %s", queryRequest.SQL)
  108. //log.Printf("AgentUrl: %s", queryRequest.AgentUrl)
  109. //log.Printf("参数数量: %d", len(queryRequest.PositionalParams))
  110. // 执行查询并插入
  111. executionResult, err := queryToCSVAndInsert(dbFactory, queryRequest)
  112. if err != nil {
  113. return &types.QueryResult{
  114. Success: false,
  115. Error: "服务内部错误: " + err.Error(),
  116. }
  117. }
  118. return executionResult
  119. }