설명 없음
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

agent_to_doris.go 4.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package service
  2. import (
  3. "fmt"
  4. "strings"
  5. "time"
  6. "git.x2erp.com/qdy/go-base/logger"
  7. "git.x2erp.com/qdy/go-base/model/request/dorisreq"
  8. "git.x2erp.com/qdy/go-base/model/response"
  9. "git.x2erp.com/qdy/go-db/factory/doris"
  10. "git.x2erp.com/qdy/go-db/factory/http"
  11. )
  12. // queryToCSVAndInsert 处理单次查询并将结果插入到Doris
  13. func queryToCSVAndInsert(queryRequest dorisreq.QueryRequest) *response.QueryResult[interface{}] {
  14. // 记录查询开始时间
  15. queryStartTime := time.Now()
  16. // 1. 获取HTTP工厂实例
  17. httpFactory, err := http.GetHTTPFactory()
  18. if err != nil {
  19. logger.Errorf("Failed to get HTTP factory: %v", err)
  20. return &response.QueryResult[interface{}]{
  21. Success: false,
  22. Error: fmt.Sprintf("failed to get HTTP factory: %v", err),
  23. }
  24. }
  25. logger.Debug("HTTP factory created successfully")
  26. // 2. 获取Doris工厂实例
  27. dorisFactory, err := doris.GetDorisFactory(httpFactory)
  28. if err != nil {
  29. logger.Errorf("Failed to get Doris factory: %v", err)
  30. return &response.QueryResult[interface{}]{
  31. Success: false,
  32. Error: fmt.Sprintf("failed to get Doris factory: %v", err),
  33. }
  34. }
  35. logger.Debug("Doris factory created successfully")
  36. // 3. 创建HTTP客户端
  37. httpClient := httpFactory.CreateClient()
  38. agentQueryRequest := dorisreq.QueryRequest{
  39. SQL: queryRequest.SQL,
  40. Params: queryRequest.Params,
  41. PositionalParams: queryRequest.PositionalParams,
  42. WriterHeader: queryRequest.WriterHeader,
  43. }
  44. // 4. 发送POST请求到 /api/query/csv 获取CSV格式数据
  45. resp, err := httpClient.PostWithAuth(
  46. queryRequest.AgentUrl,
  47. agentQueryRequest,
  48. queryRequest.AgentToken,
  49. nil,
  50. )
  51. queryEndTime := time.Now()
  52. queryDuration := queryEndTime.Sub(queryStartTime)
  53. if err != nil {
  54. logger.Errorf("Query request failed: %v", err)
  55. return &response.QueryResult[interface{}]{
  56. Success: false,
  57. Error: fmt.Sprintf("查询失败: %v", err),
  58. QueryTime: queryDuration,
  59. }
  60. }
  61. if resp.StatusCode() != 200 {
  62. logger.Errorf("Query request failed with status code: %d", resp.StatusCode())
  63. return &response.QueryResult[interface{}]{
  64. Success: false,
  65. Error: fmt.Sprintf("查询请求失败, 状态码: %d", resp.StatusCode()),
  66. QueryTime: queryDuration,
  67. }
  68. }
  69. // 5. 获取CSV数据
  70. csvData := string(resp.Body())
  71. if len(csvData) == 0 {
  72. logger.Warn("No data queried")
  73. return &response.QueryResult[interface{}]{
  74. Success: true,
  75. Error: "没有查询到数据",
  76. QueryTime: queryDuration,
  77. }
  78. }
  79. // 估算数据行数(CSV行数)
  80. var totalRows int
  81. if queryRequest.WriterHeader {
  82. // 如果有表头,需要减1
  83. lines := strings.Count(csvData, "\n")
  84. if lines > 0 {
  85. totalRows = lines - 1
  86. }
  87. } else {
  88. totalRows = strings.Count(csvData, "\n")
  89. }
  90. logger.Debug("Query successful, retrieved %d rows of data", totalRows)
  91. // 6. 插入数据到Doris
  92. database := queryRequest.DorisDatabase
  93. table := queryRequest.DorisTable
  94. skipHeader := !queryRequest.WriterHeader // 如果包含表头,则跳过
  95. saveStartTime := time.Now()
  96. err = dorisFactory.InsertCSV(database, table, csvData, skipHeader)
  97. saveEndTime := time.Now()
  98. saveDuration := saveEndTime.Sub(saveStartTime)
  99. if err != nil {
  100. logger.Errorf("Failed to insert data into Doris: %v", err)
  101. return &response.QueryResult[interface{}]{
  102. Success: false,
  103. Error: fmt.Sprintf("数据插入Doris失败: %v", err),
  104. QueryTime: queryDuration,
  105. SaveTime: saveDuration,
  106. Count: totalRows,
  107. }
  108. }
  109. logger.Debug("Data successfully inserted into Doris: database=%s, table=%s, rows=%d",
  110. database, table, totalRows)
  111. return &response.QueryResult[interface{}]{
  112. Success: true,
  113. QueryTime: queryDuration,
  114. SaveTime: saveDuration,
  115. Count: totalRows,
  116. }
  117. }
  118. // ServiceHandler 服务处理器(供router使用)
  119. func ServiceAgentToDoris(queryRequest dorisreq.QueryRequest) *response.QueryResult[interface{}] {
  120. // 添加调试日志,确认 queryRequest 是否有数据
  121. logger.Debug("Processing query request: SQL=%s, AgentUrl=%s, Params count=%d",
  122. queryRequest.SQL, queryRequest.AgentUrl, len(queryRequest.PositionalParams))
  123. // 执行查询并插入
  124. return queryToCSVAndInsert(queryRequest)
  125. }